Kafka是當下非常流行的消息中間件,據官網透露,已有成千上萬的公司在使用它。最近實踐了一波Kafka,確實很好很強大。今天我們來從三個方面學習下Kafka:Kafaka在Linux下的安裝,Kafka的可視化工具,Kafka和SpringBoot結合使用。希望大家看完后能快速入門Kafka,掌握這個流行的消息中間件!
? Kafka簡介 Kafka是由LinkedIn公司開發的一款開源分布式消息流平臺,由Scala和Java編寫。主要作用是為處理實時數據提供一個統一、高吞吐、低延遲的平臺,其本質是基于發布訂閱模式的消息引擎系統。
Kafka具有以下特性:
高吞吐、低延遲:Kafka收發消息非常快,使用集群處理消息延遲可低至2ms。
高擴展性:Kafka可以彈性地擴展和收縮,可以擴展到上千個broker,數十萬個partition,每天處理數萬億條消息。
永久存儲:Kafka可以將數據安全地存儲在分布式的,持久的,容錯的群集中。
高可用性:Kafka在可用區上可以有效地擴展群集,某個節點宕機,集群照樣能夠正常工作。
? Kafka安裝 我們將采用Linux下的安裝方式,安裝環境為CentOS 7.6。此處沒有采用Docker來安裝部署,個人感覺直接安裝更簡單(主要是官方沒提供Docker鏡像)!
cd?/mydata/kafka/
tar?-xzf?kafka_2.13-2.8.0.tgz
cd?kafka_2.13-2.8.0
#?后臺運行服務,并把日志輸出到當前文件夾下的zookeeper-out.file文件中
nohup?bin/zookeeper-server-start.sh?config/zookeeper.properties?>?zookeeper-out.file?2>&1?&
############################# Socket Server Settings ############################## The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.5.78:9092
#?后臺運行服務,并把日志輸出到當前文件夾下的kafka-out.file文件中
nohup?bin/kafka-server-start.sh?config/server.properties?>?kafka-out.file?2>&1?&
Kafka命令行操作 接下來我們使用命令行來操作下Kafka,熟悉下Kafka的使用。
bin/kafka-topics.sh?--create?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
bin/kafka-topics.sh?--describe?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
Topic:?consoleTopic?TopicId:?tJmxUQ8QRJGlhCSf2ojuGw?PartitionCount:?1?ReplicationFactor:?1?Configs:?segment.bytes=1073741824Topic:?consoleTopic?Partition:?0?Leader:?0?Replicas:?0?Isr:?0
bin/kafka-console-producer.sh?--topic?consoleTopic?--bootstrap-server?192.168.5.78:9092
bin/kafka-console-consumer.sh?--topic?consoleTopic?--from-beginning?--bootstrap-server?192.168.5.78:9092
? Kafka可視化 使用命令行操作Kafka確實有點麻煩,接下來我們試試可視化工具kafka-eagle。
安裝JDK 如果你使用的是CentOS的話,默認沒有安裝完整版的JDK,需要自行安裝!
cd?/mydata/java
tar?-zxvf?OpenJDK8U-jdk_x64_linux_xxx.tar.gz
mv?OpenJDK8U-jdk_x64_linux_xxx.tar.gz?jdk1.8
vi?/etc/profile
#?在profile文件中添加
export?JAVA_HOME=/mydata/java/jdk1.8
export?PATH=$PATH:$JAVA_HOME/bin
#?使修改后的profile文件生效
.?/etc/profile
安裝kafka-eagle
cd?/mydata/kafka/
tar?-zxvf?kafka-eagle-web-2.0.5-bin.tar.gz
vi?/etc/profile
#?在profile文件中添加
export?KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
export?PATH=$PATH:$KE_HOME/bin
#?使修改后的profile文件生效
.?/etc/profile
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.JDBC
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
$KE_HOME/bin/ke.sh?start
#?停止服務
$KE_HOME/bin/ke.sh?stop
#?重啟服務
$KE_HOME/bin/ke.sh?restart
#?查看服務運行狀態
$KE_HOME/bin/ke.sh?status
#?查看服務狀態
$KE_HOME/bin/ke.sh?stats
#?動態查看服務輸出日志
tail?-f?$KE_HOME/logs/ke_console.out
可視化工具使用
bin/kafka-console-consumer.sh?--topic?testTopic?--from-beginning?--bootstrap-server?192.168.5.78:9092
vi?kafka-server-start.sh
#?暴露JMX端口
if?[?"x$KAFKA_HEAP_OPTS"?=?"x"?];?thenexport?KAFKA_HEAP_OPTS="-server?-Xms2G?-Xmx2G?-XX:PermSize=128m?-XX:+UseG1GC?-XX:MaxGCPauseMillis=200?-XX:ParallelGCThreads=8?-XX:ConcGCThreads=5?-XX:InitiatingHeapOccupancyPercent=70"export?JMX_PORT="9999"
fi
? ? SpringBoot整合Kafka 在SpringBoot中操作Kafka也是非常簡單的,比如Kafka的消息模式很簡單,沒有隊列,只有Topic。
<!--Spring整合Kafka-->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.1</version>
</dependency>
server:port:?8088
spring:kafka:bootstrap-servers:?'192.168.5.78:9092'consumer:group-id:?"bootGroup"
/***?Kafka消息生產者*?Created?by?macro?on?2021/5/19.*/
@Component
public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;public?void?send(String?message){kafkaTemplate.send("bootTopic",message);}
}
/***?Kafka消息消費者*?Created?by?macro?on?2021/5/19.*/
@Slf4j
@Component
public?class?KafkaConsumer?{@KafkaListener(topics?=?"bootTopic")public?void?processMessage(String?content)?{log.info("consumer?processMessage?:?{}",content);}}
/***?Kafka功能測試*?Created?by?macro?on?2021/5/19.*/
@Api(tags?=?"KafkaController",?description?=?"Kafka功能測試")
@Controller
@RequestMapping("/kafka")
public?class?KafkaController?{@Autowiredprivate?KafkaProducer?kafkaProducer;@ApiOperation("發送消息")@RequestMapping(value?=?"/sendMessage",?method?=?RequestMethod.GET)@ResponseBodypublic?CommonResult?sendMessage(@RequestParam?String?message)?{kafkaProducer.send(message);return?CommonResult.success(null);}
}
2021-05-19?16:59:21.016??INFO?2344?---?[ntainer#0-0-C-1]?c.m.mall.tiny.component.KafkaConsumer????:?consumer?processMessage?:?Spring?Boot?message!
? ? 總結 通過本文的一波實踐,大家基本就能入門Kafka了。安裝、可視化工具、結合SpringBoot,這些基本都是和開發者相關的操作,也是學習Kafka的必經之路。
參考資料 Kafka官方文檔:https://kafka.apache.org/quickstart
kafka-eagle官方文檔:http://www.kafka-eagle.org/articles/docs/introduce/getting-started.html
Kafka相關概念:https://juejin.cn/post/6844903495670169607
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道 公眾號
好文章,我在看??
總結
以上是生活随笔 為你收集整理的吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你! 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。