1. Overview

Spring integrates with the Kafka message queue with very little effort: just add the spring-kafka dependency, taking care to pick a compatible version. This article walks through integrating Kafka into a Spring Boot application, covering both producing and consuming messages.
2. Adding the Dependency

Declare Spring Kafka in pom.xml:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
The concrete version number is best managed through spring-boot-dependencies:

```xml
<properties>
    <spring-boot.version>2.3.1.RELEASE</spring-boot.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```
3. Configuration

Create application.yml and add the following settings.

Common Spring Kafka configuration:

```yaml
spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
```
Common settings live under spring.kafka.*. The admin, producer, consumer, and streams sections below override identically named properties from this common spring.kafka.* configuration.
Producer configuration:

```yaml
spring:
  kafka:
    # Producer configuration
    producer:
      # Serializer class for message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for message values
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```

Producer settings live under spring.kafka.producer.*.
Consumer configuration:

```yaml
spring:
  kafka:
    # Consumer configuration
    consumer:
      # Consumer group this consumer belongs to
      group-id: testGroup
      # Deserializer class for message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for message values
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```

Consumer settings live under spring.kafka.consumer.*.

The default value-deserializer is org.apache.kafka.common.serialization.StringDeserializer, which only supports plain-text messages; switching to org.springframework.kafka.support.serializer.JsonDeserializer adds JSON support.
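One related caveat, sketched here as an assumption rather than something exercised in this article: spring-kafka's JsonDeserializer refuses to deserialize classes outside a set of trusted packages, so POJO payloads typically need to be whitelisted. The package pattern below follows this article's package naming:

```yaml
spring:
  kafka:
    consumer:
      properties:
        # Packages JsonDeserializer may deserialize into; "*" trusts
        # everything (convenient for demos, risky in production)
        spring.json.trusted.packages: "com.yuwen.spring.kafka.*"
```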
The complete application.yml:

```yaml
server:
  port: 8028

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
    # Producer configuration
    producer:
      # Serializer class for message keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for message values
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    # Consumer configuration
    consumer:
      # Consumer group this consumer belongs to
      group-id: testGroup
      # Deserializer class for message keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for message values
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```
4. Writing the Code

Create the startup class KafkaMQApplication.java; note the @EnableKafka annotation:

```java
package com.yuwen.spring.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaMQApplication.class, args);
    }
}
```
Producer: sending messages

Spring Kafka provides the KafkaTemplate class for sending messages; inject it wherever it is needed. Create the producer service ProviderService.java:

```java
package com.yuwen.spring.kafka.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProviderService {
    public static final String TOPIC = "testTopic";

    @Autowired
    private KafkaTemplate<?, String> kafkaTemplate;

    public void send(String message) {
        // Send the message content to the target topic
        kafkaTemplate.send(TOPIC, message);
        System.out.println("Provider= " + message);
    }
}
```

Note that send() takes the target topic and the message content to deliver.
Consumer: receiving messages

Create ConsumerService.java; note the @KafkaListener annotation:

```java
package com.yuwen.spring.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.provider.ProviderService;

@Service
public class ConsumerService {
    @KafkaListener(topics = ProviderService.TOPIC, groupId = "testGroup")
    public void receive(String message) {
        System.out.println("Consumer= " + message);
    }
}
```

Parameter notes:
- topics: the topic(s) to consume, matching the producer's topic; several can be listed
- groupId: unique id of the consumer group
- topicPartitions: optionally pins the listener to specific topic partitions; several can be listed
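As an aside on the listener settings above: when the topic has several partitions, the number of consumer threads per listener container can also be raised through Spring Boot configuration (a sketch; the property is spring.kafka.listener.concurrency, not something used elsewhere in this article):

```yaml
spring:
  kafka:
    listener:
      # Number of consumer threads per listener container; only useful
      # if the topic has at least this many partitions
      concurrency: 3
```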
5. Generating Messages Automatically

To exercise the producer, AutoGenerate.java produces a random string every second and sends it to Kafka:

```java
package com.yuwen.spring.kafka.provider;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AutoGenerate implements InitializingBean {
    @Autowired
    private ProviderService providerService;

    @Override
    public void afterPropertiesSet() throws Exception {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // Send a random UUID string, then wait one second
                    String message = UUID.randomUUID().toString();
                    providerService.send(message);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}
```
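As a design note, the hand-rolled Thread above works but can never be shut down cleanly; a ScheduledExecutorService is the more idiomatic way to emit one message per second. The sketch below is standalone (no Spring or Kafka): the class name is hypothetical and a printing send method stands in for ProviderService:

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AutoGenerateSketch {
    // Stand-in for ProviderService.send(): just prints the message
    static void send(String message) {
        System.out.println("Provider= " + message);
    }

    // A random UUID string, e.g. "c30a2e6c-e2e8-419e-865c-04885d1a90b5"
    static String nextMessage() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Emit one message per second, like the AutoGenerate bean above
        scheduler.scheduleAtFixedRate(() -> send(nextMessage()), 0, 1, TimeUnit.SECONDS);
        // Let it run briefly, then stop cleanly (unlike the raw Thread)
        TimeUnit.SECONDS.sleep(3);
        scheduler.shutdown();
    }
}
```

Unlike the InitializingBean version, shutdown() lets the generator stop gracefully when the application exits.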
6. Running the Service

Run the KafkaMQApplication startup class. The log output below shows that the random strings emitted by the producer are correctly received by the consumer:

:: Spring Boot ::        (v2.3.1.RELEASE)

2022-04-28 19:37:49.687  INFO 14424 --- [           main] c.yuwen.spring.kafka.KafkaMQApplication  : Starting KafkaMQApplication on yuwen-asiainfo with PID 14424 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka)
2022-04-28 19:37:49.689 INFO 14424 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : No active profile set, falling back to default profiles: default
2022-04-28 19:37:51.282 INFO 14424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8028 (http)
2022-04-28 19:37:51.290 INFO 14424 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-04-28 19:37:51.291 INFO 14424 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.36]
2022-04-28 19:37:51.371 INFO 14424 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-04-28 19:37:51.371 INFO 14424 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1645 ms
2022-04-28 19:37:51.491  INFO 14424 --- [     Thread-119] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [10.21.13.14:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2022-04-28 19:37:51.563  INFO 14424 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2022-04-28 19:37:51.590 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2022-04-28 19:37:51.592 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.592 INFO 14424 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651145871589
2022-04-28 19:37:51.851  INFO 14424 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [10.21.13.14:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id =
	client.rack =
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = testGroup
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2022-04-28 19:37:51.888  INFO 14424 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2022-04-28 19:37:51.888 INFO 14424 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-04-28 19:37:51.888 INFO 14424 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651145871888
2022-04-28 19:37:51.890 INFO 14424 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): testTopic
2022-04-28 19:37:51.892 INFO 14424 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2022-04-28 19:37:51.911 INFO 14424 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8028 (http) with context path ''
2022-04-28 19:37:51.921 INFO 14424 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Started KafkaMQApplication in 2.582 seconds (JVM running for 2.957)
2022-04-28 19:37:51.939 INFO 14424 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.939 INFO 14424 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-04-28 19:37:51.940 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
2022-04-28 19:37:51.942 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
2022-04-28 19:37:51.959 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 5: {consumer-testGroup-1-35e34543-5bf3-4a4a-a590-9a4c6f7e1ae3=Assignment(partitions=[testTopic-0])}
Provider= c30a2e6c-e2e8-419e-865c-04885d1a90b5
2022-04-28 19:37:51.966 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 5
2022-04-28 19:37:51.970 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: testTopic-0
2022-04-28 19:37:51.984 INFO 14424 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition testTopic-0 to the committed offset FetchPosition{offset=310751, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
2022-04-28 19:37:51.985 INFO 14424 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroup: partitions assigned: [testTopic-0]
Consumer= 19210493-d0df-4cd9-993b-f99500523eb2
Consumer= baee3749-307f-4894-ad88-a9610700ab80
Consumer= bea5e807-b003-4c90-89be-20439e2fa921
Consumer= 98258208-8a95-495d-917e-84d30d965e2b
Consumer= 4301851e-ab19-4c9e-89d6-7b604acdf077
Consumer= c30a2e6c-e2e8-419e-865c-04885d1a90b5
Provider= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Consumer= a6d47e9e-de74-481f-82f8-02bd7384fdd8
Provider= bd935ef1-cc61-4014-a971-1ad76c5e82bf
Consumer= bd935ef1-cc61-4014-a971-1ad76c5e82bf
7. References

- Spring Boot Integration with Kafka