rabbit MQ 的环境及命令使用(一)
RabbitMQ依賴erlang,所以先安裝erlang,然后再安裝RabbitMQ;
先安裝erlang,雙擊erlang的安裝文件即可,然后配置環境變量:
ERLANG_HOME=D:\Program Files\erl7.1
追加到path=%ERLANG_HOME%\bin;
驗證erlang是否安裝成功, 打開cmd命令窗口,進入erlang的bin路徑,輸入erl命令,如果出現如下提示,則說明erlang安裝成功:
D:\Program Files\erl7.1\bin>erl
Eshell V7.1 (abort with ^G)
再安裝RabbitMQ,雙擊安裝文件即可,安裝完畢后, 設置環境變量:
RABBITMQ_SERVER=D:\Program Files\RabbitMQ Server\rabbitmq_server-3.5.6
追加到path=%RABBITMQ_SERVER%\sbin;
驗證RabbitMQ是否安裝成功,在CMD命令窗口輸入:
C:\Windows\system32>rabbitmq-service
?
?
安裝好后,我們進入rabbitMQ安裝目錄下的sbin目錄,在目錄下shift+右鍵打開命令行? ?【必須要進行這一步,不然中途會發現使用不了,連接不上.使用版本rabbitmq-server-3.7.9.】
?
使用rabbitmq-plugins.bat enable rabbitmq_management開啟網頁管理界面,然后重啟rabbitMQ
遠程連接需要 添加用戶,可在網頁admin頁面添加,,添加之后一定要給權限,不給權限的話還是不行,,詳細見【客戶端-開始任務完成】
?
#添加用戶#rabbitmqctl add_vhost vh #rabbitmqctl add_user test test #rabbitmqctl set_user_tags test management #rabbitmqctl set_permissions -p vh test ".*" ".*" ".*" def start(self):disconnected = Truewhile disconnected:try:disconnected = Falseself.channel.start_consuming() # blocking callexcept pika.exceptions.ConnectionClosed: # when connection is lost, e.g. rabbitmq not runninglogging.error("Lost connection to rabbitmq service on manager")disconnected = Truetime.sleep(10) # reconnect timerlogging.info("Trying to reconnect...")self.connect()self.clear_message_queue() #could this make problems if the manager replies too fast?# 參照文檔 https://blog.csdn.net/csdn_am/article/details/79894662 View Code---報錯處理【長時間未調用斷開處理】---這里使用的是死循環處理,
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 1:安裝RabbitMQ需要先安裝Erlang語言開發包。下載地址 http://www.erlang.org/download.html 在win7下安裝Erlang最好默認安裝。設置環境變量ERLANG_HOME= C:\Program Files\erlx.x.x 添加到PATH %ERLANG_HOME%\bin;2:安裝RabbitMQ 下載地址 http://www.rabbitmq.com/download.html 安裝教程:http://www.rabbitmq.com/install-windows.html設置環境變量RABBITMQ_SERVER=C:\Program Files\rabbitmq_server-x.x.x。添加到PATH %RABBITMQ_SERVER%\sbin;找到環境變量中的path變量:雙擊path,在其后面增加:;%RABBITMQ_SERVER%\sbin (注意前面的分號),然后確定即可現在打開windows命令行(“cmd”),輸入rabbitmq-service如果出現如下所示提示,即表示環境變量配置成功。3:進入%RABBITMQ_SERVER%\sbin 目錄以管理員身份運行 rabbitmq-plugins.bat rabbitmq-plugins.bat enable rabbitmq_management安裝完成之后以管理員身份啟動 rabbitmq-service.bat rabbitmq-service.bat stop rabbitmq-service.bat install rabbitmq-service.bat start4:瀏覽器訪問localhost:55672 默認賬號:guest 密碼:guest5. Rabbit還自帶監控功能. cmd進到sbin目錄,鍵入rabbitmq-plugins enable rabbitmq_management啟用監控管理,然后重啟Rabbitmq服務器。 打開網址http://localhost:55672,用戶名和密碼都是guest。 6. 現在打開瀏覽器,輸入:http://localhost:15672/ ,如果出現以下頁面,則表示服務器配置成功。 默認用戶名為guest,密碼:guest如果沒有出現以上頁面,嘗試在windows命令行中輸入(以管理員方式運行): rabbitmq-plugins enable rabbitmq_management 然后運行下面的命令來安裝: rabbitmq-service stop rabbitmq-service install rabbitmq-service start
?
rabbitMQ 生產者工作模式import pika1 創建socketconnection = pika.BlockingConnextion(pika.ConnectionParameters('localhost'))2 聲明一個管道channel = connection.channel()3 聲明queuechannel.queue_declare(queue='hello',durable=True)durable:聲明是持久化的隊列,默認是隊列存在內在中的,服務崩了之后,是不會恢復的4 發消息channel.basic_publish(exchange='',routing_key='hello', # 隊列的名字body='hello word!' #消息內容properties=pika.BasicProperties(delivery_mode=2,))properties #消息持久化,主要語句 delivery_mode5 發送完畢,關閉隊列connection.close()消費者工作模式1 創建socketconnection = pika.BlockingConnextion(pika.ConnectionParameters('localhost'))2 聲明一個管道channel = connection.channel()3 聲明queuechannel.queue_declare(queue='hello')4 消費消息def callback(ch,method,properties,body):print(' x Received %r'%body)# ch 就是管道的內存對象地址# method 就是包含發送信息的列表,# properties # 消息處理完,需要手動跟服務端確認# ch.basic_ack(delivery_tag=method.delivery_tag) #加上下面這個相當于負載勻衡的權重值,處理慢的加上這個channel.basic_qos(prefetch_count=1)channel.basic_consume(callback, #如果收到消息調用處理,回調函數queue='hello',#隊列的名字 no_ack=True #消息處理完了,表示不確認,一般不加,處理完了,由客戶端來向服務端確認(a_1.basic_ack(delivery_tag=a_2.delivery_tag) ) ) 5 啟動就一直運行,沒有消息就阻塞 channel.start_consuming() exchange: 轉發器 fanout: 所有bind到此exchange的queue都可以接收消息 direct:通過routingkey和exchange決定的那個唯一的qu eue可以接收消息 topic:所有符合routingKey(此時可以是一個表達式)的r outingKey所bind的queue可以接收消息 headers:通過headers來決定把消息發給哪些queue import pika conn = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channll = conn.channel() channll.queue_declare('url',durable=True) for i in range(1,10):channll.publish(exchange='',routing_key='url',body='https://wh.lianjia.com/ershoufang/pg%s/'%i) conn.close() View Code--lianjia_ser----服務端-生成10頁__鏈家 # python 3.7 import pika import re import requests,pymysql credential_s = pika.PlainCredentials('root','liu') conn = pika.BlockingConnection(pika.ConnectionParameters(host='176.215.44.242',credentials=credential_s)) channel = conn.channel() channel.queue_declare('url',durable=True)class Lianjia(object):def __init__(self,static_url=''):self.headers = {'Host':'wh.lianjia.com','Referer': 'https://wh.lianjia.com/ershoufang/','Upgrade-Insecure-Requests': '1','User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36'}if not static_url:self.static_url='https://wh.lianjia.com/ershoufang/pg%s/'else:self.static_url = static_urlself.patten='data-sl="">.*?data-sl="">(.*?)</a>.*?data-log_index=".*?" data-el=".*?">(.*?)</a>(.*?)</div>.*?</span>.*?<a href=".*?" target="_blank">(.*?)</a></div>.*?<.*?</span>.*?<span>(.*?)</span>'self.conn = pymysql.connect(host='176.215.44.242', user='root', password="liu",database='test', port=3306, charset='utf8')self.cur = self.conn.cursor()def get_page(self):res = requests.get(url=self.static_url,headers=self.headers,proxies={'http':'http://58.53.128.83:3128'})if res.status_code==200:self.write_res(res.text)def write_res(self,text):for i in re.findall(self.patten,text):a_1 = i[0] # 標題a_2 = i[1] # 小區名a_3 = i[2] # 詳情a_4 = i[3] # 位置a_5 =i[4] #價格sql = 'insert into lianjia(b_t,x_q_1,x_q_2,w_z,j_g) values(%s,%s,%s,%s,%s)'self.cur.execute(sql, [a_1, a_2, a_3, a_4, a_5])self.conn.commit()def callback(a_1,a_2,a_3,body):print('接收到body',body)l = Lianjia(static_url=body)l.get_page()a_1.basic_ack(delivery_tag=a_2.delivery_tag)print(body,'完成,')channel.basic_consume(callback,queue='url') channel.start_consuming() View Code--lianjia_cli----客戶端-開始任務完成就開始下一條
?
fanout:廣播模式(其他地方一樣)生產者需要更改的地方:1 channel.exchange_declare(exchange='logs',type='fanout')#定義轉發器的名字,在消費端上需要bind 2 channel.basic_publish(exchange='logs',routing_key='',body='消息隨便來')消費者需要更改的地方:1 channel.exchange_declare(exchange='logs',type='fanout')2 result = channel.queue_declare(exclusive=True)#排他,唯一的,不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除3 queue_name = result.method.queue#然后使用queue_name來進行操作,然后需要綁定轉發器4 channel.queue_bind(exchange='logs',queue=queue_name)#綁定 direct :生產者:1 channel.exchange_declare(exchange='direct_logs',type='direct')2 severity = 'info'|'warning'|'error' #級別3 channel.basic_publish(exchange='logs',routing_key=severity,body='消息隨便來')消費者:1 severitys = ['info','warning','error']2 result = channel.queue_declare(exclusive=True)3 queue_name = result.method.queue4 for severity in severitys:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)#循環列表去綁定 topic: '#'是所有都收生產者:跟direct基本一樣,1 channel.exchange_declare(exchange='direct_logs',type='topic')消費者:跟direct基本一樣,1 需要更改exchange='topic'
先暫時寫到這里吧,下次更新其他模式
轉載于:https://www.cnblogs.com/Skyda/p/10018332.html
總結
以上是生活随笔為你收集整理的rabbit MQ 的环境及命令使用(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux(CentOS)挂载NTFS格
- 下一篇: VB与Java颜色值的转换