RabbiqMQ快速入门
RabbitMQ
官網(wǎng)地址: https://www.rabbitmq.com/
一個(gè)遵循AMQP協(xié)議,開源面向消息的中間件,支持多種編程語言。
Rabbitmq 能做什么?
- 邏輯解耦,異步的消息任務(wù)
- 消息持久化,重啟不影響
- 削峰,大規(guī)模的消息處理
主要的特點(diǎn)
可靠性:持久化,傳輸確認(rèn),發(fā)布確認(rèn) 可擴(kuò)展性:多個(gè)節(jié)點(diǎn)可以組成一個(gè)集群,可動(dòng)態(tài)更改 多語言:支持多數(shù)編程語言 管理界面:有常見的用戶界面,便于管理和監(jiān)控常見的應(yīng)用場(chǎng)景:
并發(fā)請(qǐng)求的壓力高可用設(shè)計(jì)(電商秒殺場(chǎng)景), 異步任務(wù)處理結(jié)果的回調(diào)設(shè)計(jì)(日志訂單異步處理), 系統(tǒng)集成與分布式系統(tǒng)設(shè)計(jì)(各種子系統(tǒng)的消息同步)。工作原理
簡(jiǎn)單介紹生產(chǎn)者和消費(fèi)者會(huì)和服務(wù)器建立tcp鏈接,在tcp鏈接之上會(huì)建立多個(gè)信道channel,通過信道來發(fā)送消息,生產(chǎn)者生產(chǎn)消息后不直接直接發(fā)到隊(duì)列中,而是發(fā)到一個(gè)交換空間:Exchange, Exchange會(huì)根據(jù)Exchange類型和Routing Key來決定發(fā)到哪個(gè)隊(duì)列中,消費(fèi)者在從隊(duì)列中拿到消息具體工作模式
名詞解釋
ExChange :消息交換機(jī),決定消息按照什么規(guī)則路由到那個(gè)對(duì)列中去 Queue :消息載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列 Binding:綁定,把exchange 和 queue按照路由規(guī)則綁定起來 Routing Key: 路由關(guān)鍵字,exchage根據(jù)這關(guān)鍵字來投遞消息 Channel :消息通道,客戶端的每個(gè)連接建立多個(gè)channel Producer :消息生產(chǎn)者,用戶投遞消息的程序 Consumer :消息消費(fèi)者,用于就是接收消息的程序Exchage工作模式
Fanout: 類似廣播,轉(zhuǎn)發(fā)到所有綁定交換機(jī)的Queue Direct: 類似單播,RoutingKey 和 BindingKey完全匹配 Topic : 類似組播,轉(zhuǎn)發(fā)到符合通配符的Queue headers:請(qǐng)求頭與消息頭匹配,才能接收到消息環(huán)境配置
通過docker環(huán)境配置
# /www/rabbitmq目錄可自定義,主要用于目錄掛載 mkdir -p /www/rabbitmq # 創(chuàng)建容器 docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /www/rabbitmq:/var/lib/rabbitmq rabbitmq:management # 查看容器狀態(tài) docker ps | grep rabbit瀏覽器打開登錄rabbitmq, 入口:http://localhost:15672 默認(rèn)用戶名: guest 密碼: guestgolang實(shí)戰(zhàn)
簡(jiǎn)單基本玩法
//下載類庫(kù) go get "github.com/streadway/amqp"前期準(zhǔn)備代碼
//連接信息 const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"//rabbitMQ結(jié)構(gòu)體 type RabbitMQ struct {conn *amqp.Connectionchannel *amqp.Channel//隊(duì)列名稱QueueName string//交換機(jī)名稱Exchange string//bind Key 名稱Key string//連接信息Mqurl string }//創(chuàng)建結(jié)構(gòu)體實(shí)例 func NewRabbitMQ(queueName string,exchange string ,key string) *RabbitMQ {return &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL} }//斷開channel 和 connection func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close() } //錯(cuò)誤處理函數(shù) func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))} }簡(jiǎn)單模式
簡(jiǎn)單模式下 Exchange 和 key是為空的,不需要設(shè)置
//創(chuàng)建簡(jiǎn)單模式下RabbitMQ實(shí)例 func NewRabbitMQSimple(queueName string) *RabbitMQ {//創(chuàng)建RabbitMQ實(shí)例rabbitmq := NewRabbitMQ(queueName,"","")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connect rabb"+"itmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//簡(jiǎn)單模式隊(duì)列生產(chǎn) func (r *RabbitMQ) PublishSimple(message string) {//1.申請(qǐng)隊(duì)列,如果隊(duì)列不存在會(huì)自動(dòng)創(chuàng)建,存在則跳過創(chuàng)建_, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動(dòng)刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//調(diào)用channel 發(fā)送消息到隊(duì)列中r.channel.Publish(r.Exchange,r.QueueName,//如果為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊(duì)列會(huì)把消息返還給發(fā)送者false,//如果為true,當(dāng)exchange發(fā)送消息到隊(duì)列后發(fā)現(xiàn)隊(duì)列上沒有消費(fèi)者,則會(huì)把消息返還給發(fā)送者false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//simple 模式下消費(fèi)者 func (r *RabbitMQ) ConsumeSimple() {//1.申請(qǐng)隊(duì)列,如果隊(duì)列不存在會(huì)自動(dòng)創(chuàng)建,存在則跳過創(chuàng)建q, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動(dòng)刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err :=r.channel.Consume(q.Name, // queue//用來區(qū)分多個(gè)消費(fèi)者"", // consumer//是否自動(dòng)應(yīng)答true, // auto-ack//是否獨(dú)有false, // exclusive//設(shè)置為true,表示 不能將同一個(gè)Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個(gè)Connection中 的消費(fèi)者false, // no-local//列是否阻塞false, // no-waitnil, // args)if err != nil {fmt.Println(err)}forever := make(chan bool)//啟用協(xié)程處理消息go func() {for d := range msgs {//消息邏輯處理,可以自行設(shè)計(jì)邏輯log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}工作模式
一個(gè)消息只能被一個(gè)消費(fèi)者獲取(場(chǎng)景:生產(chǎn)消息大于消費(fèi)消息的時(shí)候),更簡(jiǎn)單模式代碼一樣,只是同事開啟了多個(gè)消費(fèi)端,起到負(fù)載均衡的作用
訂閱模式
該模式下,隊(duì)列為空,key為空;只需設(shè)置交換空間即可;消息被投遞到多個(gè)隊(duì)列中,一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)
//訂閱模式創(chuàng)建RabbitMQ實(shí)例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//創(chuàng)建RabbitMQ實(shí)例rabbitmq := NewRabbitMQ("",exchangeName,"")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//訂閱模式生產(chǎn) func (r *RabbitMQ) PublishPub(message string) {//1.嘗試創(chuàng)建交換機(jī)err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發(fā)送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//訂閱模式消費(fèi)端代碼 func (r *RabbitMQ) RecieveSub() {//1.試探性創(chuàng)建交換機(jī)err := r.channel.ExchangeDeclare(r.Exchange,//交換機(jī)類型"fanout",true,false,//YES表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創(chuàng)建隊(duì)列,這里注意隊(duì)列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機(jī)生產(chǎn)隊(duì)列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊(duì)列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,這里的key要為空"",r.Exchange,false,nil)//消費(fèi)消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請(qǐng)按 CTRL+C\n")<-forever }路由模式
在路由模式下,一個(gè)消息可以被多個(gè)消費(fèi)者獲取,該模式生產(chǎn)端可以指定消費(fèi)端;交換機(jī)的類型需要設(shè)置為direct,并且需要設(shè)置bind key。
/路由模式 //創(chuàng)建RabbitMQ實(shí)例 func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {//創(chuàng)建RabbitMQ實(shí)例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//路由模式發(fā)送消息 func (r *RabbitMQ) PublishRouting(message string ) {//1.嘗試創(chuàng)建交換機(jī)err := r.channel.ExchangeDeclare(r.Exchange,//要改成direct"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發(fā)送消息err = r.channel.Publish(r.Exchange,//要設(shè)置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) } //路由模式接受消息 func (r *RabbitMQ) RecieveRouting() {//1.試探性創(chuàng)建交換機(jī)err := r.channel.ExchangeDeclare(r.Exchange,//交換機(jī)類型"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創(chuàng)建隊(duì)列,這里注意隊(duì)列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機(jī)生產(chǎn)隊(duì)列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊(duì)列到 exchange 中err = r.channel.QueueBind(q.Name,//需要綁定keyr.Key,r.Exchange,false,nil)//消費(fèi)消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請(qǐng)按 CTRL+C\n")<-forever }Topic模式,話題模式
一個(gè)消息可以被多個(gè)消費(fèi)者獲取,消息的目標(biāo)queue可用BindingKey以通配符,的方式指定。
交換的類型設(shè)置為 topic,在接受端通過匹配規(guī)則匹配(例如:hello.*.world)
參考: https://www.cnblogs.com/luotianshuai/p/7469365.html#4199652
轉(zhuǎn)載于:https://www.cnblogs.com/nirao/p/11176137.html
總結(jié)
以上是生活随笔為你收集整理的RabbiqMQ快速入门的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL中字符串函数详细介绍
- 下一篇: 正史上汉昭帝刘弗陵是个怎样的人(汉昭帝生