thinkphp5 消息队列thinkphp-queue扩展
1.簡介
thinkphp-queue是thinkphp的一個第三方擴展, 內置了?Redis,Database,Topthink?,Sync這四種驅動,推薦使用redis
2. 下載 和安裝
composer require topthink/think-queue
配置目錄在:?application/extra/queue.php
return ['connector' => 'Redis', // Redis 驅動'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null'default' => 'default', // 默認的隊列名稱'host' => '127.0.0.1', // redis 主機ip'port' => 6379, // redis 端口'password' => '', // redis 密碼'select' => 0, // 使用哪一個 db,默認為 db0'timeout' => 0, // redis連接的超時時間'persistent' => false, // 是否是長連接// 'connector' => 'Database', // 數據庫驅動// 'expire' => 60, // 任務的過期時間,默認為60秒; 若要禁用,則設置為 null// 'default' => 'default', // 默認的隊列名稱// 'table' => 'jobs', // 存儲消息的表名,不帶前綴// 'dsn' => [],// 'connector' => 'Topthink', // ThinkPHP內部的隊列通知服務平臺 ,本文不作介紹// 'token' => '',// 'project_id' => '',// 'protocol' => 'https',// 'host' => 'qns.topthink.com',// 'port' => 443,// 'api_version' => 1,// 'max_retries' => 3,// 'default' => 'default',// 'connector' => 'Sync', // Sync 驅動,該驅動的實際作用是取消消息隊列,還原為同步執行 ];3.入隊,創建隊列的代碼
/** * 文件路徑: \application\index\controller\JobTest.php * 該控制器的業務代碼中借助了thinkphp-queue 庫,將一個消息推送到消息隊列 */ namespace application\index\controller;use think\Exception;use think\Queue;class JobTest {/*** 一個使用了隊列的 action*/public function actionWithHelloJob(){// 1.當前任務將由哪個類來負責處理。 // 當輪到該任務時,系統將生成一個該類的實例,并調用其 fire 方法$jobHandlerClassName = 'application\index\job\Hello'; // 2.當前任務歸屬的隊列名稱,如果為新隊列,會自動創建$jobQueueName = "helloJobQueue"; // 3.當前任務所需的業務數據 . 不能為 resource 類型,其他類型最終將轉化為json形式的字符串// ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;// 4.將該任務推送到消息隊列,等待對應的消費者去執行$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字符串|falseif( $isPushed !== false ){ echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";}else{echo 'Oops, something went wrong.';}}}如果是多個任務:如果一個任務類里有多個小任務的話,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1、app\lib\job\Job2@task2
4.消費隊列的代碼:
<?php namespace app\test\job;use think\queue\Job;class Hello {/*** fire方法是消息隊列默認調用的方法* @param Job $job 當前的任務對象* @param array|mixed $data 發布任務時自定義的數據*/public function fire(Job $job,$data){// 如有必要,可以根據業務需求和數據庫中的最新數據,判斷該任務是否仍有必要執行.$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);if(!isJobStillNeedToBeDone){$job->delete();return;}$isJobDone = $this->doHelloJob($data);if ($isJobDone) {//如果任務執行成功, 記得刪除任務$job->delete();print("<info>Hello Job has been done and deleted"."</info>\n");}else{if ($job->attempts() > 3) {//通過這個方法可以檢查這個任務已經重試了幾次了print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");$job->delete();// 也可以重新發布這個任務//print("<info>Hello Job will be availabe again after 2s."."</info>\n");//$job->release(2); //$delay為延遲時間,表示該任務延遲2秒后再執行 }}}/*** 有些消息在到達消費者時,可能已經不再需要執行了* @param array|mixed $data 發布任務時自定義的數據* @return boolean 任務執行的結果*/private function checkDatabaseToSeeIfJobNeedToBeDone($data){return true;}/*** 根據消息中的數據進行實際的業務處理* @param array|mixed $data 發布任務時自定義的數據* @return boolean 任務執行的結果*/private function doHelloJob($data) {// 根據消息中的數據進行實際的業務處理...var_dump($data); // print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n"); // print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n"); // print("<info>Hello Job is Done!"."</info> \n");return true;} }執行代碼:
php think queue:work --queue helloJobQueue5.總結:
5.1 命名:
-
queue:subscribe 命令?[截至2017-02-15,作者暫未實現該模式,略過]
-
queue:work 命令
work 命令: 該命令將啟動一個 work 進程來處理消息隊列。
php think queue:work --queue helloJobQueue -
queue:listen 命令
listen 命令: 該命令將會創建一個 listen 父進程 ,然后由父進程通過?proc_open(‘php think queue:work’)?的方式來創建一個work 子 進程來處理消息隊列,且限制該work進程的執行時間。
php think queue:listen --queue helloJobQueuequeue:restart 重新開啟
?
?
參考資料:https://blog.csdn.net/will5451/article/details/80434174
https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645
https://github.com/top-think/think-queue
?
轉載于:https://www.cnblogs.com/myvic/p/9373710.html
總結
以上是生活随笔為你收集整理的thinkphp5 消息队列thinkphp-queue扩展的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支付宝怎么删除绑卡
- 下一篇: 机器人板块有哪些上市公司 已经受到股票投