使用Hazelcast发布和订阅
幾周前,我寫了一篇有關Hazelcast入門的博客,描述了創建分布式地圖,列表和隊列是多么簡單。 當時我提到Hazelcast還能做很多其他事情。 該博客快速瀏覽了Hazelcast的另一個功能:基于Publish / Subscribe模式的廣播消息系統。 這采用通常的格式,即郵件發件人應用通過該格式發布有關特定主題的郵件。 這些消息并不針對任何特定的客戶端,而是可以由對主題感興趣的任何客戶端讀取。
發布和訂閱的明顯場景來自高金融和做市商的世界。 做市商買賣股票之類的金融工具,并通過在通常是電子的市場上宣傳買賣價格來競爭業務。 為了使用Hazelcast實現非常簡單的做市商方案,我們需要三個類: StockPrice bean, MarketMaker和Client 。
以下代碼已添加到我在Github上可用的現有Hazelcast項目中。 無需擔心其他POM依賴項。
public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); } }StockPrice bean具有所有常用的獲取器和設置器,可以在任何給定時間模擬股票的買價和買價(以正常語言進行買賣),并且MarketMaker類使用Hazelcast發布這些bean。
通常,做市商會在一種以上的金融工具中發布價格; 但是,為簡單起見, MarketMaker在此演示中僅發布單個價格。
public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } }像往常一樣,設置Hazelcast相當簡單,上面MarketMaker類中的大多數代碼與Hazelcast無關。 該課程分為兩部分:建筑價格和出版價格。 構造函數采用三個參數,將其存儲起來以備后用。 它還創建一個Hazelcast實例,并通過私有createTopic()方法注冊一個名為"STOCKS"的簡單主題。 如您所料,創建Hazelcast實例并注冊主題需要兩行代碼,如下所示:
ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); }該類的其余部分使用線程來調用MarketMaker的run()方法來運行價格發布機制。 此方法生成隨機出價,為關聯的股票代碼要價,并使用Hazelcast發布。 使用以下單行代碼即可完成發布:
topic.publish(price);MarketMaker類的最后一部分是main()方法,其作用是創建多個MarketMaker實例并使它們運行。
現在,Hazelcast知道了我們不斷變化的股價,接下來要做的就是整理客戶代碼。
public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } }與任何消息傳遞系統一樣,消息發送者代碼必須知道呼叫誰和呼叫什么。 客戶端通過創建Hazelcast實例并在"STOCKS"主題中注冊興趣來實現“調用什么"STOCKS" ,方法與發布者相同,如下所示:
HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this);客戶端實現Hazelcast的MessageListener接口及其單一方法onMessage()實現“呼叫”
@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); }客戶端代碼的最后一部分是其main()方法,該方法創建一個客戶端實例。
最后要做的是運行代碼。 為此,我僅將所有必需的JAR文件放在一個目錄中,只需考慮兩個:hazel cast-3.1.jar和guava-13.0.1.jar。
完成后,我轉到項目的classes目錄:
cd /Users/Roger/git/captaindebug/hazelcast/target/classes…并解雇了發布者
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker……然后是客戶。
java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client 當然,如果您正在使用此粗略且已準備好的技術在計算機上運行此程序,則請記住將其替換
/Users/Roger/tmp/mm以及放置這些JAR文件副本的路徑。
如果您在一個終端中運行MarketMaker發布者,并在其他兩個終端中運行幾個客戶,那么您將得到類似的信息,在這里您可以看到正在發布的價格以及客戶正在接收更新。
關于Hazelcast的一件事要注意的是,“ 集群 ”是指Hazelcast實例的集群,而不是JVM的集群。 在您為每個應用程序請求多個Hazelcast實例之前,這是不明顯的。 當其他客戶端加入集群時,您將看到類似以下內容:
Members [5] { Member [192.168.0.7]:5701 Member [192.168.0.7]:5702 Member [192.168.0.7]:5703 Member [192.168.0.7]:5704 this Member [192.168.0.7]:5705 }在上面的日志中,有兩個偵聽器條目,每個偵聽器條目一個,每個發布者條目三個,在MarketMaker的main()方法中啟動的每個MarketMaker實例一個。
這里要考慮的事情是,是否每個對象實例創建一個Hazelcast實例是一種好習慣(就像我在示例代碼中所做的那樣),還是在代碼中有一個static Hazelcast實例更好。 我不確定該答案是什么,因此,如果有任何Hazelcast專家正在閱讀此書,請告訴我。
就是這樣:Hazelcast可以在發布和訂閱模式下愉快地運行,但是我還沒有介紹Hazelcast的所有功能。 也許以后再說……
- 可以在Github上找到此源代碼: https : //github.com/roghughe/captaindebug/tree/master/hazelcast
翻譯自: https://www.javacodegeeks.com/2014/01/publish-and-subscribe-with-hazelcast.html
總結
以上是生活随笔為你收集整理的使用Hazelcast发布和订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 垃圾收集器准则和提示
- 下一篇: linux可以用来干嘛(linux什么用