javascript
Kafka的Spring Cloud Stream
總覽
該示例項目演示了如何使用事件驅動的體系結構 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok構建實時流應用程序。
在本教程結束時,您將運行一個簡單的基于Spring Boot的Greetings微服務
讓我們開始吧!
順便說一句,您可以在此處找到源代碼。
什么是Spring Cloud Streaming?
Spring Cloud Stream是基于Spring Boot構建的框架,用于構建消息驅動的微服務。
什么是卡夫卡?
Kafka是最初由LinkedIn開發的流行的高性能和水平可伸縮的消息傳遞平臺。
安裝Kafka
從這里下載Kafka并將其解壓縮:
>?tar -xzf kafka_2.11-1.0.0.tgz > cd kafka_2.11-1.0.0啟動Zookeeper和Kafka
在Windows上:
>?bin\windows\zookeeper-server-start.bat config\zookeeper.properties > bin\windows\kafka-server-start.bat config\server.properties在Linux或Mac上:
>?bin/zookeeper-server-start.sh config/zookeeper.properties > bin/kafka-server-start.sh config/server.properties如果計算機從休眠狀態喚醒后,Kafka沒有運行并且無法啟動,請刪除<TMP_DIR>/kafka-logs文件夾,然后再次啟動Kafka。
什么是Lombok?
Lombok是一個Java框架,可在代碼中自動生成getter,setter,toString(),構建器,記錄器等。
Maven依賴
轉到https://start.spring.io創建一個Maven項目:
注意pom.xml文件中的Maven依賴項:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><!-- Also install the Lombok plugin in your IDE --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency>…還有<dependencyManagement>部分:
<dependencyManagement><dependencies><dependency><!-- Import dependency management from Spring Boot --><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>${spring-cloud-stream.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>…和<repository>部分:
<repository><id>spring-milestones</id><name>Spring Milestones</name><url>http://repo.spring.io/libs-milestone</url><snapshots><enabled>false</enabled></snapshots> </repository>定義卡夫卡流
package com.kaviddiss.streamkafka.stream;import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel;? public interface GreetingsStreams {String INPUT = "greetings-in";String OUTPUT = "greetings-out";@Input(INPUT)SubscribableChannel inboundGreetings();@Output(OUTPUT)MessageChannel outboundGreetings(); }為了使我們的應用程序能夠與Kafka進行通信,我們需要定義一個出站流以將消息寫入Kafka主題,并定義一個入站流以讀取來自Kafka主題的消息。
通過簡單地創建一個接口為每個流定義單獨的方法,Spring Cloud提供了一種方便的方法。
inboundGreetings()方法定義要從Kafka讀取的入站流,而outboundGreetings()方法定義要寫入Kafka的出站流。
在運行時,Spring將為GreetingsStreams接口創建一個基于Java代理的實現,該實現可以作為Spring Bean注入到代碼中的任何位置,以訪問我們的兩個流。
配置Spring Cloud Stream
下一步是將Spring Cloud Stream配置為綁定到GreetingsStreams接口中的流。 這可以通過使用以下代碼創建@Configuration類com.kaviddiss.streamkafka.config.StreamsConfig來完成:
package com.kaviddiss.streamkafka.config;import com.kaviddiss.streamkafka.stream.GreetingsStreams; import org.springframework.cloud.stream.annotation.EnableBinding;@EnableBinding(GreetingsStreams.class) public class StreamsConfig { }使用@EnableBinding批注(將GreatingsService接口傳遞到該批注)完成@EnableBinding的GreatingsService 。
Kafka的配置屬性
默認情況下,配置屬性存儲在src/main/resources/application.properties文件中。
但是,我更喜歡使用YAML格式,因為它不太冗長,并且允許將公共屬性和特定于環境的屬性保留在同一文件中。
現在,讓我們將application.properties重命名為application.yaml并將config片段下方粘貼到文件中:
spring:cloud:stream:kafka:binder:brokers: localhost:9092bindings:greetings-in:destination: greetingscontentType: application/jsongreetings-out:destination: greetingscontentType: application/json上面的配置屬性配置要連接的Kafka服務器的地址,以及我們用于代碼中的入站和出站流的Kafka主題。 他們倆都必須使用相同的Kafka主題!
contentType屬性告訴Spring Cloud Stream在流中以String的形式發送/接收我們的消息對象。
創建消息對象
使用下面的代碼創建一個簡單的com.kaviddiss.streamkafka.model.Greetings類,該代碼將表示我們從中讀取并寫入的greetings Kafka主題:
package com.kaviddiss.streamkafka.model;// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/): import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.ToString;@Getter @Setter @ToString @Builder public class Greetings {private long timestamp;private String message; }注意,由于Lombok批注,該類如何沒有任何getter和setter。 @ToString將使用類的字段生成toString()方法,而@Builder批注將允許我們使用流暢的生成器創建Greetings對象(請參見下文)。
創建服務層以寫入Kafka
讓我們創建的com.kaviddiss.streamkafka.service.GreetingsService下面的代碼,將寫一個類Greetings對象的greetings卡夫卡話題:
package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils;@Service @Slf4j public class GreetingsService {private final GreetingsStreams greetingsStreams;public GreetingsService(GreetingsStreams greetingsStreams) {this.greetingsStreams = greetingsStreams;}public void sendGreeting(final Greetings greetings) {log.info("Sending greetings {}", greetings);MessageChannel messageChannel = greetingsStreams.outboundGreetings();messageChannel.send(MessageBuilder.withPayload(greetings).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());}@Service批注會將此類配置為Spring Bean,并通過構造函數注入GreetingsService依賴項。
@Slf4j批注將生成一個SLF4J記錄器字段,可用于記錄日志。
在sendGreeting()方法中,我們使用注入的GreetingsStream對象發送由Greetings對象表示的消息。
創建REST API
現在,我們將創建一個REST api端點,該端點將觸發使用GreetingsService Spring Bean向Kafka發送消息:
package com.kaviddiss.streamkafka.web;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.service.GreetingsService; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController;?@RestController public class GreetingsController {private final GreetingsService greetingsService;public GreetingsController(GreetingsService greetingsService) {this.greetingsService = greetingsService;}@GetMapping("/greetings")@ResponseStatus(HttpStatus.ACCEPTED)public void greetings(@RequestParam("message") String message) {Greetings greetings = Greetings.builder().message(message).timestamp(System.currentTimeMillis()).build();greetingsService.sendGreeting(greetings);} }@RestController注釋告訴Spring這是一個Controller bean(MVC中的C)。 greetings()方法定義一個HTTP GET /greetings端點,該端點接受message請求參數,并將其傳遞給GreetingsService的sendGreeting()方法。
聽問候卡夫卡主題
讓我們創建一個com.kaviddiss.streamkafka.service.GreetingsListener類,該類將偵聽greetings Kafka主題上的消息并將其記錄在控制臺上:
package com.kaviddiss.streamkafka.service;import com.kaviddiss.streamkafka.model.Greetings; import com.kaviddiss.streamkafka.stream.GreetingsStreams; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;@Component @Slf4j public class GreetingsListener {@StreamListener(GreetingsStreams.INPUT)public void handleGreetings(@Payload Greetings greetings) {log.info("Received greetings: {}", greetings);} }@Component批注類似于@Service @Component , @Service @RestController定義了一個Spring Bean。
GreetingsListener有一個方法, handleGreetings()將通過云春流與每一個新的調用Greetings的消息對象greetings卡夫卡的話題。 這要感謝為handleGreetings()方法配置的@StreamListener批注。
運行應用程序
最后一個難題是由Spring Initializer自動生成的com.kaviddiss.streamkafka.StreamKafkaApplication類:
package com.kaviddiss.streamkafka;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class StreamKafkaApplication {public static void main(String[] args) {SpringApplication.run(StreamKafkaApplication.class, args);} }無需在此處進行任何更改。 您可以在您的IDE中將此類作為Java應用程序運行,也可以使用Spring Boot maven插件從命令行運行該應用程序:
>?mvn spring-boot:run應用程序運行后,在瀏覽器中轉到http:// localhost:8080 / greetings?message = hello并檢查您的控制臺。
摘要
我希望您喜歡本教程。 隨時提出任何問題并留下您的反饋。
翻譯自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html
總結
以上是生活随笔為你收集整理的Kafka的Spring Cloud Stream的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java World中的GraphQL简
- 下一篇: 服务器受到ddos攻击(ddos能入侵服