javascript
SpringBoot整合分布式消息平台Pulsar
作為優秀的消息流平臺,Pulsar 的使用越來越多,這篇文章講解 Pulsar 的 Java 客戶端。
部署 Pulsar
Pulsar 的部署方式主要有 3 種,本地安裝二進制文件、docker 部署、在 Kubernetes 上部署。
本文采用 docker 部署一個單節點的 Pulsar 集群。實驗環境是 2 核 CPU 和 4G 內存。
部署命令如下:
docker?run?-it?-p?6650:6650??-p?8080:8080?--mount?source=pulsardata,target=/pulsar/data?--mount?source=pulsarconf,target=/pulsar/conf?apachepulsar/pulsar:2.9.1?bin/pulsar?standalone安裝過程可能會出現下面的錯誤:
unknown?flag:?--mount See?'docker?run?--help'.這是因為 docker 版本低,不支持 mount 參數,把 docker 版本升級到 17.06 以上就可以了。
部署過程中可能會因為網絡的原因失敗,多試幾次就可以成功了。如果看到下面的日志,就說明啟動成功了。
2022-01-08T22:27:58,726+0000?[main]?INFO??org.apache.pulsar.broker.PulsarService?-?messaging?service?is?ready,?bootstrap?service?port?=?8080,?broker?url=?pulsar://localhost:6650,?cluster=standalone本地單節點集群啟動后,會創建一個 namespace,名字叫 public/default
Pulsar 客戶端
目前 Pulsar 支持多種語言的客戶端,包括:
Java 客戶端
Go 客戶端
Python 客戶端
C++ 客戶端
Node.js 客戶端
WebSocket 客戶端
C# 客戶端
SpringBoot 配置
使用 SpringBoot 整合 Pulsar 客戶端,首先引入 Pulsar 客戶端依賴,代碼如下:
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version> </dependency>然后在 properties 文件中添加配置:
# Pulsar 地址 pulsar.url=pulsar://192.168.59.155:6650 # topic pulsar.topic=testTopic # consumer group pulsar.subscription=topicGroup創建 Client
創建客戶端非常簡單,代碼如下:
client?=?PulsarClient.builder().serviceUrl(url).build();上面的 url 就是 properties 文件中定義的 pulsar.url 。
創建 Client 時,即使集群沒有啟成功,程序也不會報錯,因為這時還沒有真正地去連接集群。
創建 Producer
producer?=?client.newProducer().topic(topic).compressionType(CompressionType.LZ4).sendTimeout(0,?TimeUnit.SECONDS).enableBatching(true).batchingMaxPublishDelay(10,?TimeUnit.MILLISECONDS).batchingMaxMessages(1000).maxPendingMessages(1000).blockIfQueueFull(true).roundRobinRouterBatchingPartitionSwitchFrequency(10).batcherBuilder(BatcherBuilder.DEFAULT).create();創建 Producer,會真正的連接集群,這時如果集群有問題,就會報連接錯誤。
下面解釋一下創建 Producer 的參數:
topic:Producer 要寫入的 topic。
compressionType:壓縮策略,目前支持 4 種策略 (NONE、LZ4、ZLIB、ZSTD),從 Pulsar2.3 開始,只有 Consumer 的版本在 2.3 以上,這個策略才會生效。
sendTimeout:超時時間,如果 Producer 在超時時間為收到 ACK,會進行重新發送。
enableBatching:是否開啟消息批量處理,這里默認 true,這個參數只有在異步發送 (sendAsync) 時才能生效,選擇同步發送會失效。
batchingMaxPublishDelay:批量發送消息的時間段,這里定義的是 10ms,需要注意的是,設置了批量時間,就不會受消息數量的影響。批量發送會把要發送的批量消息放在一個網絡包里發送出去,減少網絡 IO 次數,大大提高網卡的發送效率。
batchingMaxMessages:批量發送消息的最大數量。
maxPendingMessages:等待從 broker 接收 ACK 的消息隊列最大長度。如果這個隊列滿了,producer 所有的 sendAsync 和 send 都會失敗,除非設置了 blockIfQueueFull 值是 true。
blockIfQueueFull:Producer 發送消息時會把消息先放入本地 Queue 緩存,如果緩存滿了,就會阻塞消息發送。
roundRobinRouterBatchingPartition-SwitchFrequency:如果發送消息時沒有指定 key,那默認采用 round robin 的方式發送消息,使用 round robin 的方式,切換 partition 的周期是 (frequency * batchingMaxPublishDelay)。
創建 Consumer
Pulsar 的消費模型如下圖:
從圖中可以看到,Consumer 要綁定一個 subscription 才能進行消費。
consumer?=?client.newConsumer().topic(topic).subscriptionName(subscription).subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).negativeAckRedeliveryDelay(60,?TimeUnit.SECONDS).receiverQueueSize(1000).subscribe();下面解釋一下創建 Consumer 的參數:
topic:Consumer 要訂閱的 topic。
subscriptionName:consumer 要關聯的 subscription 名字。
subscriptionType:訂閱類型,Pulsar 支持四種類型訂閱:
Exclusive:獨占模式,同一個 Topic 只能有一個消費者,如果多個消費者,就會出錯。
Failover:災備模式,同一個 Topic 可以有多個消費者,但是只能有一個消費者消費,其他消費者作為故障轉移備用,如果當前消費者出了故障,就從備用消費者中選擇一個進行消費。如下圖:
Shared:共享模式,同一個 Topic 可以由多個消費者訂閱和消費。消息通過 round robin 輪詢機制分發給不同的消費者,并且每個消息僅會被分發給一個消費者。當消費者斷開,如果發送給它消息沒有被消費,這些消息會被重新分發給其它存活的消費者。如下圖:
Key_Shared:消息和消費者都會綁定一個key,消息只會發送給綁定同一個key的消費者。如果有新消費者建立連接或者有消費者斷開連接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好處是既可以讓消費者并發地消費消息,又能保證同一Key下的消息順序。如下圖:
subscriptionInitialPosition:創建新的 subscription 時從哪里開始消費,有兩個選項:
Latest:從最新的消息開始消費
Earliest:從最早的消息開始消費
negativeAckRedeliveryDelay:消費失敗后間隔多久 broker 重新發送。
receiverQueueSize:在調用 receive 方法之前,最多能累積多少條消息。可以設置為 0,這樣每次只從 broker 拉取一條消息。在 Shared 模式下,receiverQueueSize 設置為 0,可以防止批量消息多發給一個 Consumer 而導致其他 Consumer 空閑。
Consumer 接收消息有四種方式:同步單條、同步批量、異步單條和異步批量,代碼如下:
Message?message?=?consumer.receive() CompletableFuture<Message>?message?=?consumer.receiveAsync(); Messages?message?=?consumer.batchReceive(); CompletableFuture<Messages>?message?=?consumer.batchReceiveAsync();對于批量接收,也可以設置批量接收的策略,代碼如下:
consumer?=?client.newConsumer().topic(topic).subscriptionName(subscription).batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(100).maxNumBytes(1024?*?1024).timeout(200,?TimeUnit.MILLISECONDS).build()).subscribe();代碼中的參數說明如下:
maxNumMessages:批量接收的最大消息數量。
maxNumBytes:批量接收消息的大小,這里是 1MB。
測試
首先編寫 Producer 發送消息的代碼,如下:
public?void?sendMsg(String?key,?String?data)?{CompletableFuture<MessageId>?future?=?producer.newMessage().key(key).value(data.getBytes()).sendAsync();future.handle((v,?ex)?->?{if?(ex?==?null)?{logger.info("發送消息成功,?key:{},?msg:?{}",?key,?data);}?else?{logger.error("發送消息失敗,?key:{},?msg:?{}",?key,?data);}return?null;});future.join();logger.info("發送消息完成,?key:{},?msg:?{}",?key,?data); }然后編寫一個 Consumer 消費消息的代碼,如下:
public?void?start()?throws?Exception{while?(true)?{Message?message?=?consumer.receive();String?key?=?message.getKey();String?data?=?new?String(message.getData());String?topic?=?message.getTopicName();if?(StringUtils.isNotEmpty(data))?{try{logger.info("收到消息,?topic:{},?key:{},?data:{}",?topic,?key,?data);}catch(Exception?e){logger.error("接收消息異常,topic:{},?key:{},?data:{}",?topic,?key,?data,?e);}}consumer.acknowledge(message);} }最后編寫一個 Controller 類,調用 Producer 發送消息,代碼如下:
@RequestMapping("/send") @ResponseBody public?String?send(@RequestParam?String?key,?@RequestParam?String?data)?{logger.info("收到消息發送請求,?key:{},?value:{}",?key,?data);pulsarProducer.sendMsg(key,?data);return?"success"; }調用 Producer 發送一條消息,key=key1,data=data1,具體操作為在瀏覽器中輸入下面的 url 后回車:
http://192.168.157.1:8083/pulsar/send?key=key1&data=data1可以看到控制臺輸出下面日志:
2022-01-08?22:42:33,199?[pulsar-client-io-6-1]?[INFO]?boot.pulsar.PulsarProducer?-?發送消息成功,?key:key1,?msg:?data1 2022-01-08?22:42:33,200?[http-nio-8083-exec-1]?[INFO]?boot.pulsar.PulsarProducer?-?發送消息完成,?key:key1,?msg:?data1 2022-01-08?22:42:33,232?[Thread-22]?[INFO]?boot.pulsar.PulsarConsumer?-?收到消息,?topic:persistent://public/default/testTopic,?key:key1,?data:data1 2022-01-08?22:43:14,498?[pulsar-timer-5-1]?[INFO]?org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl?-?[testTopic]?[topicGroup]?[7def6]?Prefetched?messages:?0?---?Consume?throughput?received:?0.02?msgs/s?---?0.00?Mbit/s?---?Ack?sent?rate:?0.02?ack/s?---?Failed?messages:?0?---?batch?messages:?0?---Failed?acks:?0 2022-01-08?22:43:14,961?[pulsar-timer-9-1]?[INFO]?org.apache.pulsar.client.impl.ProducerStatsRecorderImpl?-?[testTopic]?[standalone-9-0]?Pending?messages:?0?---?Publish?throughput:?0.02?msg/s?---?0.00?Mbit/s?---?Latency:?med:?69.000?ms?-?95pct:?69.000?ms?-?99pct:?69.000?ms?-?99.9pct:?69.000?ms?-?max:?69.000?ms?---?Ack?received?rate:?0.02?ack/s?---?Failed?messages:?0從日志中看到,這里使用的 namespace 就是創建集群時生成的public/default。
總結
從 SpringBoot 整合 Java 客戶端使用來看,Pulsar 的 api 是非常友好的,使用起來方便簡潔。Consumer 的使用需要考慮多一些,需要考慮到批量、異步以及訂閱類型。
總結
以上是生活随笔為你收集整理的SpringBoot整合分布式消息平台Pulsar的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 太卷了~ 八股文,算法张口就来?2022
- 下一篇: 实战~阿里神器 Seata 实现 TCC