Apache Kafka-事务消息的支持与实现(本地事务)
文章目錄
- 概述
- 官方示例
- Code (原生API)
- Code (Spring Kafka)
- POM依賴
- 配置文件
- 生產者
- 注意事項
- 消費者
- 單元測試
- 測試結果
- 源碼地址
概述
Kafka的事務不同于Rocketmq,Rocketmq是保障本地事務(比如數據庫)與mq消息發送的事務一致性,Kafka的事務主要是保障一次發送多條消息的事務一致性(要么同時成功要么同時失敗)。
一般在kafka的流式計算場景用得多一點,比如,kafka需要對一個topic里的消息做不同的流式計算處理,處理完分別發到不同的topic里,這些topic分別被不同的下游系統消費(比如hbase,redis,es等),這種我們肯定希望系統發送到多個topic的數據保持事務一致性。
Kafka要實現類似Rocketmq的分布式事務需要額外開發功能。
官方文檔: http://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
這個功能比較雞肋,大家看著用哈 ,它保證不了不同介質的數據一致性。
官方示例
From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer.
原生的API操作,請查看文檔,這里我們來看下使用Spring kafka如何實現事務消息。
Code (原生API)
@Test public void testT(){ // 正常的 Properties props = new Properties();props.put("bootstrap.servers", "192.168.126.140:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();}@Testpublic void testT2(){ // 測試異常情況 Properties props = new Properties();props.put("bootstrap.servers", "192.168.126.140:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());producer.initTransactions();try {producer.beginTransaction();for (int i = 0; i < 100; i++){producer.send(new ProducerRecord<>("my-topic2", Integer.toString(i), Integer.toString(i)));if (i == 50) {throw new RuntimeException("MOCK Exception");}}producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.producer.abortTransaction();}producer.close();}
看看數據
可以看到 入了一部分,只是這里進入的數據也無法被消費。
Code (Spring Kafka)
POM依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring-Kafka 依賴 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>配置文件
spring:# Kafka 配置項,對應 KafkaProperties 配置類kafka:bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以設置多個,以逗號分隔# Kafka Producer 配置項producer:acks: all # 0-不應答。1-leader 應答。all-所有 leader 和 follower 應答。retries: 3 # 發送失敗時,重試發送的次數key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化transaction-id-prefix: artisan. # 事務編號前綴# Kafka Consumer 配置項consumer:auto-offset-reset: earliest # 設置消費者分組最初的消費進度為 earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:packages: com.artisan.springkafka.domainisolation-level: read_committed # 讀取已提交的消息# Kafka Consumer Listener 監聽器配置listener:missing-topics-fatal: false # 消費監聽接口監聽的主題不存在時,默認會報錯。所以通過設置為 false ,解決報錯logging:level:org:springframework:kafka: ERROR # spring-kafkaapache:kafka: ERROR # kafka-
spring.kafka.producer.acks 配置為all,Kafka 的事務消息需要基于冪等性來實現,必須保證所有節點都寫入成功,否則的話啟動時會拋出Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence
-
事務編號前綴屬性設置 transaction-id-prefix, 需要保證相同應用配置相同,不同應用配置不同。 How to choose Kafka transaction id for several applications, hosted in Kubernetes?
-
spring.kafka.consumer.properties.isolation.level 設置為 read_committed ,Consumer 僅讀取已提交的消息, 否則不生效
生產者
package com.artisan.springkafka.producer;import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaOperations; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component;import java.util.Random; import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:25* @mark: show me the code , change the world*/@Component public class ArtisanProducerMock {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate ;public String testTransaction(Runnable runnable){return kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<Object, Object, String>() {@Overridepublic String doInOperations(KafkaOperations<Object, Object> operations) throws RuntimeException {for (int i = 1; i <= 10; i++) {// 用于測試 消息是否在同一個事務中if (i == 7 ) {throw new RuntimeException("MOCK ERROR , TEST Tranasction");}// 模擬發送的消息Integer id = new Random().nextInt(100);MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);SendResult<Object, Object> sendResult = null;try {sendResult = operations.send(TOPIC.TOPIC, messageMock).get();} catch ( Exception e) {logger.error("Error {}", e);}logger.info( i+ "-[doInOperations][發送數據:[{}] 發送結果:[{}]]", messageMock, sendResult);// 本地業務邏輯...runnable.run();}// 返回結果return "OJ8K";}});}}我們模擬發送10條消息,第7條的時候拋出異常,觀察消費者是否能消費前面已經發送的6條 ,如果能消費,那肯定不符合和預期。 因為Kafka的事務主要是保障一次發送多條消息的事務一致性(要么同時成功要么同時失敗)。
調用 kafkaTemplate#executeInTransaction(OperationsCallback<K, V, T> callback) 模板方法,實現在 Kafka 事務中,執行自定義 KafkaOperations.OperationsCallback 操作。
-
executeInTransaction(...) 方法中,可以通過 KafkaOperations 來執行發送消息等 Kafka 相關的操作,當然了也可以執行自己的業務邏輯,比如 runnable參數,用于表示本地業務邏輯
-
executeInTransaction(...) 方法的開始,會自動動創建 Kafka 的事務,然后執行KafkaOperations 的邏輯。成功,則提交 Kafka 事務;失敗,則回滾 Kafka 事務。
注意事項
如果 Kafka Producer 開啟了事務的功能,則所有發送的消息,都必須處于 Kafka 事務之中,否則會拋出 No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
假設業務中,即存在需要事務的情況,也存在不需要事務的情況,那么則需要分別定義兩個 KafkaTemplate(Kafka Producer)
消費者
package com.artisan.springkafka.consumer;import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:33* @mark: show me the code , change the world*/@Component public class ArtisanCosumerMock {private Logger logger = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "MANUAL_ACK_" ;@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)public void onMessage(MessageMock messageMock) {logger.info("【接受到消息][線程:{} 消息內容:{}]", Thread.currentThread().getName(), messageMock);}}單元測試
package com.artisan.springkafka.produceTest;import com.artisan.springkafka.SpringkafkaApplication; import com.artisan.springkafka.producer.ArtisanProducerMock; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.support.SendResult; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;/*** @author 小工匠* * @version 1.0* @description: TODO* @date 2021/2/17 22:40* @mark: show me the code , change the world*/@RunWith(SpringRunner.class) @SpringBootTest(classes = SpringkafkaApplication.class) public class ProduceMockTest {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate ArtisanProducerMock artisanProducerMock;@AutowiredArtisanProducerMock producerMock;@Testpublic void testAsynSend() throws ExecutionException, InterruptedException {logger.info("開始發送");producerMock.testTransaction(() -> {logger.info(" mock doing bussiness ");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}logger.info("bussiness over ");});// 阻塞等待,保證消費new CountDownLatch(1).await();}}測試結果
.... .... ..... ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v2.4.1)2021-02-20 01:35:44.452 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : Starting ProduceMockTest using Java 1.8.0_261 on LAPTOP-JF3RBRRJ with PID 12108 (started by artisan in D:\IdeaProjects\boot2\springkafkaTransaction) 2021-02-20 01:35:44.456 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : No active profile set, falling back to default profiles: default 2021-02-20 01:35:45.832 INFO 12108 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor' 2021-02-20 01:35:46.811 INFO 12108 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 2021-02-20 01:35:46.827 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : Started ProduceMockTest in 2.77 seconds (JVM running for 3.55) 2021-02-20 01:35:47.021 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : 開始發送 2021-02-20 01:35:47.298 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 1-[doInOperations][發送數據:[MessageMock{id=2, name='messageSendByAsync-2'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@16]]] 2021-02-20 01:35:47.298 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:48.302 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:48.305 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 2-[doInOperations][發送數據:[MessageMock{id=2, name='messageSendByAsync-2'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=AC-0@17]]] 2021-02-20 01:35:48.305 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 3-[doInOperations][發送數據:[MessageMock{id=36, name='messageSendByAsync-36'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=36, name='messageSendByAsync-36'}, timestamp=null), recordMetadata=AC-0@18]]] 2021-02-20 01:35:49.308 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:50.314 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:50.314 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 4-[doInOperations][發送數據:[MessageMock{id=19, name='messageSendByAsync-19'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=19, name='messageSendByAsync-19'}, timestamp=null), recordMetadata=AC-0@19]]] 2021-02-20 01:35:50.318 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:51.321 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:51.321 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 5-[doInOperations][發送數據:[MessageMock{id=29, name='messageSendByAsync-29'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=AC-0@20]]] 2021-02-20 01:35:51.325 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.producer.ArtisanProducerMock : 6-[doInOperations][發送數據:[MessageMock{id=45, name='messageSendByAsync-45'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=AC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=45, name='messageSendByAsync-45'}, timestamp=null), recordMetadata=AC-0@21]]] 2021-02-20 01:35:52.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 01:35:53.326 INFO 12108 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over java.lang.RuntimeException: MOCK ERROR , TEST Tranasctionat com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:42)at com.artisan.springkafka.producer.ArtisanProducerMock$1.doInOperations(ArtisanProducerMock.java:34)at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:467)at com.artisan.springkafka.producer.ArtisanProducerMock.testTransaction(ArtisanProducerMock.java:34)at com.artisan.springkafka.produceTest.ProduceMockTest.testAsynSend(ProduceMockTest.java:45)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)at org.junit.runners.ParentRunner.run(ParentRunner.java:413)at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)at org.junit.runner.JUnitCore.run(JUnitCore.java:137)at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)2021-02-20 01:35:53.346 INFO 12108 --- [ntainer#0-0-C-1] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 2021-02-20 01:35:53.357 INFO 12108 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'Process finished with exit code -1可以看到,有異常了,消費者未的消費到消息。
那我們來個成功的看看嘛
2021-02-20 10:10:46.103 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : 開始發送 2021-02-20 10:10:46.429 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 1-[doInOperations][發送數據:[MessageMock{id=44, name='messageSendByAsync-44'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=44, name='messageSendByAsync-44'}, timestamp=null), recordMetadata=OOO_TOIPC-0@7]]] 2021-02-20 10:10:46.429 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 2-[doInOperations][發送數據:[MessageMock{id=76, name='messageSendByAsync-76'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=76, name='messageSendByAsync-76'}, timestamp=null), recordMetadata=OOO_TOIPC-0@8]]] 2021-02-20 10:10:47.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:48.430 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:48.431 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 3-[doInOperations][發送數據:[MessageMock{id=2, name='messageSendByAsync-2'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=2, name='messageSendByAsync-2'}, timestamp=null), recordMetadata=OOO_TOIPC-0@9]]] 2021-02-20 10:10:48.431 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:49.434 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:49.438 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 4-[doInOperations][發送數據:[MessageMock{id=34, name='messageSendByAsync-34'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=34, name='messageSendByAsync-34'}, timestamp=null), recordMetadata=OOO_TOIPC-0@10]]] 2021-02-20 10:10:49.438 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:50.440 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:50.440 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 5-[doInOperations][發送數據:[MessageMock{id=41, name='messageSendByAsync-41'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=41, name='messageSendByAsync-41'}, timestamp=null), recordMetadata=OOO_TOIPC-0@11]]] 2021-02-20 10:10:50.444 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 6-[doInOperations][發送數據:[MessageMock{id=29, name='messageSendByAsync-29'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=29, name='messageSendByAsync-29'}, timestamp=null), recordMetadata=OOO_TOIPC-0@12]]] 2021-02-20 10:10:51.446 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 7-[doInOperations][發送數據:[MessageMock{id=49, name='messageSendByAsync-49'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=49, name='messageSendByAsync-49'}, timestamp=null), recordMetadata=OOO_TOIPC-0@13]]] 2021-02-20 10:10:52.447 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 8-[doInOperations][發送數據:[MessageMock{id=12, name='messageSendByAsync-12'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=12, name='messageSendByAsync-12'}, timestamp=null), recordMetadata=OOO_TOIPC-0@14]]] 2021-02-20 10:10:53.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 9-[doInOperations][發送數據:[MessageMock{id=15, name='messageSendByAsync-15'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=15, name='messageSendByAsync-15'}, timestamp=null), recordMetadata=OOO_TOIPC-0@15]]] 2021-02-20 10:10:54.450 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:55.454 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:55.454 INFO 25272 --- [ main] c.a.s.producer.ArtisanProducerMock : 10-[doInOperations][發送數據:[MessageMock{id=25, name='messageSendByAsync-25'}] 發送結果:[SendResult [producerRecord=ProducerRecord(topic=OOO_TOIPC, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 114, 116, 105, 115, 97, 110, 46, 115, 112, 114, 105, 110, 103, 107, 97, 102, 107, 97, 46, 100, 111, 109, 97, 105, 110, 46, 77, 101, 115, 115, 97, 103, 101, 77, 111, 99, 107])], isReadOnly = true), key=null, value=MessageMock{id=25, name='messageSendByAsync-25'}, timestamp=null), recordMetadata=OOO_TOIPC-0@16]]] 2021-02-20 10:10:55.458 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : mock doing bussiness 2021-02-20 10:10:56.460 INFO 25272 --- [ main] c.a.s.produceTest.ProduceMockTest : bussiness over 2021-02-20 10:10:56.625 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=44, name='messageSendByAsync-44'}] 2021-02-20 10:10:56.737 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=76, name='messageSendByAsync-76'}] 2021-02-20 10:10:56.846 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=2, name='messageSendByAsync-2'}] 2021-02-20 10:10:56.962 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=34, name='messageSendByAsync-34'}] 2021-02-20 10:10:56.970 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=41, name='messageSendByAsync-41'}] 2021-02-20 10:10:57.074 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=29, name='messageSendByAsync-29'}] 2021-02-20 10:10:57.082 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=49, name='messageSendByAsync-49'}] 2021-02-20 10:10:57.090 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=12, name='messageSendByAsync-12'}] 2021-02-20 10:10:57.094 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=15, name='messageSendByAsync-15'}] 2021-02-20 10:10:57.101 INFO 25272 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=25, name='messageSendByAsync-25'}]懂了么,老兄 ~
我們繼續看下數據 (新跑的數據,和日志有出入)
源碼地址
https://github.com/yangshangwei/boot2/tree/master/springkafkaTransaction
總結
以上是生活随笔為你收集整理的Apache Kafka-事务消息的支持与实现(本地事务)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入理解分布式技术 - 消息队列使用场景
- 下一篇: 深入理解分布式技术 - 结合Rocket