原創(chuàng)作品,允許轉(zhuǎn)載,轉(zhuǎn)載時(shí)請(qǐng)務(wù)必以超鏈接形式標(biāo)明文章?原始出處?、作者信息和本聲明。否則將追究法律責(zé)任。http://tchuairen.blog.51cto.com/3848118/1855090
一、基礎(chǔ)理論
這塊是整個(gè)kafka的核心無(wú)論你是先操作在來(lái)看還是先看在操作都需要多看幾遍。
首先來(lái)了解一下Kafka所使用的基本術(shù)語(yǔ)
Topic
Kafka將消息種子(Feed)分門(mén)別類(lèi) 每一類(lèi)的消息稱(chēng)之為話(huà)題(Topic).
Producer
發(fā)布消息的對(duì)象稱(chēng)之為話(huà)題生產(chǎn)者(Kafka topic producer)
Consumer
訂閱消息并處理發(fā)布的消息的種子的對(duì)象稱(chēng)之為話(huà)題消費(fèi)者(consumers)
Broker
已發(fā)布的消息保存在一組服務(wù)器中稱(chēng)之為Kafka集群。集群中的每一個(gè)服務(wù)器都是一個(gè)代理(Broker). 消費(fèi)者可以訂閱一個(gè)或多個(gè)話(huà)題并從Broker拉數(shù)據(jù)從而消費(fèi)這些已發(fā)布的消息。
讓我們站的高一點(diǎn)從高的角度來(lái)看Kafka集群的業(yè)務(wù)處理就像這樣子
Client和Server之間的通訊是通過(guò)一條簡(jiǎn)單、高性能并且和開(kāi)發(fā)語(yǔ)言無(wú)關(guān)的TCP協(xié)議。除了Java Client外還有非常多的其它編程語(yǔ)言的Client。
話(huà)題和日志 ?(Topic和Log)
讓我們更深入的了解Kafka中的Topic。
Topic是發(fā)布的消息的類(lèi)別或者種子Feed名。對(duì)于每一個(gè)TopicKafka集群維護(hù)這一個(gè)分區(qū)的log就像下圖中的示例
每一個(gè)分區(qū)都是一個(gè)順序的、不可變的消息隊(duì)列 并且可以持續(xù)的添加。分區(qū)中的消息都被分配了一個(gè)序列號(hào)稱(chēng)之為偏移量(offset)在每個(gè)分區(qū)中此偏移量都是唯一的。 Kafka集群保持所有的消息直到它們過(guò)期 無(wú)論消息是否被消費(fèi)了。 實(shí)際上消費(fèi)者所持有的僅有的元數(shù)據(jù)就是這個(gè)偏移量也就是消費(fèi)者在這個(gè)log中的位置。 這個(gè)偏移量由消費(fèi)者控制正常情況當(dāng)消費(fèi)者消費(fèi)消息的時(shí)候偏移量也線性的的增加。但是實(shí)際偏移量由消費(fèi)者控制消費(fèi)者可以將偏移量重置為更老的一個(gè)偏移量重新讀取消息。 可以看到這種設(shè)計(jì)對(duì)消費(fèi)者來(lái)說(shuō)操作自如 一個(gè)消費(fèi)者的操作不會(huì)影響其它消費(fèi)者對(duì)此log的處理。 再說(shuō)說(shuō)分區(qū)。Kafka中采用分區(qū)的設(shè)計(jì)有幾個(gè)目的。一是可以處理更多的消息不受單臺(tái)服務(wù)器的限制。Topic擁有多個(gè)分區(qū)意味著它可以不受限的處理更多的數(shù)據(jù)。第二分區(qū)可以作為并行處理的單元。
分布式(Distribution)
Log的分區(qū)被分布到集群中的多個(gè)服務(wù)器上。每個(gè)服務(wù)器處理它分到的分區(qū)。 根據(jù)配置每個(gè)分區(qū)還可以復(fù)制到其它服務(wù)器作為備份容錯(cuò)。 每個(gè)分區(qū)有一個(gè)leader零或多個(gè)follower。Leader處理此分區(qū)的所有的讀寫(xiě)請(qǐng)求而follower被動(dòng)的復(fù)制數(shù)據(jù)。如果leader宕機(jī)其它的一個(gè)follower會(huì)被推舉為新的leader。 一臺(tái)服務(wù)器可能同時(shí)是一個(gè)分區(qū)的leader另一個(gè)分區(qū)的follower。 這樣可以平衡負(fù)載避免所有的請(qǐng)求都只讓一臺(tái)或者某幾臺(tái)服務(wù)器處理。
生產(chǎn)者(Producers)
生產(chǎn)者往某個(gè)Topic上發(fā)布消息。生產(chǎn)者也負(fù)責(zé)選擇發(fā)布到Topic上的哪一個(gè)分區(qū)。最簡(jiǎn)單的方式從分區(qū)列表中輪流選擇。也可以根據(jù)某種算法依照權(quán)重選擇分區(qū)。開(kāi)發(fā)者負(fù)責(zé)如何選擇分區(qū)的算法。
消費(fèi)者(Consumers)
通常來(lái)講消息模型可以分為兩種 隊(duì)列和發(fā)布-訂閱式。 隊(duì)列的處理方式是 一組消費(fèi)者從服務(wù)器讀取消息一條消息只有其中的一個(gè)消費(fèi)者來(lái)處理。在發(fā)布-訂閱模型中消息被廣播給所有的消費(fèi)者接收到消息的消費(fèi)者都可以處理此消息。Kafka為這兩種模型提供了單一的消費(fèi)者抽象模型 消費(fèi)者組 consumer group。 消費(fèi)者用一個(gè)消費(fèi)者組名標(biāo)記自己。 一個(gè)發(fā)布在Topic上消息被分發(fā)給此消費(fèi)者組中的一個(gè)消費(fèi)者。 假如所有的消費(fèi)者都在一個(gè)組中那么這就變成了queue模型。 假如所有的消費(fèi)者都在不同的組中那么就完全變成了發(fā)布-訂閱模型。 更通用的 我們可以創(chuàng)建一些消費(fèi)者組作為邏輯上的訂閱者。每個(gè)組包含數(shù)目不等的消費(fèi)者 一個(gè)組內(nèi)多個(gè)消費(fèi)者可以用來(lái)擴(kuò)展性能和容錯(cuò)。正如下圖所示
??2個(gè)kafka集群托管4個(gè)分區(qū)P0-P32個(gè)消費(fèi)者組消費(fèi)組A有2個(gè)消費(fèi)者實(shí)例消費(fèi)組B有4個(gè)。
正像傳統(tǒng)的消息系統(tǒng)一樣Kafka保證消息的順序不變。 再詳細(xì)扯幾句。傳統(tǒng)的隊(duì)列模型保持消息并且保證它們的先后順序不變。但是 盡管服務(wù)器保證了消息的順序消息還是異步的發(fā)送給各個(gè)消費(fèi)者消費(fèi)者收到消息的先后順序不能保證了。這也意味著并行消費(fèi)將不能保證消息的先后順序。用過(guò)傳統(tǒng)的消息系統(tǒng)的同學(xué)肯定清楚消息的順序處理很讓人頭痛。如果只讓一個(gè)消費(fèi)者處理消息又違背了并行處理的初衷。 在這一點(diǎn)上Kafka做的更好盡管并沒(méi)有完全解決上述問(wèn)題。 Kafka采用了一種分而治之的策略分區(qū)。 因?yàn)門(mén)opic分區(qū)中消息只能由消費(fèi)者組中的唯一一個(gè)消費(fèi)者處理所以消息肯定是按照先后順序進(jìn)行處理的。但是它也僅僅是保證Topic的一個(gè)分區(qū)順序處理不能保證跨分區(qū)的消息先后處理順序。 所以如果你想要順序的處理Topic的所有消息那就只提供一個(gè)分區(qū)。
Kafka的保證(Guarantees)
生產(chǎn)者發(fā)送到一個(gè)特定的Topic的分區(qū)上的消息將會(huì)按照它們發(fā)送的順序依次加入
消費(fèi)者收到的消息也是此順序
如果一個(gè)Topic配置了復(fù)制因子( replication facto)為N 那么可以允許N-1服務(wù)器宕機(jī)而不丟失任何已經(jīng)增加的消息
Kafka官網(wǎng)
http://kafka.apache.org/
作者半獸人
鏈接http://orchome.com/5
來(lái)源OrcHome
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
二、安裝和啟動(dòng)
1、下載二進(jìn)制安裝包直接解壓
| 1 2 | tar?xf?kafka_2.11-0.10.0.1.tgz cd?kafka_2.11-0.10.0.1 |
2、啟動(dòng)服務(wù)
Kafka需要用到ZooKeepr所以需要先啟動(dòng)一個(gè)ZooKeepr服務(wù)端如果沒(méi)有單獨(dú)的ZooKeeper服務(wù)端可以使用Kafka自帶的腳本快速啟動(dòng)一個(gè)單節(jié)點(diǎn)ZooKeepr實(shí)例
| 1 2 3 | bin/zookeeper-server-start.sh?config/zookeeper.properties??#?啟動(dòng)zookeeper服務(wù)端實(shí)例 bin/kafka-server-start.sh?config/server.properties??#?啟動(dòng)kafka服務(wù)端實(shí)例 |
三、基本操作指令
1、新建一個(gè)主題topic
創(chuàng)建一個(gè)名為“test”的Topic只有一個(gè)分區(qū)和一個(gè)備份
| 1 | bin/kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?1?--partitions?1?--topic?test |
2、創(chuàng)建好之后可以通過(guò)運(yùn)行以下命令查看已創(chuàng)建的topic信息
| 1 | bin/kafka-topics.sh?--list??--zookeeper?localhost:2181 |
3、發(fā)送消息
Kafka提供了一個(gè)命令行的工具可以從輸入文件或者命令行中讀取消息并發(fā)送給Kafka集群。每一行是一條消息。
運(yùn)行producer生產(chǎn)者,然后在控制臺(tái)輸入幾條消息到服務(wù)器。
| 1 2 3 | bin/kafka-console-producer.sh?--broker-list?localhost:9092?--topic?test? This?is?a?message This?is?another?message |
4、消費(fèi)消息
Kafka也提供了一個(gè)消費(fèi)消息的命令行工具,將存儲(chǔ)的信息輸出出來(lái)。
| 1 2 3 | bin/kafka-console-consumer.sh?--zookeeper?localhost:2181?--topic?test?--from-beginning This?is?a?message This?is?another?message |
5、查看topic詳細(xì)情況
| 1 | bin/kafka-topics.sh?--describe?--zookeeper?localhost:2181??--topic?peiyinlog |
Topic: 主題名稱(chēng)
Partition: 分片編號(hào)
Leader: 該分區(qū)的leader節(jié)點(diǎn)
Replicas: 該副本存在于哪個(gè)broker節(jié)點(diǎn)
Isr: 活躍狀態(tài)的broker
6、給Topic添加分區(qū)
| 1 | bin/kafka-topics.sh?--zookeeper?192.168.90.201:2181?--alter?--topic?test2?--partitions?20 |
7、刪除Topic
| 1 | bin/kafka-topics.sh?--zookeeper?zk_host:port/chroot?--delete?--topic?my_topic_name |
主題(Topic)刪除選項(xiàng)默認(rèn)是關(guān)閉的,需要服務(wù)器配置開(kāi)啟它。
| 1 | delete.topic.enable=true |
注:如果需要在其他節(jié)點(diǎn)作為客戶(hù)端使用指令連接kafka broker,則需要注意以下兩點(diǎn)(二選一即可)
另 : ( 使用logstash input 連接kafka也需要注意 )
1、設(shè)置kafka broker 配置文件中 host.name 參數(shù)為監(jiān)聽(tīng)的IP地址
2、給broker設(shè)置一個(gè)唯一的主機(jī)名,然后在本機(jī)/etc/hosts文件配置解析到自己的IP(當(dāng)然如果有內(nèi)網(wǎng)的DNS服務(wù)器也行),同時(shí)還需要在zk server 和 客戶(hù)端的 /etc/hosts 添加broker主機(jī)名的解析。?
原因詳解:
場(chǎng)景假設(shè)
| broker_server ip | 主機(jī)名 | zookeeper ip | 客戶(hù)端 ip |
| 192.168.1.2? | 默認(rèn) localhost | 192.168.1.4 | 192.168.1.5 |
| 1 2 3 | #?此時(shí)客戶(hù)端向broker發(fā)起一些消費(fèi): bin/kafka-console-consumer.sh?--zookeeper?192.168.1.4:2181?--topic?test2?--from-beginning |
這時(shí)客戶(hù)端連接到zookeeper要求消費(fèi)數(shù)據(jù),zk則返回broker的ip地址和端口給客戶(hù)端,但是如果broker沒(méi)有設(shè)置host.name 和 advertised.host.name ?broker默認(rèn)返回的是自己的主機(jī)名,默認(rèn)就是localhost和端口9092,這時(shí)客戶(hù)端拿到這個(gè)主機(jī)名解析到自己,操作失敗。
所以,需要配置broker 的host.name參數(shù)為監(jiān)聽(tīng)的IP,這時(shí)broker就會(huì)返回IP。 客戶(hù)端就能正常連接了。
或者也可以設(shè)置好broker的主機(jī)名,然后分別給雙方配置好解析。
四、broker基本配置
| 1 2 3 4 5 6 7 8 | #??server.properties broker.id=0??#?broker節(jié)點(diǎn)的唯一標(biāo)識(shí)?ID?不能重復(fù)。 host.name=10.10.4.1??#?監(jiān)聽(tīng)的地址,如果不設(shè)置默認(rèn)返回主機(jī)名給zk_server log.dirs=/u01/kafka/kafka_2.11-0.10.0.1/data??#?消息數(shù)據(jù)存放路徑 num.partitions=6??#?默認(rèn)主題(Topic)分片數(shù) log.retention.hours=24??#?消息數(shù)據(jù)的最大保留時(shí)長(zhǎng) zookeeper.connect=10.160.4.225:2181??#?zookeeper?server?連接地址和端口 |
五、Logstash + Kafka 實(shí)戰(zhàn)應(yīng)用
Logstash-1.51才開(kāi)始內(nèi)置Kafka插件,也就是說(shuō)用之前的logstash版本是需要手動(dòng)編譯Kafka插件的,相信也很少人用了。建議使用2.3以上的logstash版本。
1、使用logstash向kafka寫(xiě)入一些數(shù)據(jù)
軟件版本:
logstash 2.3.2?
kafka_2.11-0.10.0.1
logstash output 部分配置
| 1 2 3 4 5 6 7 8 9 | output?{ ??kafka?{ ????workers?=>?2 ????bootstrap_servers?=>?"10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092" ????topic_id?=>?"xuexilog" } } |
參數(shù)解釋 :?
workers:用于寫(xiě)入時(shí)的工作線程
bootstrap_servers:指定可用的kafka broker實(shí)例列表
topic_id:指定topic名稱(chēng),可以在寫(xiě)入前手動(dòng)在broker創(chuàng)建定義好分片數(shù)和副本數(shù),也可以不提前創(chuàng)建,那么在logstash寫(xiě)入時(shí)會(huì)自動(dòng)創(chuàng)建topic,分片數(shù)和副本數(shù)則默認(rèn)為broker配置文件中設(shè)置的。
2、使用logstash消費(fèi)一些數(shù)據(jù),并寫(xiě)入到elasticsearch
軟件版本:
logstash 2.3.2?
elasticsearch-2.3.4
logstash 配置文件
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | input{ ????kafka?{ ????????zk_connect?=>?"112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181" ????????group_id?=>?"logstash" ????????topic_id?=>?"xuexilog" ????????reset_beginning?=>?false ????????consumer_threads?=>?5 ????????decorate_events?=>?true } } #?這里group_id?需要解釋一下,在Kafka中,相同group的Consumer可以同時(shí)消費(fèi)一個(gè)topic,不同group的Consumer工作則互不干擾。 #?補(bǔ)充:?在同一個(gè)topic中的同一個(gè)partition同時(shí)只能由一個(gè)Consumer消費(fèi),當(dāng)同一個(gè)topic同時(shí)需要有多個(gè)Consumer消費(fèi)時(shí),則可以創(chuàng)建更多的partition。 output?{ ????if?[type]?==?"nginxacclog"?{ ????????elasticsearch?{ ????????????hosts?=>?["10.10.1.90:9200"] ????????????index?=>?"logstash-nginxacclog-%{+YYYY.MM.dd}" ????????????manage_template?=>?true ????????????flush_size?=>?50000 ????????????idle_flush_time?=>?10 ????????????workers?=>?2 } } } |
3、通過(guò)group_id 查看當(dāng)前詳細(xì)的消費(fèi)情況
| 1 | bin/kafka-consumer-groups.sh?--group?logstash?--describe?--zookeeper?127.0.0.1:2181 |
輸出解釋:
| GROUP | TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG |
| 消費(fèi)者組 | 話(huà)題id | 分區(qū)id | 當(dāng)前已消費(fèi)的條數(shù) | 總條數(shù) | 未消費(fèi)的條數(shù) |
本文出自 “突破舒適區(qū)” 博客,請(qǐng)務(wù)必保留此出處http://tchuairen.blog.51cto.com/3848118/1855090
來(lái)源:http://tchuairen.blog.51cto.com/3848118/1855090
總結(jié)
以上是生活随笔為你收集整理的Kafka 入门 and kafka+logstash 实战应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。