kafka完整集群安装
kafka集群模式安裝
- 集群規(guī)劃
- 1.安裝包下載
- 2.安裝zookeeper集群
- 3.安裝kafka
- 1.檢查zookeeper集群是否運行
- 2.上傳安裝包并解壓
- 3.修改配置文件
- 4.同步安裝包到其他服務(wù)器
- 5.啟動和停止kafka
- 4.集群測試
- 1.創(chuàng)建一個Topic
- 2.查看集群中的Topic
- 3.生產(chǎn)和消費數(shù)據(jù)測試
- 模擬生產(chǎn)者來生產(chǎn)數(shù)據(jù)
- 模擬消費者消費數(shù)據(jù)
- 運行describe topics命令查看分區(qū)情況
- 4. 修改topic分區(qū)數(shù)
- 5. 刪除topic
集群規(guī)劃
| node1 | 192.168.88.11 | zookeeper,kafka,jdk1.8 |
| node2 | 192.168.88.11 | zookeeper,kafka,jdk1.8 |
| node3 | 192.168.88.11 | zookeeper,kafka,jdk1.8 |
1.安裝包下載
下載鏈接:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz
2.安裝zookeeper集群
請參考
zookeeper集群搭建:https://blog.csdn.net/smartsteps/article/details/119815461
3.安裝kafka
1.檢查zookeeper集群是否運行
每臺機器上執(zhí)行查看運行狀態(tài),一共一個主節(jié)點,兩個從節(jié)點./zkServer.sh status node1: Mode: follower node2: Mode: follower node3: Mode: leader2.上傳安裝包并解壓
在node1上執(zhí)行 解壓到/home目錄下
tar -zxvf kafka_2.12-2.8.0.tgz -C /home/3.修改配置文件
創(chuàng)建日志文件件 mkdir /home/kafka_2.12-2.8.0/logs 進入配置文件 cd /home/kafka_2.12-2.8.0/configvi server.properties修改配置文件
broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node14.同步安裝包到其他服務(wù)器
scp -r kafka_2.12-2.8.0/ node2:$PWDscp -r kafka_2.12-2.8.0/ node3:$PWD修改node2 kafka集群的配置
cd /home/kafka_2.12-2.8.0/config vi server.properties broker.id=1 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enable=true host.name=node2修改node3 kafka集群的配置
cd /home/kafka_2.12-2.8.0/config vi server.properties broker.id=2 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/home/kafka_2.12-2.8.0/logs num.partitions=2 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 log.flush.interval.ms=1000 log.retention.hours=168 log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 delete.topic.enble=true host.name=node35.啟動和停止kafka
每臺服務(wù)器都要執(zhí)行
cd /home/kafka_2.12-2.8.0 bin/kafka-server-start.sh config/server.properties后臺啟動
cd /home/kafka_2.12-2.8.0 nohup bin/kafka-server-start.sh config/server.properties 2>&1 &停止服務(wù)
cd /home/kafka_2.12-2.8.0 bin/kafka-server-stop.sh4.集群測試
1.創(chuàng)建一個Topic
node1執(zhí)行以下命令來創(chuàng)建topic
創(chuàng)建了一個名字為test的主題, 有三個分區(qū),有兩個副本
2.查看集群中的Topic
查看kafka當(dāng)中存在的主題
node1使用以下命令來查看kafka當(dāng)中存在的topic主題
3.生產(chǎn)和消費數(shù)據(jù)測試
模擬生產(chǎn)者來生產(chǎn)數(shù)據(jù)
node1服務(wù)器執(zhí)行以下命令來模擬生產(chǎn)者進行生產(chǎn)數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test模擬消費者消費數(shù)據(jù)
node2服務(wù)器執(zhí)行以下命令來模擬消費者進行消費數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic test運行describe topics命令查看分區(qū)情況
bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test
這是輸出的解釋。第一行給出了所有分區(qū)的摘要,每個附加行提供有關(guān)一個分區(qū)的信息。由于我們只有一個分 區(qū)用于此主題,因此只有一行。
“l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點。每個節(jié)點將成為隨機選擇的分區(qū)部分的領(lǐng)導(dǎo)者。(因為在kafka中 如果有多個副本的話,就會存在leader和follower的關(guān)系,表示當(dāng)前這個副本為leader所在的broker是哪一個)
“replicas”是復(fù)制此分區(qū)日志的節(jié)點列表,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動狀態(tài)。(所有副本列表 0 ,1,2)
“isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。(可用的列表 數(shù))
4. 修改topic分區(qū)數(shù)
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --partitions 25. 刪除topic
目前刪除topic在默認(rèn)情況下知識打上一個刪除的標(biāo)記,在重新啟動kafka后才刪除。如果需要立即刪除,則需要在 server.properties中配置: delete.topic.enable=true然后執(zhí)行以下命令進行刪除topic
bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test總結(jié)
以上是生活随笔為你收集整理的kafka完整集群安装的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: win7设置wifi热点_Windows
- 下一篇: Ubuntu18.04系统的安装及使用,