大数据技术之kafka (第 3 章 Kafka 架构深入 ) offset讲解
新版的 Kafka 使用一個選舉出來的 controller 來監聽 zookeeper,其他 node 再去和 controller 通信,這么做的目的是為了減少 zookeeper 的壓力。bootstrap-servers 會自動發現其他 broker,這也是 bootstrap 的含義
前面我們講到了消費者,接下來我們詳細講下消費者個數改變,分區重新分配是如何接著消費的
首先我們還是一樣創建一個新的主題topic?主題是名字叫bigdata??這里報錯原因我已經創建過了,主題已經存在
發送一條消息?hello? 因為有兩個分區,其中一個分區有變化,一個沒有變化
我們開啟一個消費者,讓其中一個分區被消費
查看zookeeper? ,進入zk客戶端
[root@backup01 bin]# ./zkCli.sh Connecting to localhost:2181 2020-04-06 08:54:43,793 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT 2020-04-06 08:54:43,816 [myid:] - INFO [main:Environment@100] - Client environment:host.name=backup01 2020-04-06 08:54:43,816 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_172 2020-04-06 08:54:43,817 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/local/java/jdk1.8.0_172/jre 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/classes:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../conf: 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA> 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-862.el7.x86_64 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:user.name=root 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/root 2020-04-06 08:54:43,818 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin 2020-04-06 08:54:43,819 [myid:] - INFO [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@277050dc 2020-04-06 08:54:43,845 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) Welcome to ZooKeeper! JLine support is enabled 2020-04-06 08:54:43,960 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session 2020-04-06 08:54:43,985 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000393bf830000, negotiated timeout = 30000WATCHER::WatchedEvent state:SyncConnected type:None path:null我們進入了zk客戶端的根目錄? ?看到的除了zookeeper其他都是kafka
這個controller是后面要聊的,是關于kafka的老大controller不是指定的,這個controller是爭搶資源的,誰先搶到誰就是老大,正常情況下,先啟動的就是老大
[zk: localhost:2181(CONNECTED) 1] get /controllercontroller_epoch controller [zk: localhost:2181(CONNECTED) 1] get /controller {"version":1,"brokerid":0,"timestamp":"1586133981273"} cZxid = 0x500000003 ctime = Mon Apr 06 08:46:21 CST 2020 mZxid = 0x500000003 mtime = Mon Apr 06 08:46:21 CST 2020 pZxid = 0x500000003 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x2000392dd2c0000 dataLength = 54 numChildren = 0 [zk: localhost:2181(CONNECTED) 2]?我們看到brokerid:0? ?---> backup01:9092
[zk: localhost:2181(CONNECTED) 3] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 4]broker/ids? ? 0? ? 1? ? ?2
topics有5個主題
[zk: localhost:2181(CONNECTED) 2] ls /controller [] [zk: localhost:2181(CONNECTED) 3] ls /brokers [ids, topics, seqid] [zk: localhost:2181(CONNECTED) 4] ls /brokers/ids [0, 1, 2] [zk: localhost:2181(CONNECTED) 5] ls /brokers/topics [secend, bigdata, three, first, __consumer_offsets] [zk: localhost:2181(CONNECTED) 6]?在新的版本中topics中多了一個__consumer_offsets這樣一個主題
查看到消費者的默認50個分區信息
這里我也不知道為啥在zk客戶端查看consumers(消費者組consumer-group)信息為空,,從上面的消息可以知道,我的生產者和消費者都是正常的,不知道新版本有什么特殊的改變,
查資料可知:Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic 為__consumer_offsets。?
1)修改配置文件 consumer.properties?
exclude.internal.topics=false cd /usr/local/hadoop/kafka/kafka_2.12-2.4.1/config vim consumer.properties所以我們知道了新版的kafka使用__consumer_offsets想要消費生成功?必須修改consumer.properties?這個配置文件才能正常記錄消費信息
__consumer_offsets這個也是個主題,正常情況下,kafka并不希望我們去獲取這個主題下的數據,我們這里是為了看里面的消費情況,所以我們就設置一下
2)讀取 offset??(我們使用的是0.11版本之后)
0.11.0.0 之前: bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties from-beginning 0.11.0.0 之后版本(含): bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning?
?這個很關鍵,指定配置文件,如果不指定,那就不生效
這里拋出一個問題,為什么要使用zookeeper?
假設我們要啟動一個消費者A去消費這個T1這個主題,我們連的是bootstrap-server,這個A?會把消息放到 topic 為__consumer_offsets這個主題下,那A是不是相當于__consumer_offsets這個主題的生產者,我們要消費的是__consumer_offsets,如果我們這里__consumer_offsets連的bootstrap-server,感覺這個__consumer_offsets又充當了生產者,感覺自己生產數據自己消費,有點怪怪的,所以我們這里使用了zookeeper,
但是在新的版本中
[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning Missing required argument "[bootstrap-server]"我們從consumer.properties配置文件也可以只是這里用的是bootstrap.servers?
Missing required argument "[bootstrap-server]"那我們不能用zookeeper了
[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server backup01:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginningkey value形式? ? ? [test-consumer-group,__consumer_offsets,3]的hash值我們不知道,但是程序自己是能找到那個分區(50個分區中)能確定唯一的offset
假如一個消費者組的的其中一個消費者掛了,其他消費者怎么能拿到offset,接著消費呢?
因為是同一個組的,掛了之后會通知其他消費者會帶著同一個組名來繼續消費
我們可以發現大概1秒鐘更新一次,這個提交是速度是可以改的
補充一點消費者組kafka的命令操作:
我們通過其它方式查看在kafka消費者組的信息? ,查看consumer group列表,使用--list參數
查看consumer group列表有新、舊兩種命令,分別查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要區分指定bootstrap--server和zookeeper參數:
注意:在新的版本中老版本的查詢方式已經移除,這里只提供新版本的查詢方式
bin/kafka-consumer-groups.sh new--consumer --bootstrap-server backup01:9092 --list使用kafka的bin目錄下面的kafka-consumer-groups.sh命令可以查看offset消費情況,注意,如果你的offset是存在kafka集群上的,就指定kafka服務器的地址bootstrap-server:
./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-63897 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID console-consumer-63897 bigdata 0 - 3 - consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120 consumer-console-consumer-63897-1 console-consumer-63897 bigdata 1 - 2 - consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120 consumer-console-consumer-63897-1 [root@backup01 kafka_2.12-2.4.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-69882GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID console-consumer-69882 bigdata 0 - 3 - consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120 consumer-console-consumer-69882-1 console-consumer-69882 bigdata 1 - 2 - consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120 consumer-console-consumer-69882-1補充說明:
關于這個__consumer_offsets(后續再說,這里大概了解下)
拋出問題:
__consumer_offsets這個topic是由kafka自動創建的,默認50個,但是都存在一臺kafka服務器上,這是不是就存在很明顯的單點故障? 經測試,如果將存儲consumer_offsets的這臺機器kill掉,所有的消費者都停止消費了。請問這個問題是怎么解決的呢?
原因分析:
由于__consumer_offsets這個用于存儲offset的分區是由kafka服務器默認自動創建的,那么它在創建該分區的時候,分區數和副本數的依據是什么? 分區數是固定的50,這個沒什么可懷疑的,副本數呢?應該是一個默認值1,依據是,如果我們沒有在server.properties文件中指定topic分區的副本數的話,它的默認值就是1。
__consumer_offsets是一個非常重要的topic,我們怎么能允許它只有一個副本呢?這樣就存在單點故障,也就是如果該分區所在的集群宕機了的話, 我們的消費者就無法正常消費數據了。我在筆記本上搭建了kafka集群,共3個Broker,來解決這個問題。下面是一些記錄。
?說明:如果你的__consumer_offsets這個topic已經被創建了,而且只存在一臺broker上,如果你直接使用命令刪除這個topic是會報錯了,提示這是kafka內置的topic,禁止刪除。可以在zookeeper中刪除(我是使用ZooInspector這個客戶端連上zookeeper,刪除和__consumer_offsets這個topic有關的目錄或節點)。
總結
以上是生活随笔為你收集整理的大数据技术之kafka (第 3 章 Kafka 架构深入 ) offset讲解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PAT——程序运行时间 (1026)
- 下一篇: Java—正整数分解成质因数