java生产者实现kafka拦截器
生活随笔
收集整理的這篇文章主要介紹了
java生产者实现kafka拦截器
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
【RAEDME】
本文中, java客戶端作為生產者, centos中consumer線程作為消費者;
?
【1】攔截器簡述
1)攔截器是什么? 很明顯,為了實現面向切面編碼,即在 具體邏輯的上下文?添加一些邏輯;如
邏輯1 具體邏輯 邏輯22)什么時候調用攔截器?這就要從 kafka生產者發送數據說起了;
kafka生產者使用了2個線程來發送數據: step1)生產者中的main線程把數據經過 攔截器-》序列化器-》分區器 處理;然后再把數據寫到 RecordAccumulator; step2)send 線程從 RecordAccumulator 中取出數據寫入到broker list;【2】攔截器實現
/* 添加攔截器 */ props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));1)需求
第1個攔截器, 在消息發送前將時間戳加到消息value的 最前面; 第2個攔截器,在消息發送后更新成功發送消息數或失敗發送消息數;2)代碼實現
-- 添加攔截器
/* 添加攔截器 */ props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName()));-- 帶有攔截器的生產者
/*** 帶有攔截器的生產者*/ public class InterceptorProducer {public static void main(String[] args) {/* 1.創建kafka生產者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack應答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次發送多少數據,當數據大于16k,生產者會發送數據到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待時間, 等待時間超過1毫秒,即便數據沒有大于16k, 也會寫數據到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 緩沖區大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化類 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 添加攔截器 */props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName(), CounterInterceptor.class.getName())); System.out.println(props); /* 9.創建生產者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.發送數據 */ for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("first", "key"+i, "value-first-20210101--J" + i));}/* 11.關閉資源 */ producer.close(); // 間接調用了攔截器的close 方法 System.out.println("kafka生產者寫入數據完成"); } } /*** 時間攔截器-在消息前添加時間戳 */ public class TimeInterceptor implements ProducerInterceptor<String, String>{@Overridepublic void configure(Map<String, ?> configs) {}/*生產者確保在 消息被序列化以及計算分區前調用該方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 獲取消息值 String value = record.value();return new ProducerRecord<>(record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "," + record.value()); }/*該方法會在消息從 RecordAccumulator 成功發送到Kafka Broker之后,或者在發送過程中失敗時調用; 且通常在 生產者回調邏輯觸發之前調用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {} } /*** 計數攔截器*/ public class CounterInterceptor implements ProducerInterceptor<String, String>{int sucCounter = 0;int errCounter = 0;@Overridepublic void configure(Map<String, ?> configs) {}/*生產者確保在 消息被序列化以及計算分區前調用該方法。*/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record; }/*該方法會在消息從 RecordAccumulator 成功發送到Kafka Broker之后,或者在發送過程中失敗時調用; 且通常在 生產者回調邏輯觸發之前調用*/@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if(metadata !=null ) {sucCounter++;} else {errCounter++; }}@Overridepublic void close() {System.out.println("sucCounter =" + sucCounter + ", errCounter=" + errCounter); } }-- java生產者打印日志?
sucCounter =10, errCounter=0 kafka生產者寫入數據完成3)centos消費者消費結果
[root@centos201 ~]# kafka-console-consumer.sh --topic first --zookeeper centos201:2181 Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. 1609599630884,value-first-20210102--A0 1609599631203,value-first-20210102--A1 1609599631204,value-first-20210102--A6 1609599631203,value-first-20210102--A2 1609599631203,value-first-20210102--A3 1609599631204,value-first-20210102--A4 1609599631204,value-first-20210102--A5 1609599631204,value-first-20210102--A7 1609599631204,value-first-20210102--A8 1609599631204,value-first-20210102--A9?
?
?
總結
以上是生活随笔為你收集整理的java生产者实现kafka拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 洁白的近义词是什么 一起来看看吧
- 下一篇: 累累硕果什么意思 词语累累硕果什么意思