hazelcast 使用_使用Hazelcast发布和订阅
hazelcast 使用
幾周前,我寫了一篇有關(guān)Hazelcast入門的博客,描述了創(chuàng)建分布式地圖,列表和隊列是多么簡單。 當(dāng)時我提到,Hazelcast還做很多其他事情。 該博客快速介紹了Hazelcast的另一個功能:基于Publish / Subscribe模式的廣播消息系統(tǒng)。 這采用通常的格式,即郵件發(fā)件人應(yīng)用通過該格式發(fā)布有關(guān)特定主題的郵件。 消息不針對任何特定的客戶端,但是可以由對主題感興趣的任何客戶端讀取。
發(fā)布和訂閱的明顯場景來自高金融和做市商的世界。 做市商買賣股票之類的金融工具,并通過在通常是電子市場中的買賣價格廣告來競爭業(yè)務(wù)。 為了使用Hazelcast實現(xiàn)非常簡單的做市商方案,我們需要三個類: StockPrice bean, MarketMaker和Client 。
以下代碼已添加到我在Github上可用的現(xiàn)有Hazelcast項目中。 無需擔(dān)心其他POM依賴項。
public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); } }StockPrice bean具有所有常用的獲取器和設(shè)置器,可以在任何給定時間模擬股票的買價和買價(以正常語言進行買賣),并且MarketMaker類使用Hazelcast發(fā)布這些bean。
通常,做市商會在一種以上的金融工具中發(fā)布價格; 但是,為簡單起見, MarketMaker在此演示中僅發(fā)布單個價格。
public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } }像往常一樣,設(shè)置Hazelcast相當(dāng)簡單,上面MarketMaker類中的大多數(shù)代碼與Hazelcast無關(guān)。 該課程分為兩部分:建筑價格和出版價格。 構(gòu)造函數(shù)接受三個參數(shù),將其存儲起來以備后用。 它還創(chuàng)建一個Hazelcast實例,并通過私有createTopic()方法注冊一個名為"STOCKS"的簡單主題。 如您所料,創(chuàng)建Hazelcast實例并注冊主題需要兩行代碼,如下所示:
ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); }該類的其余部分使用線程來調(diào)用MarketMaker的run()方法來運行價格發(fā)布機制。 此方法生成隨機出價,為關(guān)聯(lián)的股票代碼要價,并使用Hazelcast發(fā)布。 使用以下單行代碼即可完成發(fā)布:
topic.publish(price);MarketMaker類的最后一部分是main()方法,其作用是創(chuàng)建多個MarketMaker實例并使它們運行。
既然Hazelcast知道了我們不斷變化的股票價格,接下來要做的就是整理客戶代碼。
public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } }與任何消息傳遞系統(tǒng)一樣,消息發(fā)送者代碼必須知道呼叫誰和呼叫什么。 客戶端通過創(chuàng)建Hazelcast實例并在"STOCKS"主題中注冊興趣來實現(xiàn)“調(diào)用什么"STOCKS" ,方法與發(fā)布者相同,如下所示:
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this);通過客戶端實現(xiàn)Hazelcast的MessageListener接口及其單一方法onMessage()來實現(xiàn)“呼叫”
@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); }客戶端代碼的最后一部分是其main()方法,該方法創(chuàng)建一個客戶端實例。
最后要做的是運行代碼。 為此,我僅將所有必需的JAR文件放在一個目錄中,只需考慮兩個:hazel cast-3.1.jar和guava-13.0.1.jar。
完成后,我轉(zhuǎn)到項目的classes目錄:
cd /Users/Roger/git/captaindebug/hazelcast/target/classes…并解雇了發(fā)布者
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker……然后是客戶。
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client 當(dāng)然,如果您正在使用此粗略且準(zhǔn)備就緒的技術(shù)在計算機上運行此程序,則請記住將其替換
/Users/Roger/tmp/mm ,以及放置這些JAR文件副本的路徑。
如果您在一個終端中運行MarketMaker發(fā)布者,并在其他兩個終端中運行幾個客戶,那么您將得到類似的信息,在這里您可以看到正在發(fā)布的價格以及客戶正在接收更新。
關(guān)于Hazelcast的一件事要注意的是,“ 集群 ”是指Hazelcast實例的集群,而不是JVM的集群。 除非您為每個應(yīng)用程序請求多個Hazelcast實例,否則這是不明顯的。 當(dāng)其他客戶端加入群集時,您將看到類似以下內(nèi)容:
Members [5] { Member [192.168.0.7]:5701 Member [192.168.0.7]:5702 Member [192.168.0.7]:5703 Member [192.168.0.7]:5704 this Member [192.168.0.7]:5705 }在上面的日志中,有兩個偵聽器條目,每個偵聽器條目一個,每個發(fā)布者條目三個,在MarketMaker的main()方法中啟動的每個MarketMaker實例一個。
這里要考慮的事情是,是否是每個對象實例創(chuàng)建一個Hazelcast實例的好習(xí)慣(就像我在示例代碼中所做的那樣),還是在代碼中擁有一個static Hazelcast實例更好。 我不確定該答案是什么,所以如果有任何Hazelcast專家正在閱讀此書,請告訴我。
就是這樣:Hazelcast可以在發(fā)布和訂閱模式下愉快地運行,但是我還沒有介紹Hazelcast的所有功能。 也許以后再說……
- 可以在Github上獲得此源代碼: https : //github.com/roghughe/captaindebug/tree/master/hazelcast
翻譯自: https://www.javacodegeeks.com/2014/01/publish-and-subscribe-with-hazelcast.html
hazelcast 使用
總結(jié)
以上是生活随笔為你收集整理的hazelcast 使用_使用Hazelcast发布和订阅的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 雅马哈功放dsp怎么设置(雅马哈功放ds
- 下一篇: 笔记本电脑capslk(笔记本电脑cap