tp5 mysql实现消息队列_TP5系列 | Queue消息队列
消費(fèi)信息如下ThinkPHP5 Queue消息隊(duì)列
優(yōu)點(diǎn)
1、Queue內(nèi)置了 Redis,Database,Topthink ,Sync這四種驅(qū)動(dòng),本文使用Redis驅(qū)動(dòng)
2、Queue消息隊(duì)列適用于大并發(fā)或者返回結(jié)果 時(shí)間有點(diǎn)長(zhǎng)并需要批量操作的第三方接口,可用于短信發(fā)送、郵件發(fā)送、APP推送
3、Queue消息消息可進(jìn)行發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時(shí)控制等操作
流程圖
創(chuàng)建隊(duì)列
文件路徑:application\common\queue\TestQueue.php
TestQueue.php 參考代碼
namespace app\common\queue;
use think\facade\Log;
use think\queue\Job;
class TestQueue
{
public function fire(Job $job, $data)
{
$isJobDone = $this->testJob($data);
// 如果任務(wù)執(zhí)行成功后 記得刪除任務(wù),不然這個(gè)任務(wù)會(huì)重復(fù)執(zhí)行,直到達(dá)到最大重試次數(shù)后失敗后,執(zhí)行failed方法
if ($isJobDone) {
$job->delete();
} else {
//通過(guò)這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
$attempts = $job->attempts();
echo $attempts;
if ($attempts == 0 || $attempts == 1) {
// 重新發(fā)布這個(gè)任務(wù)
$job->release(2); //$delay為延遲時(shí)間,延遲2S后繼續(xù)執(zhí)行
} elseif ($attempts == 2) {
$job->release(5); // 延遲5S后繼續(xù)執(zhí)行
}
}
}
/**
* @Desc: 任務(wù)執(zhí)行失敗后自動(dòng)執(zhí)行方法
* @param $data
*/
public function failed($data)
{
// ...任務(wù)達(dá)到最大重試次數(shù)后,失敗了
Log::error('任務(wù)達(dá)到最大重試次數(shù)后,失敗了 '.json_encode($data));
}
/**
* @Desc: 自定義需要加入的隊(duì)列任務(wù)
*/
private function testJob($data)
{
$jsonData = json_encode($data);
echo "1、具體執(zhí)行任務(wù)接受到的參數(shù):{$jsonData} \r\n";
if($data){
echo "2、恭喜你!{$data['email']} 郵件發(fā)送成功了 \r\n";
return true;
}else{
echo "2、很遺憾,{$data['email']} 郵件發(fā)送失敗了 \r\n";
return false;
}
}
}
配置隊(duì)列
1、這里使用Redis驅(qū)動(dòng)來(lái)存儲(chǔ)隊(duì)列消息
2、隊(duì)列配置文件路徑:application\config\queue
配置參考代碼
return [
'connector' => 'Redis',
'expire' => 3600,
'default' => 'REDIS_QUEUE',
'host' => 'dnmp-redis',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false,
];
生產(chǎn)者參考代碼
/**
* @Desc: 生產(chǎn)者生產(chǎn)消息
*/
public function productMsg()
{
// 當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù),不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
$data = [
'email' => rand(11,99).'@qq.com',
'username' => 'Tinywan'
];
// 當(dāng)前任務(wù)歸屬的隊(duì)列名稱,如果為新隊(duì)列,會(huì)自動(dòng)創(chuàng)建
$queueName = 'testQueue';
// 將該任務(wù)推送到消息隊(duì)列,等待對(duì)應(yīng)的消費(fèi)者去執(zhí)行
$isPushed = Queue::push(TestQueue::class, $data, $queueName);
// database 驅(qū)動(dòng)時(shí),返回值為 1|false; redis驅(qū)動(dòng)時(shí),返回值為 隨機(jī)字符串|false
if ($isPushed !== false) {
echo '['.$data['email'].']'." 隊(duì)列加入成功 \r\n";
} else {
echo "隊(duì)列加入失敗 \r\n";
}
}
為了方便演示,這里使用cli模式。
執(zhí)行生產(chǎn)者:php product_msg.php
# php product_msg.php
[27@qq.com] 隊(duì)列加入成功
# php product_msg.php
[77@qq.com] 隊(duì)列加入成功
1、這時(shí)候消息已經(jīng)被持久化到Redis中去了(通過(guò)列表去存儲(chǔ))
2、推送成功,雖然我們這時(shí)候已經(jīng)寫(xiě)好了消費(fèi)者,但是我們并沒(méi)有開(kāi)始消費(fèi)。但是推送消息依然是成功的。這個(gè)就是中間件的優(yōu)勢(shì)。他連接兩個(gè)系統(tǒng),但是又不會(huì)互相耦合,生產(chǎn)者并不會(huì)因?yàn)橄M(fèi)者的異常而影響到自己。
3、消息推送成功之后,如果沒(méi)有消費(fèi)者,消息會(huì)堆積在隊(duì)列中。不過(guò)別怕,消息堆積很正常,并且一般的中間件堆積能力是非常強(qiáng)的。比如阿里就宣傳自己mq可以堆積上億條數(shù)據(jù)。
查看Redis消息與隊(duì)列
> docker exec -it dnmp-redis redis-cli
127.0.0.1:6379> keys *
127.0.0.1:6379> keys *
1) "queues:testQueue"
127.0.0.1:6379> TYPE queues:testQueue
list
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"27@qq.com\",\"username\":\"Tinywan\"},\"id\":\"MLgNb4LFALhtmp7HZtfXMFPRUT0r94Bi\",\"attempts\":1}"
2) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
消費(fèi)者
開(kāi)始消費(fèi)消息。執(zhí)行cli 命令 php think queue:work--queue隊(duì)列名稱
# php think queue:work --queue testQueue
1、具體執(zhí)行任務(wù)接受到的參數(shù): {"email":"27@qq.com","username":"Tinywan"}
2、恭喜你!27@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
這里每消費(fèi)掉一條消息,Redis數(shù)據(jù)庫(kù)中將會(huì)減少一條消息
查看Redis隊(duì)列消息
127.0.0.1:6379> LRANGE queues:testQueue 0 -1
1) "{\"job\":\"app\\\\common\\\\queue\\\\TestQueue\",\"data\":{\"email\":\"77@qq.com\",\"username\":\"Tinywan\"},\"id\":\"JM16vvjMylfJDnOpldJaHda8xMwuYYzP\",\"attempts\":1}"
127.0.0.1:6379>
命令行掛起守護(hù)進(jìn)程執(zhí)行
/usr/bin/php /var/www/tp5/think queue:work --daemon --queue testQueue --memory 256
--daemon 是否循環(huán)執(zhí)行,如果不加該參數(shù)則該命令處理完下一個(gè)消息就退出 --queue 要處理的隊(duì)列的名稱 --delay 0 如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0。 --memory 該進(jìn)程允許使用的內(nèi)存上限,以M為單位。
流程圖
消費(fèi)信息如下
php think queue:work --daemon --queue testQueue
1、具體執(zhí)行任務(wù)接受到的參數(shù): {"email":"77@qq.com","username":"Tinywan"}
2、恭喜你!77@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、具體執(zhí)行任務(wù)接受到的參數(shù): {"email":"80@qq.com","username":"Tinywan"}
2、恭喜你!80@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、具體執(zhí)行任務(wù)接受到的參數(shù): {"email":"34@qq.com","username":"Tinywan"}
2、恭喜你!34@qq.com 郵件發(fā)送成功了
Processed: app\common\queue\TestQueue
1、命令行模式可以常駐內(nèi)存不停的執(zhí)行php代碼。這樣就可以達(dá)到類似于靜態(tài)語(yǔ)言的java的效果。
2、一開(kāi)始監(jiān)聽(tīng)隊(duì)列。剛剛在隊(duì)列中堆積的消息立刻就被獲取到,開(kāi)始執(zhí)行了代碼。最后執(zhí)行完成,刪除了消息。
3、在 queue:work--daemon 單進(jìn)程循環(huán)消費(fèi)的時(shí)候,改了代碼是不會(huì)生效的。這時(shí)腳本語(yǔ)言有點(diǎn)類似于靜態(tài)語(yǔ)言在執(zhí)行。所以需要我們用queue:restart重啟 work 進(jìn)程 。
命令行掛起守護(hù)進(jìn)程執(zhí)行
/usr/local/php/bin/php /data/wwwroot/default/thinkphp_5/think queue:work --daemon --queue testQueue --memory 256
查看進(jìn)程是否在運(yùn)行
# ps
PID USER TIME COMMAND
1 root 0:00 php-fpm: master process (/usr/local/etc/php-fpm.conf)
6 www-data 0:00 php-fpm: pool www
7 www-data 0:00 php-fpm: pool www
16 root 0:00 sh
56 root 0:00 sh
113 root 0:00 php think queue:work --daemon --queue testQueue
你再也不用守在終端了,以后只負(fù)責(zé)生產(chǎn)消息就可以了。Redis隊(duì)列也不會(huì)積累消息了
其他(中間件)
中間件系統(tǒng)的定義是兩個(gè)獨(dú)立的不同的系統(tǒng)在中間構(gòu)建起傳遞消息的工具。但是同一個(gè)系統(tǒng)也可以通過(guò)中間件來(lái)榨取性能,大家肯定項(xiàng)目中遇到過(guò)性能瓶頸。
比如發(fā)送郵件,發(fā)送短信,轉(zhuǎn)換視頻格式等等。這些業(yè)務(wù)都是比較耗性能,又對(duì)先后順序不敏感的業(yè)務(wù)。這種業(yè)務(wù)就非常適合使用消息隊(duì)列系統(tǒng)來(lái)異步處理,使性能提升。
重啟隊(duì)列和生成隊(duì)列
總結(jié)
以上是生活随笔為你收集整理的tp5 mysql实现消息队列_TP5系列 | Queue消息队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: z3735f android x86,英
- 下一篇: python读取dicom文件的包_py