javascript
Spring Apache Kafka教程
在本SpringApache Kafka課程中,我們將學(xué)習(xí)如何在Spring Boot項(xiàng)目中開始使用Apache Kafka,并開始生成和使用我們所選主題的消息。 除了一個(gè)簡(jiǎn)單的項(xiàng)目外,我們還將深入探討Kafka的術(shù)語以及分區(qū)概念在Kafka中的工作方式。 讓我們開始吧。
1.簡(jiǎn)介
隨著的起義微服務(wù) ,涉及的服務(wù)之間的異步通信的必要性成為主流需求。 實(shí)際上,這就是Apache Kafka在LinkedIn上出現(xiàn)的方式 。 他們需要的新異步通信系統(tǒng)的主要要求是消息持久性和高吞吐量 。 一旦LinkedIn能夠面對(duì)Kafka進(jìn)行項(xiàng)目開發(fā),他們便將該項(xiàng)目捐贈(zèng)給Apache Software Foundation,該基金會(huì)后來被稱為Apache Kafka 。
目錄
1.簡(jiǎn)介 2.什么是Apache Kafka? 3. Apache Kafka術(shù)語 4.安裝Kafka并制作主題 5.使用Maven制作Spring Boot項(xiàng)目 6.添加Maven依賴項(xiàng) 7.項(xiàng)目結(jié)構(gòu) 8.添加配置 9.定義生產(chǎn)者配置 10.定義使用者配置 11.定義Spring Boot類 12.運(yùn)行項(xiàng)目 13. Kafka分區(qū) 14.結(jié)論 15.下載源代碼2.什么是Apache Kafka?
Kafka于2010年在LinkedIn上開發(fā),并捐贈(zèng)給Apache Software Foundation,該基金會(huì)到2012年成為頂級(jí)項(xiàng)目之一。Apache Kafka在其生態(tài)系統(tǒng)中具有三個(gè)主要組成部分:
對(duì)于高級(jí)定義,我們可以為Apache Kafka提供一個(gè)簡(jiǎn)單的定義:
Apache Kafka是一個(gè)分布式的,可容錯(cuò)的,可水平擴(kuò)展的提交日志。
讓我們?cè)敿?xì)說明一下:
- 分布式 :Kafka是一個(gè)分布式系統(tǒng),其中所有消息都在各個(gè)節(jié)點(diǎn)上復(fù)制,因此每個(gè)服務(wù)器都能夠響應(yīng)客戶端包含的消息。 此外,即使一個(gè)節(jié)點(diǎn)發(fā)生故障,其他節(jié)點(diǎn)也可以快速接管而無需停機(jī)
- 容錯(cuò) :由于Kafka沒有單點(diǎn)故障 ,即使其中一個(gè)節(jié)點(diǎn)發(fā)生故障,最終用戶也幾乎不會(huì)注意到這一點(diǎn),因?yàn)槠渌糠謱?duì)由于故障節(jié)點(diǎn)而丟失的消息負(fù)責(zé)
- 可水平擴(kuò)展 :Kafka允許我們以零停機(jī)時(shí)間向群集添加更多計(jì)算機(jī)。 這意味著,如果由于集群中服務(wù)器數(shù)量少而開始出現(xiàn)消息滯后的情況,我們可以快速添加更多服務(wù)器并保持系統(tǒng)性能
- 提交日志 :提交日志是指類似于鏈接列表的結(jié)構(gòu)。 消息的插入順序得以維持,并且直到達(dá)到閾值時(shí)間后,才能從此日志中刪除數(shù)據(jù)
在接下來的部分中,我們將討論關(guān)于Apache Kafka的基本術(shù)語的更多概念將變得更加清晰。
3. Apache Kafka術(shù)語
在繼續(xù)進(jìn)行Kafka概念和示例項(xiàng)目之前,我們必須了解與Apache Kafka相關(guān)的基本術(shù)語。 其中一些是:
- 生產(chǎn)者 :此組件將消息發(fā)布到Kafka集群
- 使用者 :此組件使用來自Kafka集群的消息
- 消息 :這是生產(chǎn)者發(fā)送到集群的數(shù)據(jù)
- 連接 :生產(chǎn)者需要建立一個(gè)TCP連接以發(fā)布消息。 消費(fèi)者應(yīng)用程序也需要同樣的條件來消耗Kafka集群中的數(shù)據(jù)
- 主題 :主題是類似消息的邏輯分組。 生產(chǎn)者應(yīng)用可以將消息發(fā)布到特定主題,并且可以從特定主題中消費(fèi)
- 主題分區(qū) :為了擴(kuò)展主題內(nèi)存,因?yàn)樗梢园芏嘞?#xff0c;因此將一個(gè)主題劃分為多個(gè)分區(qū),每個(gè)分區(qū)可以位于群集中的任何節(jié)點(diǎn)上,下圖顯示了如何將消息寫入多個(gè)分區(qū):
卡夫卡中的主題劃分
- 副本 :如上圖所示,有關(guān)主題分區(qū),每個(gè)消息都在各個(gè)節(jié)點(diǎn)之間復(fù)制,以維持順序并防止其中一個(gè)節(jié)點(diǎn)死亡時(shí)數(shù)據(jù)丟失
- 消費(fèi)者組 :可以將對(duì)一個(gè)主題感興趣的多個(gè)消費(fèi)者歸為一組,稱為“消費(fèi)者組”。
- 偏移量 :Kafka不存儲(chǔ)有關(guān)哪個(gè)消費(fèi)者將要讀取哪些數(shù)據(jù)的信息。 每個(gè)使用者都保留有關(guān)他們最后閱讀的消息是什么的偏移值。 這意味著不同的消費(fèi)者可以同時(shí)閱讀不同的消息
- 節(jié)點(diǎn) :節(jié)點(diǎn)只是集群中的單個(gè)服務(wù)器。 我們可以選擇在集群中添加幾乎任何數(shù)量的節(jié)點(diǎn)
- 群集 :一組節(jié)點(diǎn)稱為群集。
4.安裝Kafka并制作主題
要下載并安裝Kafka,我們可以參考此處提供的Kafka官方指南。 當(dāng)Kafka服務(wù)器啟動(dòng)并運(yùn)行時(shí),我們可以使用以下命令創(chuàng)建一個(gè)名為javacodegeeks的新主題:
創(chuàng)建一個(gè)話題
bin/kafka-topics --create \--zookeeper localhost:2181 \--replication-factor 1 --partitions 1 \--topic javacodegeeks一旦執(zhí)行此命令,我們將看到以下輸出:
Kafka創(chuàng)建主題
5.使用Maven制作Spring Boot項(xiàng)目
我們將使用許多Maven原型之一為我們的示例創(chuàng)建一個(gè)示例項(xiàng)目。 要?jiǎng)?chuàng)建項(xiàng)目,請(qǐng)?jiān)趯⒂米鞴ぷ骺臻g的目錄中執(zhí)行以下命令:
創(chuàng)建一個(gè)項(xiàng)目
mvn archetype:generate -DgroupId=com.javacodegeeks.example -DartifactId=JCG-BootKafka-Example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false如果您是第一次運(yùn)行maven,則完成生成命令將花費(fèi)幾秒鐘,因?yàn)閙aven必須下載所有必需的插件和工件才能完成生成任務(wù)。 運(yùn)行該項(xiàng)目后,我們將看到以下輸出并創(chuàng)建該項(xiàng)目:
創(chuàng)建Kafka項(xiàng)目
6.添加Maven依賴項(xiàng)
創(chuàng)建項(xiàng)目后,請(qǐng)隨時(shí)在您喜歡的IDE中打開它。 下一步是向項(xiàng)目添加適當(dāng)?shù)腗aven依賴關(guān)系。 我們將在項(xiàng)目中使用以下依賴項(xiàng):
- spring-boot-starter-web :此依賴關(guān)系將該項(xiàng)目標(biāo)記為Web項(xiàng)目,并且添加了依賴關(guān)系以創(chuàng)建控制器并創(chuàng)建與Web相關(guān)的類。
- spring-kafka :這是將所有與Kafka相關(guān)的依賴項(xiàng)引入項(xiàng)目類路徑的依賴項(xiàng)
- spring-boot-starter-test :此依賴項(xiàng)將所有與測(cè)試相關(guān)的JAR收集到項(xiàng)目中,例如JUnit和Mockito 。
這是pom.xml文件,其中添加了適當(dāng)?shù)囊蕾図?xiàng):
pom.xml
<groupId>com.javacodegeeks.example</groupId> <artifactId>JCG-BootKafka-Example</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>JCG-BootKafka-Example</name><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.10.RELEASE</version><relativePath/> <!-- lookup parent from repository --> </parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version> </properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.1.3.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins> </build>在Maven Central上找到最新的Maven依賴項(xiàng)。
最后,要了解添加此依賴項(xiàng)時(shí)添加到項(xiàng)目中的所有JAR,我們可以運(yùn)行一個(gè)簡(jiǎn)單的Maven命令,當(dāng)我們向其添加一些依賴項(xiàng)時(shí),該命令使我們能夠查看項(xiàng)目的完整依賴關(guān)系樹。 這是我們可以使用的命令:
檢查依賴樹
mvn dependency:tree當(dāng)我們運(yùn)行此命令時(shí),它將向我們顯示以下依賴關(guān)系樹:
檢查依賴
注意到了什么? 通過向項(xiàng)目中添加四個(gè)依賴項(xiàng),添加了如此多的依賴項(xiàng)。 Spring Boot本身會(huì)收集所有相關(guān)的依賴項(xiàng),因此在此方面不做任何事情。 最大的優(yōu)勢(shì)在于, 所有這些依賴項(xiàng)都保證相互兼容 。
7.項(xiàng)目結(jié)構(gòu)
在繼續(xù)進(jìn)行并開始處理項(xiàng)目代碼之前,讓我們?cè)谶@里介紹完成所有代碼添加到項(xiàng)目后將擁有的項(xiàng)目結(jié)構(gòu):
項(xiàng)目結(jié)構(gòu)
我們將項(xiàng)目分為多個(gè)包,以便遵循關(guān)注點(diǎn)分離的原則,并且代碼保持模塊化。
8.添加配置
在開始為項(xiàng)目編寫代碼之前,我們需要在Spring Boot項(xiàng)目的application.properties文件中提供一些屬性:
application.properties
#Kafka Topic message.topic.name=javacodegeeksspring.kafka.bootstrap-servers=localhost:9092#Unique String which identifies which consumer group this consumer belongs to spring.kafka.consumer.group-id=jcg-group這些是我們將在項(xiàng)目中使用的一些屬性,這些屬性將用作我們將發(fā)布和使用的消息的主題和組ID。 另外, 9092是Apache Kafka的默認(rèn)端口。 請(qǐng)注意,我們可以在此處定義多個(gè)主題,并為鍵指定不同的名稱。
9.定義生產(chǎn)者配置
我們將從定義生產(chǎn)者的配置開始。 我們需要為Kafka Producer強(qiáng)制定義的唯一屬性是帶有端口的Kafka服務(wù)器的地址。
KafkaProducerConfig.java
package com.javacodegeeks.example.config;import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap; import java.util.Map;@Configuration public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapAddress;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());} }盡管上面的類定義非常簡(jiǎn)單,但是我們?nèi)匀恍枰私庖恍┮c(diǎn):
- @Configuration :此類定義為配置類,這意味著該類將由Spring Boot自動(dòng)選擇,并且定義在該類內(nèi)的所有bean將由Spring容器自動(dòng)管理。
- 我們?yōu)镻roducerFactory定義了一個(gè)bean,該bean接受輸入作為各種屬性,例如Kafka服務(wù)器地址和其他序列化屬性,這些屬性有助于對(duì)通過Kafka生產(chǎn)者bean發(fā)送的消息進(jìn)行編碼和解碼。
- 最后,我們?yōu)镵afkaTemplate定義了一個(gè)bean,它是實(shí)際的API對(duì)象,將用于在Kafka主題上發(fā)布消息。
10.定義使用者配置
當(dāng)我們出于演示目的而在同一應(yīng)用中制作Kafka生產(chǎn)者和消費(fèi)者時(shí),我們還將定義一個(gè)消費(fèi)者配置類,該類僅包含Kafka消費(fèi)者的基本屬性。 可以將此類放在任何既不是生產(chǎn)者又僅僅是Kafka消費(fèi)者的項(xiàng)目中,而無需進(jìn)行任何更改。 讓我們看一下配置定義:
KafkaConsumerConfig.java
package com.javacodegeeks.example.config;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap; import java.util.Map;@EnableKafka @Configuration public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapAddress;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory= new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;} }我們提供的配置與生產(chǎn)者配置非常相似。 這里唯一要注意的區(qū)別是:
- 我們定義了一個(gè)ConsumerFactory類對(duì)象bean,該對(duì)象bean同時(shí)考慮了該Kafka消費(fèi)者應(yīng)用程序所屬的Kafka服務(wù)器地址和消費(fèi)者組ID。 我們已經(jīng)為消費(fèi)者提供了唯一的字符串,因?yàn)橹挥形ㄒ坏淖址强梢越邮艿?
- 最后,我們定義了ConcurrentKafkaListenerContainerFactory ,以確保此使用者應(yīng)用程序可以并發(fā)的速度使用Kafka消息,并且即使已發(fā)布的消息數(shù)量很高,也可以提供一致的吞吐量。
11.定義Spring Boot類
在最后階段,我們將使Spring Boot類用于發(fā)布消息,并在同一主題上使用消息。 這是主類的類定義:
KafkaApp.java
package com.javacodegeeks.example;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication public class KafkaApp implements CommandLineRunner {private static final Logger LOG = LoggerFactory.getLogger("KafkaApp");@Value("${message.topic.name}")private String topicName;private final KafkaTemplate<String, String> kafkaTemplate;@Autowiredpublic KafkaApp(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public static void main(String[] args) {SpringApplication.run(KafkaApp.class, args);}@Overridepublic void run(String... strings) {kafkaTemplate.send(topicName, "Hello Geek!");LOG.info("Published message to topic: {}.", topicName);}@KafkaListener(topics = "javacodegeeks", group = "jcg-group")public void listen(String message) {LOG.info("Received message in JCG group: {}", message);}}我們使用了CommandLineRunner接口來使此類運(yùn)行代碼,從而可以測(cè)試所編寫的生產(chǎn)者和配置類代碼。 在此類中,我們將消息發(fā)布到指定的主題,并在同一類中定義的偵聽器方法中偵聽該消息。
在下一節(jié)中,我們將使用簡(jiǎn)單的Maven命令運(yùn)行項(xiàng)目。
12.運(yùn)行項(xiàng)目
既然完成了主類定義,我們就可以運(yùn)行我們的項(xiàng)目。 使用maven可以輕松運(yùn)行應(yīng)用程序,只需使用以下命令:
運(yùn)行項(xiàng)目
mvn spring-boot:run一旦執(zhí)行了上面的命令,我們將看到一條消息已在指定主題上發(fā)布,并且同一應(yīng)用程序也使用了該消息:
運(yùn)行Spring Boot Kafka項(xiàng)目
13. Kafka分區(qū)
作為最后一個(gè)概念,我們將介紹如何在Apache Kafka中完成主題分區(qū)。 我們將從一個(gè)非常簡(jiǎn)單的說明性圖像開始,該圖像顯示領(lǐng)導(dǎo)者在主題分區(qū)中的存在方式:
主題劃分
當(dāng)在Broker 0中的Partition 0是領(lǐng)導(dǎo)者的位置上寫主題時(shí),此數(shù)據(jù)將在節(jié)點(diǎn)之間復(fù)制,從而使消息保持安全。 這意味著將為上圖中顯示的所有三個(gè)代理在分區(qū)0之間復(fù)制消息。
Kafka中的復(fù)制過程是通過節(jié)點(diǎn)打開的多個(gè)線程并行完成的。 隨著線程的開放以盡可能多地利用并行性,Kafka中獲得了非常高的吞吐量系統(tǒng)。 將消息復(fù)制一定次數(shù)后, 將調(diào)用寫入操作,但是消息的復(fù)制將繼續(xù)進(jìn)行,直到達(dá)到數(shù)據(jù)的復(fù)制因子為止。
14.結(jié)論
在本課程中,我們研究了構(gòu)建集成有Apache Kafka的Spring Boot應(yīng)用是多么容易和快捷。 Apache Kafka已從一個(gè)簡(jiǎn)單的Apache項(xiàng)目發(fā)展到一個(gè)生產(chǎn)級(jí)項(xiàng)目,當(dāng)在其群集中以正確數(shù)量的節(jié)點(diǎn)進(jìn)行部署,分配適當(dāng)?shù)膬?nèi)存并通過Zookeeper進(jìn)行正確管理時(shí),它每秒可以管理數(shù)百萬個(gè)請(qǐng)求。 Apache Kafka是軟件工程師工作中最有前途的技能之一,可以涵蓋許多用例,例如網(wǎng)站跟蹤,實(shí)時(shí)消息傳遞應(yīng)用程序等等。
Apache Kafka可以管理有關(guān)其主題和分區(qū)的消息的規(guī)模確實(shí)很小,而使其具有如此高的可擴(kuò)展性所需的體系結(jié)構(gòu)路線也啟發(fā)了許多其他項(xiàng)目。 它承諾提供的可擴(kuò)展性和實(shí)時(shí)處理速度可確保它解決了需要非常擴(kuò)展的項(xiàng)目中的許多問題。
15.下載源代碼
這是Apache Kafka與Spring Framework集成的示例。
下載您可以在此處下載此示例的完整源代碼: JCG-BootKafka-Example
翻譯自: https://www.javacodegeeks.com/2018/05/spring-apache-kafka-tutorial.html
總結(jié)
以上是生活随笔為你收集整理的Spring Apache Kafka教程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何用EXCEl表格快速制作电子印章ex
- 下一篇: adf4351使用方法_ADF:使用HT