数据源管理 | Kafka集群环境搭建,消息存储机制详解
本文源碼:GitHub·點(diǎn)這里 || GitEE·點(diǎn)這里
一、Kafka集群環(huán)境
1、環(huán)境版本
版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。
2、解壓重命名
tar -zxvf kafka_2.11-0.11.0.0.tgz mv kafka_2.11-0.11.0.0 kafka2.11創(chuàng)建日志目錄
[root@en-master kafka2.11]# mkdir logs注意:以上操作需要同步到集群下其他服務(wù)上。
3、添加環(huán)境變量
vim /etc/profile export KAFKA_HOME=/opt/kafka2.11 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile4、修改核心配置
[root@en-master /opt/kafka2.11/config]# vim server.properties -- 核心修改如下 # 唯一編號 broker.id=0 # 開啟topic刪除 delete.topic.enable=true # 日志地址 log.dirs=/opt/kafka2.11/logs # zk集群 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181注意:broker.id安裝集群服務(wù)個(gè)數(shù)編排即可,集群下不能重復(fù)。
5、啟動kafka集群
# 啟動命令 [root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties # 停止命令 [root@node02 kafka2.11]# bin/kafka-server-stop.sh # 進(jìn)程查看 [root@node02 kafka2.11]# jps注意:這里默認(rèn)啟動了zookeeper集群服務(wù),并且集群下的kafka分別啟動。
6、基礎(chǔ)管理命令
創(chuàng)建topic
bin/kafka-topics.sh --zookeeper zk01:2181 \ --create --replication-factor 3 --partitions 1 --topic one-topic參數(shù)說明:
- replication-factor 定義副本個(gè)數(shù)
- partitions 定義分區(qū)個(gè)數(shù)
- topic:定義topic名稱
查看topic列表
bin/kafka-topics.sh --zookeeper zk01:2181 --list修改topic分區(qū)
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5查看topic
bin/kafka-topics.sh --zookeeper zk01:2181 \ --describe --topic one-topic發(fā)送消息
bin/kafka-console-producer.sh \ --broker-list 192.168.72.133:9092 --topic one-topic消費(fèi)消息
bin/kafka-console-consumer.sh \ --bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic刪除topic
bin/kafka-topics.sh --zookeeper zk01:2181 \ --delete --topic first7、Zk集群用處
Kafka集群中有一個(gè)broker會被選舉為Controller,Controller依賴Zookeeper環(huán)境,管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。
二、消息攔截案例
1、攔截器簡介
Kafka中間件的Producer攔截器主要用于實(shí)現(xiàn)消息發(fā)送的自定義控制邏輯。用戶可以在消息發(fā)送前以及回調(diào)邏輯執(zhí)行前有機(jī)會對消息做一些自定義,比如消息修改等,發(fā)送狀態(tài)監(jiān)控等,用戶可以指定多個(gè)攔截器按順序執(zhí)行攔截。
核心方法
- configure:獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用;
- onSend:消息被序列化以及和計(jì)算分區(qū)前調(diào)用該方法,可以對消息做操作;
- onAcknowledgement:消息發(fā)送到Broker之后,或發(fā)送過程失敗時(shí)調(diào)用;
- close:關(guān)閉攔截器調(diào)用,執(zhí)行一些資源清理工作;
注意:這里說的攔截器是針對消息發(fā)送流程。
2、自定義攔截
定義方式:實(shí)現(xiàn)ProducerInterceptor接口即可。
攔截器一:在onSend方法中,對攔截的消息進(jìn)行修改。
@Component public class SendStartInterceptor implements ProducerInterceptor<String, String> {private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");@Overridepublic void configure(Map<String, ?> configs) {LOGGER.info("configs...");}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 修改消息內(nèi)容return new ProducerRecord<>(record.topic(), record.partition(),record.timestamp(), record.key(),"onSend:{" + record.value()+"}");}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {LOGGER.info("onAcknowledgement...");}@Overridepublic void close() {LOGGER.info("SendStart close...");} }攔截器二:在onAcknowledgement方法中,判斷消息是否發(fā)送成功。
@Component public class SendOverInterceptor implements ProducerInterceptor<String, String> {private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");@Overridepublic void configure(Map<String, ?> configs) {LOGGER.info("configs...");}@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {LOGGER.info("record...{}", record.value());return record ;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (exception != null){LOGGER.info("Send Fail...exe-msg",exception.getMessage());}LOGGER.info("Send success...");}@Overridepublic void close() {LOGGER.info("SendOver close...");} }加載攔截器:基于一個(gè)KafkaProducer配置Bean,加入攔截器。
@Configuration public class KafkaConfig {@Beanpublic Producer producer (){Properties props = new Properties();// 省略其他配置...// 添加攔截器List<String> interceptors = new ArrayList<>();interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);return new KafkaProducer<>(props) ;} }3、代碼案例
@RestController public class SendMsgWeb {@Resourceprivate KafkaProducer<String,String> producer ;@GetMapping("/sendMsg")public String sendMsg (){producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));return "success" ;} }基于上述自定義Bean類型,進(jìn)行消息發(fā)送,關(guān)注攔截器中打印日志信息。
三、Kafka存儲分析
說明:該過程基于上述案例producer.send方法追蹤的源碼執(zhí)行流程,源碼中的過程相對清楚,涉及的核心流程如下。
1、消息生成過程
Producer發(fā)送消息采用的是異步發(fā)送的方式,消息發(fā)送過程如下:
- Producer發(fā)送消息之后,經(jīng)過攔截器,序列化,事務(wù)判斷;
- 流程執(zhí)行后,消息內(nèi)容放入容器中;
- 容器在指定時(shí)間內(nèi)如果裝滿(size),會喚醒Sender線程;
- 容器如果在指定時(shí)間內(nèi)沒有裝滿,也會執(zhí)行一次Sender線程喚醒;
- 喚醒Sender線程之后,把容器數(shù)據(jù)拉取到topic中;
絮叨一句:讀這些中間件的源碼,不僅能開闊思維,也會讓自己意識到平時(shí)寫的代碼可能真的叫搬磚。
2、存儲機(jī)制
Kafka中消息是以topic進(jìn)行標(biāo)識分類,生產(chǎn)者面向topic生產(chǎn)消息,topic分區(qū)(partition)是物理上的存儲,基于消息日志文件的方式。
- 每個(gè)partition對應(yīng)于一個(gè)log文件,發(fā)送的消息不斷追加到該log文件末端;
- log文件中存儲的就是producer生產(chǎn)的消息數(shù)據(jù),采用分片和索引機(jī)制;
- partition分為多個(gè)segment。每個(gè)segment對應(yīng)兩個(gè)(.index)和(.log)文件;
- index文件類型存儲的索引信息;
- log文件存儲消息的數(shù)據(jù);
- 索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址;
- 消費(fèi)者組中的每個(gè)消費(fèi)者,都會實(shí)時(shí)記錄消費(fèi)的消息offset位置;
- 當(dāng)然消息消費(fèi)出錯(cuò)時(shí),恢復(fù)是從上次的記錄位置繼續(xù)消費(fèi);
3、事務(wù)控制機(jī)制
Kafka支持消息的事務(wù)控制
Producer事務(wù)
跨分區(qū)跨會話的事務(wù)原理,引入全局唯一的TransactionID,并將Producer獲得的PID和TransactionID綁定。Producer重啟后可以通過正在進(jìn)行的TransactionID獲得原來的PID。
Kafka基于TransactionCoordinator組件管理Transaction,Producer通過和TransactionCoordinator交互獲得TransactionID對應(yīng)的任務(wù)狀態(tài)。TransactionCoordinator將事務(wù)狀態(tài)寫入Kafka的內(nèi)部Topic,即使整個(gè)服務(wù)重啟,進(jìn)行中的事務(wù)狀態(tài)可以得到恢復(fù)。
Consumer事務(wù)
Consumer消息消費(fèi),事務(wù)的保證強(qiáng)度很低,無法保證消息被精確消費(fèi),因?yàn)橥皇聞?wù)的消息可能會出現(xiàn)重啟后已經(jīng)被刪除的情況。
四、源代碼地址
GitHub·地址 https://github.com/cicadasmile/data-manage-parent GitEE·地址 https://gitee.com/cicadasmile/data-manage-parent推薦關(guān)聯(lián)閱讀:數(shù)據(jù)源管理系列
| 01 | 數(shù)據(jù)源管理:主從庫動態(tài)路由,AOP模式讀寫分離 |
| 02 | 數(shù)據(jù)源管理:基于JDBC模式,適配和管理動態(tài)數(shù)據(jù)源 |
| 03 | 數(shù)據(jù)源管理:動態(tài)權(quán)限校驗(yàn),表結(jié)構(gòu)和數(shù)據(jù)遷移流程 |
| 04 | 數(shù)據(jù)源管理:關(guān)系型分庫分表,列式庫分布式計(jì)算 |
| 05 | 數(shù)據(jù)源管理:PostGreSQL環(huán)境整合,JSON類型應(yīng)用 |
| 06 | 數(shù)據(jù)源管理:基于DataX組件,同步數(shù)據(jù)和源碼分析 |
| 07 | 數(shù)據(jù)源管理:OLAP查詢引擎,ClickHouse集群化管理 |
總結(jié)
以上是生活随笔為你收集整理的数据源管理 | Kafka集群环境搭建,消息存储机制详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IE8中HTTP连接数目的变化
- 下一篇: 如何让gridview中的checkbo