linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过)
消息隊列rabbitmq
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業(yè)消息系統(tǒng),底層基于Erlang語言。
一:centos7安裝RabbitMQ
這玩意兒安裝很扯淡,官方推薦rpm安裝,rpm安裝本身是最簡單的,但是安裝RabbitMQ卻不簡單,很可能需要修改倉庫地址。不同linux版本不一樣,centos6和centos7也不一樣。我這里不用rpm,手動編譯Erlang,然后選擇編譯好的RabbitMQ。
1:安裝Erlang
1):先安裝幾個必要的插件
$ yum -y install gcc glibc-devel make ncurses-devel openssl-devel autoconf unixODBC unixODBC-devel socat
2):Erlang下載地址:http://www.erlang.org/downloads,我這里下載21.1版本
$ wget http://erlang.org/download/otp_src_21.1.tar.gz #下載
$ tar -xvf otp_src_21.1.tar.gz #解壓
$ cd otp_src_21.1/ #進入目錄準備編譯
$ ./configure --prefix=/usr/local/erlang --without-javac #忽略java編譯
$ make #編譯
$ make install #安裝
?make & make install 這兩步很慢,巨慢無比,耐心等待。
3):進入我們安裝后的目錄測試一下是否安裝成功
?
$ /usr/local/erlang/bin/erl
安裝成功
二:安裝rabbitmq
1:下載地址:http://www.rabbitmq.com/download.html
2:因為我上面的Erlang是手動編譯的,所以這里不選擇rpm方式安裝,直接下載解壓包,從這里下載:https://github.com/rabbitmq/rabbitmq-server/releases
$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz
$ tar xvJf rabbitmq-server-generic-unix-3.7.8.tar.xz
$ mv rabbitmq_server-3.7.8/ /usr/local/rabbitmq #解壓后移動到你想放到的目錄
這個是編譯好的,可以直接用。
?
3:設置環(huán)境變量,設置兩個,一個是Erlang,一個是rabbitmq,打開文件/etc/profile文件,在文件最后加入以下三行:
export ERLANG_PATH=$PATH:/usr/local/erlang/bin #erlang安裝目錄
export RABBITMQ_PATH=$PATH:/usr/local/rabbitmq/sbin #rabbitmq安裝目錄
export PATH=$PATH:$ERLANG_PATH:$RABBITMQ_PATH
運行命令生效:
source /etc/profile
啟動一下:
$ rabbitmq-server
成功
啟動web管理后臺:
$ rabbitmq-plugins enable rabbitmq_management #disable為關閉
開啟防火墻,打開15672端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload #重啟防火墻
web界面出來了
?
guest用戶被限制,只能通過127.0.0.1訪問,當然也可以修改配置文件開啟guest遠程訪問。這里我們新建一個用戶,并授予管理員權限。
用戶名:admin 密碼:123
$ rabbitmqctl add_user admin 123 #添加用戶
$ rabbitmqctl set_user_tags admin administrator #分配角色
登錄成功
相關命令:命令在/usrlocal/rabbitmq/sbin下
rabbitmq-server -detached #后臺啟動
rabbitmqctl stop #關閉服務
rabbitmqctl status #查看狀態(tài)
rabbitmqctl list_users #列出角色
三:安裝php擴展(我用的php7.2版本)
php是用amqp調用RabbitMQ,所以先下載ampq
$ wget https://pecl.php.net/get/amqp-1.9.3.tgz #下載
$ tar -xvf amqp-1.9.3.tgz #解壓
$ cd amqp-1.9.3
$ /usr/local/php/bin/phpize #用phpize生成編譯文件,注意查看你的php在哪里
$ ./configure --with-php-config=/usr/local/php/bin/php-config
到這一步,我這里報錯了:checking for amqp using pkg-config... configure: error: librabbitmq not found
?
這個錯誤提示還要安裝一個破玩意:rabbitmq-c
去這里下載:https://github.com/alanxz/rabbitmq-c/releases
$ wget https://github.com/alanxz/rabbitmq-c/archive/v0.9.0.tar.gz
$ tar -xvf v0.9.0.tar.gz
$ cd rabbitmq-c-0.9.0/
準備configure的時候,發(fā)現(xiàn)沒有configure,0.9改成cmake了,靠,安裝一下cmake
$ yum -y install cmake
$ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.9.0 #指定安裝目錄
$ make
$ make install
然后回過頭去再編譯amqp-1.9.3
$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.9.0
$ make
$ make install
媽的,make報錯!
發(fā)現(xiàn)點蛛絲馬跡,上面進入了/usr/local/rabbitmq-c-0.9.0/lib 目錄,查看一下發(fā)現(xiàn)/usr/local/rabbitmq-c-0.9.0/沒有l(wèi)ib,但有個lib64位。
處理一下:
$ cp -R /usr/local/rabbitmq-c-0.9.0/lib64/ /usr/local/rabbitmq-c-0.9.0/lib
接著干:make && make install ,OK,這也太不智能了吧。
加入到php.ini 查看一下!
?
extension=amqp.so?
?
?
擴展安裝成功,這時候就可以用PHP操作RabbitMQ了。
四:一些基本術語參數(shù)
1:Message:消息。包括消息頭和消息體
2:Publisher:生產者。發(fā)布消息的一方
3:Consumer:消費者。接受消息的一方
4:Connection:網(wǎng)絡鏈接,TCP鏈接
5:Channel:信道。建立在真實的TCP連接內地虛擬連接,所有命令通過信道輸入輸出,多路復用一條 TCP 連接,降低TCP開銷
6:ExChange:交換區(qū)。因為RabbitMQ是基于AMQP的,AMQP協(xié)議中的核心思想就是生產者和消費者隔離,也就是生產者不直接把消息發(fā)到隊列,而是先發(fā)給ExChange(交換區(qū)),Exchange按照特定的策略轉發(fā)到Queue(隊列中)進行存儲,所以ExChange的作用就是負責轉發(fā),生產者只面向 Exchange 發(fā)布消息,消費者只面向 Queue 消費消息。
Direct:將消息中的Routing key與該Exchange關聯(lián)的所有Binding中的Routing key進行比較,完全相等,則發(fā)送到該Binding對應的Queue中。
Topic:模糊匹配,如果匹配上了,則發(fā)送到該Binding對應的Queue中。
* 表示可以匹配零個或多個字符(Routing key是user.# user.a user.b user都可以匹配)
# 表示可以匹配一個字符 (Routing key是user.* user.a user.b 可以匹配;user user.a.c 不可以匹配)
Fanout:直接將消息轉發(fā)到所有binding的對應queue中,忽略Routing key。
Headers Exchange:將消息中的headers與該Exchange相關聯(lián)的所有Binging中的參數(shù)進行匹配,如果匹配上了,則發(fā)送到該Binding對應的Queue中。
7:Binding:綁定。建立交換區(qū)與消息隊列之間的關聯(lián),也就是設置規(guī)則,交換區(qū)該發(fā)送到哪個隊列。
8:Queue:消息隊列容器,存儲消息隊列的地方。durability表示是否持久化,durable表示是,transient表示否。
9:Vhost:翻譯叫什么虛擬主機,其實就類似于mysql的數(shù)據(jù)庫,一個意思。
五:一些基本命令
$ rabbitmq-server -detached #后臺啟動
$ rabbitmqctl stop [] #停止RabbitMQ服務,同時關閉erlang節(jié)點和應用程序
$ rabbitmqctl status #查看狀態(tài)
$ rabbitmqctl stop [] ?
$ rabbitmqctl stop_app ?#停止RabbitMQ服務,僅關閉erlang節(jié)點上的rabbit應用程序
$ rabbitmqctl start_app ? #啟動erlan node上的rabbitmq的應用
? ?
#用戶管理
$ rabbitmqctl list_users #列出角色
$ rabbitmqctl add_user admin 123 #添加用戶和密碼 這里用戶名:admin 密碼:123
$ rabbitmqctl set_user_tags admin administrator #分配角色
$ rabbitmqctl change_password username newpassword #修改用戶密碼
$ rabbitmqctl delete_user username #刪除用戶
# vhost(Virtual host)管理,這玩意兒相當于mysql的數(shù)據(jù)庫
$ rabbitmqctl add_vhost {name} #添加
$ rabbitmqctl delete_vhost {name} #刪除
$ rabbitmqctl list_vhosts {name} #查看所有
六:php操作RabbitMQ
1:實踐:用PHP創(chuàng)建交換區(qū):goods_msm,隊列名稱:goods_worker,以及路由key:code1
<?php
$conn = new AMQPConnection([
????'host' => '127.0.0.1',
????'vhost' => '/',
????'port' => 5672,
????'login' => 'guest',
????'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
??die('connetc error');
}
$channel = new AMQPChannel($conn); //創(chuàng)建channel(信道或者叫通道)
?
?
$ExChangeName = 'goods_msm'; //交換區(qū)名稱
$queueName = 'goods_worker'; //隊列名稱
$routeName1 = 'code1'; //路由key?
?
?
//創(chuàng)建交換機對象?????
$exChange = new AMQPExchange($channel);????
$exChange->setName($ExChangeName);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
echo "Exchange Status:".$exChange->declare()."\n";???//查看如果交換機不存在則進行創(chuàng)建
運行,如果沒有報錯的話會輸出:
?
Exchange Status:1
通過web窗口看一下
交換區(qū)建立成功
繼續(xù):
//創(chuàng)建隊列 ? ?
$queue = new AMQPQueue($channel);?
$queue->setName($queueName); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
echo "Message Total:".$queue->declare()."\n"; ? //查看,如果不存在則創(chuàng)建
可以看到隊列創(chuàng)建成功
?
繼續(xù)往下走:
//綁定交換區(qū)與隊列,指定路由鍵?
//rabbitmq不是直接發(fā)送到隊列,發(fā)送到交換區(qū),由交換區(qū)決定發(fā)給某個隊列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //綁定路由
$conn->disconnect(); //關閉連接
可以看到綁定成功
?
也可以用命令查看:
這樣就創(chuàng)建成功了,下面繼續(xù)用生產者,消費者干起來。最終代碼:
<?php
/**
?* 用PHP創(chuàng)建交換區(qū):goods_msm,隊列名稱:goods_worker,以及路由key:code1
?* 此代碼不是生產者,也不是消費者
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創(chuàng)建channel(信道或者叫通道)
?
$ExChangeName = 'goods_msm'; //交換區(qū)名稱
$queueName = 'goods_worker'; //隊列名稱
$routeName1 = 'code1'; //路由key?
?
//創(chuàng)建交換機對象 ? ??
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($ExChangeName);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
echo "Exchange Status:".$exChange->declare()."\n"; ? //查看如果交換機不存在則進行創(chuàng)建
?
//創(chuàng)建隊列 ? ?
$queue = new AMQPQueue($channel);?
$queue->setName($queueName); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
echo "Message Total:".$queue->declare()."\n"; ? //查看,如果不存在則創(chuàng)建
?
?
//綁定交換區(qū)與隊列,指定路由鍵?
//rabbitmq不是直接發(fā)送到隊列,發(fā)送到交換區(qū),由交換區(qū)決定發(fā)給某個隊列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //綁定路由
$conn->disconnect(); //關閉連接
2:生產者
<?php
/**
?* 生產者
?* 生產者也就是發(fā)送方
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創(chuàng)建channel(信道或者叫通道)
?
$ExChangeName = 'goods_msm'; //交換區(qū)名稱
$routeName1 = 'code1'; //路由key?
?
//創(chuàng)建交換機對象 ??
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($ExChangeName);?
?
$exChange->publish('第一條測試消息', $routeName1); //發(fā)送消息
$conn->disconnect(); //關閉連接
?運行一下,到web管理界面看看
?
消息發(fā)送成功
3:消費者
<?php
/**
?* 用PHP創(chuàng)建交換區(qū):goods_msm,隊列名稱:goods_worker,以及路由key:code1
?* 此代碼不是生產者,也不是消費者
**/
$conn = new AMQPConnection([
????'host' => '127.0.0.1',
????'vhost' => '/',
????'port' => 5672,
????'login' => 'guest',
????'password' => 'guest'
]);
if(!$conn->connect()){//建立連接
??die('connetc error');
}
$channel = new AMQPChannel($conn); //創(chuàng)建channel(信道或者叫通道)
?
$queueName = 'goods_worker'; //隊列名稱
?
//創(chuàng)建隊列????
$queue = new AMQPQueue($channel);?
$queue->setName($queueName);???
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
?
?
//接受消息
$queue->consume(function ($envelope, $queue) {
????$msg = $envelope->getBody();
????echo $msg."\n"; //處理消息
}, AMQP_AUTOACK); //自動應答
?
$conn->disconnect(); //關閉連接
注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用于取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環(huán)中使用;后者則是非阻塞的,取消息時有則取,無則返回false。
就是說用了consume之后,會同步阻塞,該程序常駐內存,不能用nginx,apache調用。?
運行一下生產者,消費端輸出:
?
7:實戰(zhàn)
實戰(zhàn)1:延時隊列
(1)需求:電子商務網(wǎng)站下單后,30分鐘如果未付款,改變訂單狀態(tài)
(2)原理:rabbitmq沒有直接延時隊列的功能,不過可以通過模擬實現(xiàn)此功能。用過redis的都知道,redis有一個ttl功能,就是生存周期,某個key設置生存周期,過期就會消失,rabbitmq也有此功能。rabbitmq消息過期進入死信隊列,然后配置一個轉發(fā),把死信隊列的消息轉發(fā)到某個隊列,這樣就可以操作這個隊列。
(3)步驟:
a:創(chuàng)建訂單的時候,同時發(fā)送訂單到消息隊列,并且設置過期時間
b:mq回調,也就是改變訂單狀態(tài)服務,判斷30分鐘后是否付款,如果未付款,改變訂單狀態(tài)為:無效訂單。
MQ也要分兩步,首先是進入死信隊列,但是死信隊列不能直接消費,需要轉發(fā)出來,利用MQ的兩個特性:
a:ttl,生存周期
b:Dead Letter Exchanges(DLX)
這里設置兩個地方用于轉發(fā):
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
具體參考MQ官方文檔:http://www.rabbitmq.com/dlx.html
demo:
<?php
/**
1:創(chuàng)建第1個隊列,此隊列為下單的時候放入隊列,然后設置過期時間
2:創(chuàng)建第2個隊列,第1個隊列過期后自動轉入該隊列
3:所以處理N久時間過期,只要處理第2個隊列就行了。
**/
$conn = new AMQPConnection([
? ? 'host' => '127.0.0.1',
? ? 'vhost' => '/',
? ? 'port' => 5672,
? ? 'login' => 'guest',
? ? 'password' => 'guest'
]);
?
$params = [
?? ?//訂單最開始在這里 第1個隊列
? ? 'exchangeName' => 'old_order_exchange', //交換區(qū)名稱
? ? 'queueName' => 'old_order_msg', //訂單隊列
? ? 'routeKey' => 'old_order_route', //路由key?
?
? ? //過期后轉發(fā)到這里 第2個隊列
? ? 'exchangeName2' => 'new_order_exchange', //交換區(qū)名稱
? ? 'queueName2' => 'new_order_msg', //訂單隊列
? ? 'routeKey2' => 'new_order_route', //路由key?
];
?
if(!$conn->connect()){//建立連接
? die('connetc error');
}
$channel = new AMQPChannel($conn); //創(chuàng)建channel(信道或者叫通道)
?
//創(chuàng)建交換機對象 第1個隊列
$exChange = new AMQPExchange($channel); ? ?
$exChange->setName($params['exchangeName']);?
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
$exChange->declare();
?
//創(chuàng)建交換機對象 第2個隊列
$exChange2 = new AMQPExchange($channel); ? ?
$exChange2->setName($params['exchangeName2']);
$exChange2->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$exChange2->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重啟時交換機自動恢復
$exChange2->declare();
?
//第1個隊列
$queue = new AMQPQueue($channel);?
$queue->setName($params['queueName']); ??
$queue->setFlags(AMQP_DURABLE); //隊列持久化?
$queue->setArguments([
?? ?//如果過期,把訂單轉發(fā)到以下
?? ?'x-dead-letter-exchange' => $params['exchangeName2'],
?? ?'x-dead-letter-routing-key' => $params['routeKey2'],
?? ?'x-message-ttl' => 10000, //60秒過期,注意:message是單個消息過期,mq也可以設置整個隊列過期時間
]);
$queue->declare();
?
//第2個隊列 ? ?
$queue2 = new AMQPQueue($channel);?
$queue2->setName($params['queueName2']); ??
$queue2->setFlags(AMQP_DURABLE); //隊列持久化?
$queue2->declare();
?
//綁定交換區(qū)與隊列,指定路由鍵 第1個隊列
$queue->bind($params['exchangeName'], $params['routeKey']); //綁定訂單定時隊列
?
//綁定交換區(qū)與隊列,指定路由鍵 第2個隊列
$queue2->bind($params['exchangeName2'], $params['routeKey2']); //綁定過期轉發(fā)訂單隊列
?
echo $exChange->publish('訂單'.date('Y-m-d H:i:s'), $params['routeKey']); //發(fā)送消息,往第1個隊列發(fā)
?
$conn->disconnect(); //關閉連接
?運行下這個程序:
這里有1條消息
過期后轉到這里來了!
---------------------?
作者:一曲微茫度此生?
來源:CSDN?
原文:https://blog.csdn.net/weixin_41782053/article/details/84992609?
版權聲明:本文為博主原創(chuàng)文章,轉載請附上博文鏈接!
總結
以上是生活随笔為你收集整理的linux安装RabbitMQ和amqp扩展(这个安装rabbitmq通过了但是代码测试没有通过)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: -bash : xxx : comman
- 下一篇: 瑞字取名禁忌有哪些?