Kafka Development Guide: Using a Transactional Producer for Exactly-Once Delivery
Contents
At least once
At most once
Exactly once
Idempotence
Scope of idempotence
How to enable
Code
Transactions
Scope of transactions
How to enable
Code
Kafka's message delivery guarantees come in three levels: at most once, at least once, and exactly once.
At least once

When does a producer send duplicate data?

Suppose the producer sends a record and the broker writes it, but for some reason the acknowledgement never reaches the producer. If retries are enabled (for example props.put(ProducerConfig.RETRIES_CONFIG, 5);), the producer sends the record again, and the data is duplicated.
At most once

The opposite of at least once: disable retries and you get at most once. Because a failed send is never repeated, duplicates are impossible, but data can be lost.
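As with the previous sketch, this is a hedged illustration of my own with literal config keys, not a complete producer configuration:

```java
import java.util.Properties;

// Minimal at-most-once producer settings: retries disabled, so a record whose
// acknowledgement is lost is never resent (no duplicates, but possible loss).
public class AtMostOnceConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("retries", 0); // never resend
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```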
Exactly once

How do we achieve exactly once? The producer offers two mechanisms: idempotence and transactions.
Idempotence

Scope of idempotence

It only guarantees that a single producer instance does not write duplicates. The guarantee does not survive a producer restart, and it does not cover multiple producers.

How to enable

A single configuration setting is enough:

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Code
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Idempotent producer.
 *
 * It only guarantees idempotence within a single partition: an idempotent
 * producer ensures no duplicate messages appear in one partition of a topic,
 * but it cannot provide idempotence across multiple partitions.
 * It only works within a single session, i.e. one run of the producer
 * process. Once the producer process is restarted, the guarantee is lost.
 *
 * @author jast
 * @date 2020/4/19 22:38
 */
public class IdempotenceProducer {

    private static Producer<String, String> producer;

    public IdempotenceProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        IdempotenceProducer idempotenceProducer = new IdempotenceProducer();
        Producer<String, String> producer = idempotenceProducer.getProducer();
        producer.send(new ProducerRecord<String, String>("test", "1234")).get();
    }
}
```
Transactions

Scope of transactions

Transactions remove both limitations of idempotence: they span multiple partitions and, with the same transactional.id, survive producer restarts.

How to enable

Producer settings:

```java
// Enable producer idempotence; nothing else needs to change
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Enable transactions (idempotence must also be enabled) with a custom id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");
```

Consumer settings:

```java
// Read only data from successfully committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
```

Code
Producer
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka transactional producer: exactly once on the produce side.
 * Either every send in a transaction succeeds, or none of them do.
 *
 * @author jast
 * @date 2020/4/21 22:38
 */
public class TransactionProducer {

    private static Producer<String, String> producer;

    public TransactionProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Enable transactions (idempotence must also be enabled) with a custom id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TransactionProducer transactionProducer = new TransactionProducer();
        Producer<String, String> producer = transactionProducer.getProducer();
        // Initialize transactions
        producer.initTransactions();
        boolean flag = true;
        // Loop four times; only the last iteration commits successfully.
        // Expected result: the first three transactions abort, so the
        // transactional consumer never sees their 1 and 2, but does see
        // 1,2,3,4 from the fourth. A plain consumer sees 1,2 from all
        // three aborted attempts as well as 1,2,3,4 from the fourth.
        //
        // To observe, run TransactionConsumer. Sample output after the commit:
        //   plain consumer         -> 1 partition:2 offset:3080713
        //   transactional consumer -> 3 partition:2 offset:3080717
        //   plain consumer         -> 2 partition:1 offset:3081410
        //   plain consumer         -> 1 partition:3 offset:3081465
        //   plain consumer         -> 1 partition:2 offset:3080715
        //   plain consumer         -> 3 partition:2 offset:3080717
        //   transactional consumer -> 4 partition:1 offset:3081414
        //   transactional consumer -> 2 partition:0 offset:3081470
        //   transactional consumer -> 1 partition:3 offset:3081467
        //   plain consumer         -> 2 partition:1 offset:3081412
        //   plain consumer         -> 4 partition:1 offset:3081414
        //   plain consumer         -> 2 partition:0 offset:3081468
        //   plain consumer         -> 2 partition:0 offset:3081470
        //   plain consumer         -> 1 partition:3 offset:3081467
        for (int i = 0; i <= 3; i++) {
            if (i == 3) {
                flag = false;
            }
            try {
                // Begin the transaction
                producer.beginTransaction();
                producer.send(new ProducerRecord<String, String>("test", "1")).get();
                producer.send(new ProducerRecord<String, String>("test", "2")).get();
                // Deliberately raise an exception on the first three iterations
                if (flag) {
                    throw new RuntimeException("simulated failure");
                }
                producer.send(new ProducerRecord<String, String>("test", "3")).get();
                producer.send(new ProducerRecord<String, String>("test", "4")).get();
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                // Abort the transaction
                producer.abortTransaction();
                e.printStackTrace();
            }
        }
    }
}
```

Consumer
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Consumes from Kafka with transactional isolation.
 *
 * @author jast
 * @date 2020/4/21 22:54
 */
public class TransactionConsumer {

    /**
     * Transactional Kafka consumer (read_committed isolation).
     *
     * @param topic topic to subscribe to
     * @param group consumer group id
     * @param max_poll_records maximum records returned per poll
     * @param isLatest start from latest offsets when no committed offset exists
     * @return KafkaConsumer<String, String>
     */
    public KafkaConsumer<String, String> transactionConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        // -------------------------------------------------------------------
        // Read only data from successfully committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
        // -------------------------------------------------------------------
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Plain consumer (default read_uncommitted isolation).
     */
    public KafkaConsumer<String, String> consumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        TransactionConsumer transactionConsumer = new TransactionConsumer();
        TransactionConsumer transactionConsumer2 = new TransactionConsumer();
        KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);
        KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);
        // Plain consumer: sees records from aborted transactions too
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("plain consumer -> " + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Transactional consumer: sees only committed records
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records2 = consumer2.poll(1000);
                for (ConsumerRecord<String, String> record : records2) {
                    System.out.println("transactional consumer -> " + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).get();
    }
}
```
Summary

Enable idempotence (ENABLE_IDEMPOTENCE_CONFIG) when exactly once within a single partition and a single producer session is enough. When the guarantee must span partitions and producer restarts, use transactions: set TRANSACTIONAL_ID_CONFIG on the producer and read with isolation.level=read_committed on the consumer.