RocketMQ單機支持1萬以上的持久化隊列,前提是足夠的內存、硬盤空間,過期數據數據刪除(RocketMQ中的消息隊列長度不是無限的,只是足夠大的內存+數據定時刪除)
RocketMQ版本:3.1.4
?
一,部署NameServer:
1,安裝JDK并設置JAVA_HOME環境變量(啟動腳本依賴JAVA_HOME環境變量)
2,cd /alibaba-rocketmq/bin進入RocketMQ的bin目錄
2,調用nohup sh mqnamesrv &啟動NameServer
報錯如下:
[plain]?view plain
?copy ? :?command?not?found?? :?command?not?found?? mqnamesrv:?line?35:?syntax?error:?unexpected?end?of?file??
在bin目錄下調用dos2unix *將所有文件轉化為unix格式,再次調用nohup sh mqnamesrv &
報錯如下:
[plain]?view plain
?copy ? /home/hadoop/alibaba-rocketmq?? Invalid?initial?heap?size:?-Xms4g?? The?specified?size?exceeds?the?maximum?representable?size.?? Could?not?create?the?Java?virtual?machine.??
由于安裝的JDK版本為32位,4g超過了JDK所支持的最大內存,不過32位JDK也無法發揮出RocketMQ的優勢,換成64位JDK
這次啟動成功
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqnamesrv?&?? [1]?17676?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? The?Name?Server?boot?success.?? [hadoop@hadoop?bin]$?jps?? 17682?NamesrvStartup?? 17800?Jps??
NameServer監聽端口:9876
[java]?view plain
?copy ? nettyServerConfig.setListenPort(9876);??
如果服務器內存不夠,可以修改runserver.sh腳本(mqnamesrv文件中通過runserver.sh腳本調用Name Server的主函數com.alibaba.rocketmq.namesrv.NamesrvStartup啟動Name Server)中的JAVA_OPT_1參數
[plain]?view plain
?copy ? JAVA_OPT_1="-server?-Xms4g?-Xmx4g?-Xmn2g?-XX:PermSize=128m?-XX:MaxPermSize=320m"??
二,部署Broker:消息中轉角色,負責存儲消息,轉發消息
Broker集群有多種配置方式:
1,單Master
??? 優點:除了配置簡單沒什么優點
??? 缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用
2,多Master
??? 優點:配置簡單,性能最高
??? 缺點:可能會有少量消息丟失(配置相關),單臺機器重啟或宕機期間,該機器下未被消費的消息在機器恢復前不可訂閱,影響消息實時性
3,多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短暫消息延遲,毫秒級
??? 優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預
??? 缺點:Master宕機或磁盤損壞時會有少量消息丟失
4,多Master多Slave,每個Master配一個Slave,有多對Master-Slave,集群采用同步雙寫方式,主備都寫成功,向應用返回成功
??? 優點:服務可用性與數據可用性非常高
??? 缺點:性能比異步集群略低,當前版本主宕備不能自動切換為主
Master和Slave的配置文件參考conf目錄下的配置文件
Master與Slave通過指定相同的brokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大于0的數
一個Master下面可以掛載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分
部署一Master一Slave,集群采用異步復制方式:
Master:
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqbroker?-n?"192.168.58.163:9876"?-c?../conf/2m-2s-async/broker-a.properties?&?? [2]?25493?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? load?config?properties?file?OK,?../conf/2m-2s-async/broker-a.properties?? The?broker[broker-a,?192.168.58.163:10911]?boot?success.?and?name?server?is?192.168.58.163:9876?? [hadoop@hadoop?bin]$?jps?? 25500?BrokerStartup?? 25545?Jps?? 17682?NamesrvStartup??
Slave:
[plain]?view plain
?copy ? [hadoop@hadoop?bin]$?nohup?sh?mqbroker?-n?"192.168.58.163:9876"?-c?../conf/2m-2s-async/broker-a-s.properties?&?? [1]?1974?? [hadoop@hadoop?bin]$?nohup:?appending?output?to?“nohup.out”?? ?? [hadoop@hadoop?bin]$?cat?nohup.out??? load?config?properties?file?OK,?../conf/2m-2s-async/broker-a-s.properties?? The?broker[broker-a,?192.168.58.164:10911]?boot?success.?and?name?server?is?192.168.58.163:9876?? [hadoop@hadoop?bin]$?jps?? 2071?Jps?? 1981?BrokerStartup??
Broker監聽端口:10911
[java]?view plain
?copy ? nettyServerConfig.setListenPort(10911);??
如果服務器內存不夠,可以修改runbroker.sh腳本(mqbroker文件中通過runbroker.sh腳本調用Broker的主函數com.alibaba.rocketmq.broker.BrokerStartup啟動Broker)的JAVA_OPT_1參數
[plain]?view plain
?copy ? JAVA_OPT_1="-server?-Xms4g?-Xmx4g?-Xmn2g?-XX:PermSize=128m?-XX:MaxPermSize=320m"??
三,Producer
必須要設置Name Server地址
[java]?view plain
?copy ? package?com.sean;?? ?? import?com.alibaba.rocketmq.client.producer.DefaultMQProducer;?? import?com.alibaba.rocketmq.client.producer.SendResult;?? import?com.alibaba.rocketmq.common.message.Message;?? ?? public?class?Producer?{?? ????public?static?void?main(String[]?args){?? ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("Producer");?? ????????producer.setNamesrvAddr("192.168.58.163:9876");??? ????????try?{?? ????????????producer.start();?? ?????????????? ????????????Message?msg?=?new?Message("PushTopic",??? ????????????????????"push",??? ????????????????????"1",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????SendResult?result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ?????????????? ????????????msg?=?new?Message("PushTopic",??? ????????????????????"push",??? ????????????????????"2",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ?????????????? ????????????msg?=?new?Message("PullTopic",??? ????????????????????"pull",??? ????????????????????"1",??? ????????????????????"Just?for?test.".getBytes());?? ?????????????? ????????????result?=?producer.send(msg);?? ????????????System.out.println("id:"?+?result.getMsgId()?+?? ????????????????????"?result:"?+?result.getSendStatus());?? ????????}?catch?(Exception?e)?{?? ????????????e.printStackTrace();?? ????????}finally{?? ????????????producer.shutdown();?? ????????}?? ????}?? }??
四,Consumer
必須要設置Name Server地址
[java]?view plain
?copy ? package?com.sean;?? ?? import?java.util.List;?? ?? import?com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;?? import?com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;?? import?com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;?? import?com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;?? import?com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;?? import?com.alibaba.rocketmq.common.message.Message;?? import?com.alibaba.rocketmq.common.message.MessageExt;?? ?? public?class?Consumer?{?? ????public?static?void?main(String[]?args){?? ????????DefaultMQPushConsumer?consumer?=??? ????????????????new?DefaultMQPushConsumer("PushConsumer");?? ????????consumer.setNamesrvAddr("192.168.58.163:9876");??? ????????try?{?? ?????????????? ????????????consumer.subscribe("PushTopic",?"push");?? ?????????????? ????????????consumer.setConsumeFromWhere(?? ????????????????????ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);?? ????????????consumer.registerMessageListener(?? ????????????????new?MessageListenerConcurrently()?{?? ????????????????????public?ConsumeConcurrentlyStatus?consumeMessage(?? ????????????????????????????List<MessageExt>?list,?? ????????????????????????????ConsumeConcurrentlyContext?Context)?{?? ????????????????????????Message?msg?=?list.get(0);?? ????????????????????????System.out.println(msg.toString());?? ????????????????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;?? ????????????????????}?? ????????????????}?? ????????????);?? ????????????consumer.start();?? ????????}?catch?(Exception?e)?{?? ????????????e.printStackTrace();?? ????????}?? ????}?? }??
先運行Consumer,然后運行Producer
Producer運行結果:
[plain]?view plain
?copy ? id:C0A83AA300002A9F00000000000009EA?result:SEND_OK?? id:C0A83AA300002A9F0000000000000A77?result:SEND_OK?? id:C0A83AA300002A9F0000000000000B04?result:SEND_OK??
Consumer運行結果:
MessageExt [queueId=1, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668792, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527374, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F0000000000000A77, commitLogOffset=2679, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=2, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]
MessageExt [queueId=0, storeSize=141, queueOffset=6, sysFlag=0, bornTimestamp=1403765668698, bornHost=/192.168.31.130:60985, storeTimestamp=1403765527356, storeHost=/192.168.58.163:10911, msgId=C0A83AA300002A9F00000000000009EA, commitLogOffset=2538, bodyCRC=753746584, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={TAGS=push, KEYS=1, WAIT=true, MAX_OFFSET=7, MIN_OFFSET=0}, body=14]]
總結
以上是生活随笔為你收集整理的阿里RocketMQ Quick Start的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。