metaq发送和接收消息demo
生活随笔
收集整理的這篇文章主要介紹了
metaq发送和接收消息demo
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
一、maven依賴
<dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client</artifactId><version>1.4.6.2</version> </dependency><dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client-extension</artifactId><version>1.4.6.2</version> </dependency>二、發送者
import?com.taobao.metamorphosis.Message; import?com.taobao.metamorphosis.client.MessageSessionFactory; import?com.taobao.metamorphosis.client.MetaClientConfig; import?com.taobao.metamorphosis.client.MetaMessageSessionFactory; import?com.taobao.metamorphosis.client.producer.MessageProducer; import?com.taobao.metamorphosis.client.producer.SendResult; import?com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import?java.io.BufferedReader; import?java.io.InputStreamReader;/***?Created?by?lc-t123?on?2016/4/14.*/ public?class?Producer?{public?static?void?main(String[]?args)?throws?Exception?{final?MetaClientConfig?metaClientConfig?=?new?MetaClientConfig();final?ZKConfig?zkConfig?=?new?ZKConfig();//設置zookeeper地址zkConfig.zkConnect?=?"192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);//?New?session?factory,強烈建議使用單例MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(metaClientConfig);/**??create?producer,強烈建議使用單例*??消息生產者的接口是MessageProducer,你可以通過它來發送消息*/MessageProducer?producer?=?sessionFactory.createProducer();//?publish?topicfinal?String?topic?=?"test";/**?這一步在發送消息前是必須的,你必須發布你將要發送消息的topic*?這是為了讓會話工廠幫你去查找接收這些topic的meta服務器地址并初始化連接*?這個步驟針對每個topic只需要做一次,多次調用無影響*/producer.publish(topic);BufferedReader?reader?=?new?BufferedReader(new?InputStreamReader(System.in));String?line?=?null;while?((line?=?reader.readLine())?!=?null){/**?send?message*?在Meta里,每個消息對象都是Message類的實例,Message表示一個消息對象,它包含這么幾個屬性:*?1)?id:?Long型的消息id,消息的唯一id,系統自動產生,用戶無法設置,在發送成功后由服務器返回,發送失敗則為0。*?2)?topic:?消息的主題,訂閱者訂閱該主題即可接收發送到該主題下的消息,生產者通過指定發布的topic查找到需要連接的服務器地址,必須。*?3)?data:?消息的有效載荷,二進制數據,也就是消息內容,meta永遠不會修改消息內容,你發送出去是什么樣子,接收到就是什么樣子。消息內容通常限制在1M以內,我的建議是最好不要發送超過上百K的消息,必須。數據是否壓縮也完全取決于用戶。*?4)?attribute:?消息屬性,一個字符串,可選。發送者可設置消息屬性來讓消費者過濾。*/SendResult?sendResult?=?producer.sendMessage(new?Message(topic,?line.getBytes()));//?check?resultif?(!sendResult.isSuccess()){System.err.println("Send?message?failed,error?message:"?+?sendResult.getErrorMessage());}else?{System.out.println("Send?message?successfully,sent?to?"?+?sendResult.getPartition());}}} }三、接收者
import?com.taobao.metamorphosis.Message; import?com.taobao.metamorphosis.client.MessageSessionFactory; import?com.taobao.metamorphosis.client.MetaClientConfig; import?com.taobao.metamorphosis.client.MetaMessageSessionFactory; import?com.taobao.metamorphosis.client.consumer.ConsumerConfig; import?com.taobao.metamorphosis.client.consumer.MessageConsumer; import?com.taobao.metamorphosis.client.consumer.MessageListener; import?com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import?java.util.concurrent.Executor;public?class?AsyncConsumer?{public?static?void?main(String[]?args)?throws?Exception?{final?MetaClientConfig?metaClientConfig?=?new?MetaClientConfig();final?ZKConfig?zkConfig?=?new?ZKConfig();//設置zookeeper地址zkConfig.zkConnect?=?"192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);//?New?session?factory,強烈建議使用單例MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(metaClientConfig);//?subscribed?topicfinal?String?topic?=?"test";//?consumer?groupfinal?String?group?=?"meta-example";/**?create?consumer,強烈建議使用單例*?通過createConsumer方法來創建MessageConsumer,注意到我們傳入一個ConsumerConfig參數,*?這是消費者的配置對象。每個消息者都必須有一個ConsumerConfig配置對象,*?我們這里只設置了group屬性,這是消費者的分組名稱。*?Meta的Producer、Consumer和Broker都可以為集群。*?消費者可以組成一個集群共同消費同一個topic,發往這個topic的消息將按照一定的負載均衡規則發送給集群里的一臺機器。*?同一個消費者集群必須擁有同一個分組名稱,也就是同一個group。我們這里將分組名稱設置為meta-example*/MessageConsumer?consumer?=?sessionFactory.createConsumer(new?ConsumerConfig(group));/**?subscribe?topic*?訂閱消息通過subscribe方法,這個方法接受三個參數*?1)?topic,訂閱的主題*?2)?maxSize,因為meta是一個消費者主動拉取的模型,這個參數規定每次拉取的最大數據量,單位為字節,這里設置為1M,默認最大為1M。*?3)?MessageListener,消息監聽器,負責消息消息。*/consumer.subscribe(topic,?1024?*?1024,?new?MessageListener()?{public?void?recieveMessages(Message?message)?{System.out.println("Receive?message?"?+?new?String(message.getData()));}public?Executor?getExecutor()?{//?Thread?pool?to?process?messages,maybe?null.return?null;}});//?complete?subscribeconsumer.completeSubscribe();} }metaq-server安裝參考官方文檔
可以通過http://192.168.1.70:8120/?訪問web界面
轉載于:https://my.oschina.net/chaun/blog/659479
總結
以上是生活随笔為你收集整理的metaq发送和接收消息demo的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux shell script 的
- 下一篇: 交错数组