kafka集群操作指南
kafka集群操作指南
@(KAFKA)[kafka, 大數據]
- kafka集群操作指南
- 一單機版安裝
- 二集群安裝
- 三集群啟停操作
- 四topic相關的操作
- 五某個broker掛掉本機器可重啟
- 六某個broker掛掉且無法重啟需要其它機器代替
- 七擴容
- 八數據遷移
- 九機器下線
- 十增加副本數量
- 十一leader的平衡
- 十二kafka集群網絡不可達
- 十三某個topic突然不可用
- 十四zk不可用
- 十五kafka長期未恢復
- 十六為某個topic線上增加partition
(一)單機版安裝
此部分不可用于生產,但新接觸kafka時,可以先有個感性的認識
Step 1: 下載Kafka
下載最新的版本并解壓.
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz $ tar -zxvf kafka_2.10-0.8.2.1.tgzStep 2: 啟動服務
Kafka用到了Zookeeper,所有首先啟動Zookper,下面簡單的啟用一個單實例的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啟動后離開控制臺。
現在啟動Kafka:
bin/kafka-server-start.sh config/server.propertiesStep 3: 創建 topic
創建一個叫做“test”的topic,它只有一個分區,一個副本。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test [2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket) Created topic "test".可以通過list命令查看創建的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 > test除了手動創建topic,還可以配置broker讓它自動創建topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息并發送到服務端。默認的每條命令將發送一條消息。
運行producer并在控制臺中輸一些消息,這些消息將被發送到服務端:
ctrl+c可以退出發送。
默認情況下,日志數據會被放置到/tmp/kafka-logs中,每個分區一個目錄
Step 5: 啟動consumer
Kafka也有一個命令行consumer可以讀取消息并輸出到標準輸出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning This is a message This is another message你在一個終端中運行consumer命令行,另一個終端中運行producer命令行,就可以在一個終端輸入消息,另一個終端讀取消息。
這兩個命令都有自己的可選參數,可以在運行的時候不加任何參數可以看到幫助信息。
(二)集群安裝
注意,必須先搭建zookeeper集群
1、使用3臺機器搭建Kafka集群:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
2、在安裝Kafka集群之前,這里沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper集群,也是使用這3臺機器,保證Zookeeper集群正常運行。
3、首先,在gdc-dn01-test上準備Kafka安裝文件,執行如下命令:
4、修改配置文件kafka/config/server.properties,修改如下內容:
broker.id=0 zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka這里需要說明的是,默認Kafka會使用ZooKeeper默認的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,如果 你有其他的應用也在使用ZooKeeper集群,查看ZooKeeper中數據可能會不直觀,所以強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka而且,需要手動在ZooKeeper中創建路徑/kafka,使用如下命令連接到任意一臺 ZooKeeper服務器:
cd ~/zookeeper bin/zkCli.sh在ZooKeeper執行如下命令創建chroot路徑:
create /kafka ”
這樣,每次連接Kafka集群的時候(使用–zookeeper選項),也必須使用帶chroot路徑的連接字符串,后面會看到。
5、然后,將配置好的安裝文件同步到其他的dn02、dn03節點上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoopscp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
6、最后,在dn02、dn03節點上配置修改配置文件kafka/config/server.properties內容如下所示:
broker.id=1 # 在dn02修改 broker.id=2 # 在dn03修改因為Kafka集群需要保證各個Broker的id在整個集群中必須唯一,需要調整這個配置項的值(如果在單機上,可以通過建立多個Broker進程來模擬分布式的Kafka集群,也需要Broker的id唯一,還需要修改一些配置目錄的信息)。
7、在集群中的dn01、dn02、dn03這三個節點上分別啟動Kafka,分別執行如下命令:
bin/kafka-server-start.sh config/server.properties &可以通過查看日志,或者檢查進程狀態,保證Kafka集群啟動成功。
注意,對于應用要求較高的集群,需要調整kafka-server-start.sh中的JVM堆大小
8、創建一個名稱為my-replicated-topic5的Topic,5個分區,并且復制因子為3,執行如下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic59、查看創建的Topic,執行如下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5結果信息如下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1上面Leader、Replicas、Isr的含義如下:
1 Partition: 分區
2 Leader : 負責讀寫指定分區的節點
3 Replicas : 復制該分區log的節點列表
4 Isr : “in-sync” replicas,當前活躍的副本列表(是一個子集),并且可能成為Leader
我們可以通過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示如果發布消息、消費消息。
11、在一個終端,啟動Producer,并向我們上面創建的名稱為my-replicated-topic5的Topic中生產消息,執行如下腳本:
12、在另一個終端,啟動Consumer,并訂閱我們上面創建的名稱為my-replicated-topic5的Topic中生產的消息,執行如下腳本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5可以在Producer終端上輸入字符串消息行,就可以在Consumer終端上看到消費者消費的消息內容。
也可以參考Kafka的Producer和Consumer的Java API,通過API編碼的方式來實現消息生產和消費的處理邏輯。
(三)集群啟停操作
1、啟動集群
bin/kafka-server-start.sh config/server.properties &
2、停止集群
bin/kafka-server-stop.sh3、重啟
沒有專用腳本,先停后啟即可
注:當然也可以使用kill命令來關閉,但使用腳本有以下好處:
(1)It will sync all its logs to disk to avoid needing to do any log recovery when it restarts (i.e. validating the checksum for all messages in the tail of the log). Log recovery takes time so this speeds up intentional restarts.
(2)It will migrate any partitions the server is the leader for to other replicas prior to shutting down. This will make the leadership transfer faster and minimize the time each partition is unavailable to a few milliseconds.
(四)topic相關的操作
1、創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.172.98:2181/kafka --replication-factor 2 --partitions 3 --topic test_topic(1)zookeeper指定其中一個節點即可,集群之間會自動同步。
(2)–replication-factor 2 –partitions 3理論上應該是可選參數,但此腳本必須寫這2個參數。
(3)還可以使用–config
(五)某個broker掛掉,本機器可重啟
【結論】如果一個broker掛掉,且可以重啟則處理步驟如下:
(1)重啟kafka進程
(2)執行rebalance(由于已經設置配置項自動執行balance,因此此步驟一般可忽略)
詳細分析見下面操作過程。
1、topic的情況
集群中有4臺機器,id為【2-5】,topic 有3個分區,每個分區2個副本,leader分別位于2,3,5中。
2、模擬機器down,kill掉進程
分區0的leader位于id=5的broker中,kill掉這臺機器的kafka進程
kill -9 **
3、再次查看topic的情況
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4可以看到,分區0的leader已經移到id=2的機器上了,它的副本位于2,5這2臺機器上,但處于同步狀態的只有id=2這臺機器。
4、重啟kafka進程
bin/kafka-server-start.sh config/server.properties &5、再次查看狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4發現分區0的2個副本都已經處于同步狀態,但leader依然為id=2的broker。
6、執行leader平衡
詳見leader的平衡部分。
如果配置文件中
auto.leader.rebalance.enable=true則此步驟不需要執行。
7、重新查看topic
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4此時leader已經回到了id=5的broker,一切恢復正常。
(六)某個broker掛掉且無法重啟,需要其它機器代替
【結論】當一個broker掛掉,需要換機器時,采用以下步驟:
1、將新機器kafka配置文件中的broker.id設置為與原機器一樣
2、啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建
詳細分析過程如下:
1、初始化機器,主要包括用戶創建,kafka文件的復制等。
2、修改config/server.properties文件
注意,只需要修改一個配置broker.id,且此配置必須與掛掉的那臺機器相同,因為kafka是通過broker.id來區分集群中的機器的。此處設為
3、查看topic的當前狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4當前topic有3個分區,其中分區1的leader位于id=5的機器上。
4、關掉id=5的機器
kill -9 ** 用于模擬機器突然down
或者:
用于正常關閉
5、查看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4可見,topic的分區0的leader已經遷移到了id=2的機器上,且處于同步的機器只有一個了。
6、啟動新機器
nohup bin/kafka-server-start.sh config/server.properties &7、再看topic的狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 2 Replicas: 5,2 Isr: 2,5Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4id=5的機器也處于同步狀態了,但還需要將leader恢復到這臺機器上。
8、執行leader平衡
詳見leader的平衡部分。
bin/kafka-preferred-replica-election.sh –zookeeper 192.168.172.98:2181/kafka
如果配置文件中
auto.leader.rebalance.enable=true則此步驟不需要執行。
9、done
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3Topic: test_topic Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4所有內容都恢復了
(七)擴容
將一臺機器加入kafka集群很容易,只需要為它分配一個獨立的broker id,然后啟動它即可。但是這些新加入的機器上面并沒有任何的分區數據,所以除非將現有數據移動這些機器上,否則它不會做任何工作,直到創建新topic。因此,當你往集群加入機器時,你應該將其它機器上的一部分數據往這臺機器遷移。
數據遷移的工作需要手工初始化,然后自動完成。它的原理如下:當新機器起來后,kafka將其它機器的一些分區復制到這個機器上,并作為follower,當這個新機器完成復制并成為in-sync狀態后,那些被復制的分區的一個副本會被刪除。(都不會成為leader?)
1、將新機器kafka配置文件中的broker.id設置為與原機器一樣
2、啟動kafka,注意kafka保存數據的目錄不會自動創建,需要手工創建
此時新建的topic都會優先分配leader到新增的機器上,但原有的topic不會將分區遷移過來。
3、數據遷移,請見數據遷移部分。
(八)數據遷移
注意:數據遷移時broker之間的流量非常大,導致kafka集群響應很慢,storm幾乎不能讀取數據,因此作數據遷移前必須考慮清楚。或者可以采取新建一個集群的方式(zk指向不同目錄),然后拓撲處理完舊集群數據后,在新集群中重啟即可。
以下步驟用于將現有數據遷移到新的broker中,假設需要將test_topic與streaming_ma30_sdc的全部分區遷移到新的broker中(id 為6和7)
1、創建一個json文件,用于指定哪些topic將被遷移過去
cat topics-to-move.json
注意全角,半角符號,或者中文引號之類的問題。
2、先generate遷移后的結果,檢查一下是不是你要想的效果
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --topics-to-move-json-file topics-to-move.json --broker-list "6,7" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"streaming_ma30_sdc","partition":2,"replicas":[2]},{"topic":"test_topic","partition":0,"replicas":[5,2]},{"topic":"test_topic","partition":2,"replicas":[3,4]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[5]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[4]},{"topic":"test_topic","partition":1,"replicas":[2,3]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[3]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[4]}]}Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":2,"replicas":[7]},{"topic":"test_topic","partition":2,"replicas":[7,6]},{"topic":"streaming_ma30_sdc","partition":1,"replicas":[6]},{"topic":"test_topic","partition":1,"replicas":[6,7]},{"topic":"streaming_ma30_sdc","partition":0,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":4,"replicas":[7]},{"topic":"streaming_ma30_sdc","partition":3,"replicas":[6]}]}分別列出了當前的狀態以及遷移后的狀態。
把這2個json分別保存下來,第一個用來萬一需要roll back的時候使用,第二個用來執行遷移。
3、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --execute其中expand-cluster-reassignment.json為保存上面第二段json的文件。
4、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [streaming_ma30_sdc,0] is still in progress Reassignment of partition [streaming_ma30_sdc,4] is still in progress Reassignment of partition [test_topic,2] completed successfully Reassignment of partition [test_topic,0] completed successfully Reassignment of partition [streaming_ma30_sdc,3] is still in progress Reassignment of partition [streaming_ma30_sdc,1] is still in progress Reassignment of partition [test_topic,1] completed successfully Reassignment of partition [streaming_ma30_sdc,2] is still in progress5、當所有遷移的完成后,查看一下結果是不是你想要的
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 7 Replicas: 7,6 Isr: 6,7Topic: test_topic Partition: 1 Leader: 6 Replicas: 6,7 Isr: 6,7Topic: test_topic Partition: 2 Leader: 7 Replicas: 7,6 Isr: 6,7完成。
如果大部分分區已經完成,但是少量分區沒動靜,則查找這些分區源機器及目標機器,查看日志,必要時重啟kafka進程。
以上步驟將整個topic遷移,也可以只遷移其中一個或者多個分區。
以下將test_topic的分區1移到broker id為2,3的機器,分區2移到broker id為4,5的機器.
【其實還是整個topic遷移好一點,不然準備遷移文件會很麻煩】
1、準備遷移配置文件
cat custom-reassignment.json
3、執行遷移
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --execute4、查看遷移過程
bin/kafka-reassign-partitions.sh --zookeeper 192.168.172.98:2181/kafka --reassignment-json-file custom-reassignment.json --verify5、查看遷移結果
bin/kafka-topics.sh --describe --zookeeper 192.168.172.111:2181/kafka --topic test_topic(九)機器下線
當一個機器下線時,kafka并不會自動將這臺機器上的副本遷移到其它機器上,因此,我們需要手工進行遷移。這個過程會相當的無聊,kafka打算在0.8.2版本中添加此特性。
有了嗎?再找找。如果只是替換機器則不會有這個問題。
(十)增加副本數量
Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the –execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition’s only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file-
cat increase-replication-factor.json
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]}
Then, use the json file with the –execute option to start the reassignment process-
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –execute
Current partition replica assignment
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5]}]}
Save this to use as the –reassignment-json-file option during rollback
Successfully started reassignment of partitions
{“version”:1,
“partitions”:[{“topic”:”foo”,”partition”:0,”replicas”:[5,6,7]}]}
The –verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the –execute option) should be used with the –verify option
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file increase-replication-factor.json –verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
You can also verify the increase in replication factor with the kafka-topics tool-
bin/kafka-topics.sh –zookeeper localhost:2181 –topic foo –describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
(十一)leader的平衡
當一個broker down掉時,所有本來將它作為leader的分區會被將leader轉移到其它broker。這意味著當這個broker重啟時,它將不再擔任何分區的leader,kafka的client也不會從這個broker來讀取消息,導致資源的浪費。
為了避免這種情況的發生,kafka增加了一個標記:優先副本(preferred replicas)。如果一個分區有3個副本,且這3個副本的優先級別分別為1,5,9,則1會作為leader。為了使kafka集群恢復默認的leader,需要運行以下命令:
或者可以設置以下配置項,leader 會自動執行balance:
auto.leader.rebalance.enable=true這配置默認即為空,但需要經過一段時間后才會觸發,約半小時。
(十二)kafka集群網絡不可達
1、agent將數據failover到本地,等恢復后再上傳
(十三)某個topic突然不可用
1、馬上新建topic,將數據上傳至新topic,同時agent緩存本地
2、
(十四)zk不可用
1、 停止集群
bin/kafka-server-stop注意,不要使用kill -9 ,否則會有數據同步問題
2、待zk恢復后,再重啟kafka進程
(十五)kafka長期未恢復
1、停止當前集群
2、修改server.properties,將日志指向新的路徑,如/data/kafka2
3、指定新的zk路徑
4、啟動kafka,重建topic,先臨時接收數據
(十六)為某個topic線上增加partition
若某個topic在創建時未能正確預測其大小,上線后發現實際數據量比預估的大很多,則需要在增加topic的partition:
1、增加partition
./bin/kafka-topics.sh –zookeeper zk:2181/kafka –alter –partitions 40 –topic g37
2、但此時數據并不寫入新分區,需要重啟proxy。對于直接寫入kafka的,則需要生啟寫入kafka的進程。
* 增加partition的操作并不會使用現有數據被同步到新分區中,不會引起網絡問題 *
* 注意,kafka不能減少partition數量 *
總結
以上是生活随笔為你收集整理的kafka集群操作指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka集群原理介绍
- 下一篇: 线性回归原理与spark/sklearn