Kafka深入浅出(一)
最近閑來無事,搜集一些消息架構方面的資料進行學習,偶然在一個站點發現Kafka還是不錯的,故想按著自己研究、學習的經歷,寫下一篇日記,以便后續在工作中可以 Ctrl + C & Ctrl + V , 嘿嘿
?
Kafak是誰?
Kafka源自LinkIn,主要支撐分布式日志服務,主要開發語言是Scala+少量的Java,設計目標是一種基于集群可處理流式數據的消息分布式系統,后開源交付給Apache基金會進行維護,目前是該基金會的頂級項目之一,當然還有一款大名鼎鼎消息系統ActiveMQ目前也是Apache基金會進行維護,后續在學習中會逐漸對兩款消息機制進行對比
?
Apache對Kafka的定位
1. Publish & Subscribe (發布&訂閱)
Kafka是一種基于發布&訂閱(也有稱為生產/消費模型)架構的一種消息機制,目前需要大吞吐量的架構大多基于消息機制。
2. Process (數據處理)
Kafka可高效的處理流式數據,并且可以進行實時處理
3. Store (存儲)
Kafka的消息數據存儲是基于自身一種叫 Isr 的東西來進行安全可靠的分布式存儲
快速開始使用Kafka
1. 下載Kafka服務器端
目前Kafka最新版本是0.10.1.0,但坊間流傳0.8.x系列才是最為流行的,原因是當時由于storm的流行,造就了目前的Kafka(當然你需要Linux系統來玩Kafka,我用的是Oracle VM VirtualBox)
http://kafka.apache.org/downloads
有多個版本可以(自行)進行選擇
wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz如果你這里沒有wget工具的話,可以通過yum進行安裝
yum install wget解壓
tar -zxvf kafka_2.10-0.10.1.0.tgz將Kafka移動到本地用戶目錄下面
mv kafka_2.10-0.10.1.0 /usr/local/看一下Kafka里面有什么東西
[root@localhost kafka_2.10-0.10.1.0]# ls bin config libs LICENSE NOTICE site-docs通過長期對Apache的東西使用,看名字也應該知道目錄大概的用途(其實懂計算機的都應該知道)
bin:提供一些寫好的腳本文件,用于管理、測試服務器
config:肯定是一堆相關的配置文件
libs:一對libs(依賴)
LICENSE:許可文件說明(開源不等于隨便免費使用,各位開發者請注意一下版權)
NOTICE:沒什么用的一些內容
site-docs:里面有一個?kafka_2.10-0.10.1.0-site-docs.tgz 壓縮包,應該是使用手冊的本地版本(API說明之類的)
2. 啟動服務器(你沒看錯、就是這么簡單..)
當然Kafka是依賴于zookeeper的(當年面試只知道是動物園管理員 :D),所以需要先啟動zookeeper,由于kafka高度依賴zookeeper(目前基本算一個分布式JDNI實現),所以kafka壓縮包內自帶了一個簡易版的zokeeper,使用下便命令啟動(前提條件是服務器需要jvm支持,可以安裝JDK解決)
bin/zookeeper-server-start.sh -daemon config/zookeeper.propertieszookeeper-server-start.sh 腳本的參數如下:
[root@localhost kafka_2.10-0.10.1.0]# bin/zookeeper-server-start.sh USAGE: bin/zookeeper-server-start.sh [-daemon] zookeeper.properties-daemon:后臺值守模式?(靜默模式)反正就是后臺啟動,需要看到輸出的同學可以不加這個參數
zookeeper.properties:properties當然是配置文件,通常會放在config目錄下面
驗證一下是否啟動成功(lsof 也可以直接 yum install lsof)
[root@localhost kafka_2.10-0.10.1.0]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2236 root 83u IPv6 19048 0t0 TCP *:eforward (LISTEN)如果2181端口處于 LISTEN 狀態的話,基本說明zookeeper已經啟動完畢了,接下來就可以啟動kafka服務器了,腳本看名字就知道是這個,看一下參數,大體和zookeeper一樣
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-server-start.sh USAGE: bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*啟動之
bin/kafka-server-start.sh -daemon config/server.properties查看一下啟動情況,(kafka默認服務端口是9092,可以通過看server.properties知道-當然也可以修改)
[root@localhost kafka_2.10-0.10.1.0]# lsof -i:9092 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2468 root 97u IPv6 19365 0t0 TCP *:XmlIpcRegSvc (LISTEN) java 2468 root 101u IPv6 19367 0t0 TCP localhost:47108->localhost:XmlIpcRegSvc (ESTABLISHED) java 2468 root 102u IPv6 19368 0t0 TCP localhost:XmlIpcRegSvc->localhost:47108 (ESTABLISHED) [root@localhost kafka_2.10-0.10.1.0]# lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 2236 root 83u IPv6 19048 0t0 TCP *:eforward (LISTEN) java 2236 root 84u IPv6 19360 0t0 TCP localhost:eforward->localhost:59873 (ESTABLISHED) java 2468 root 82u IPv6 19357 0t0 TCP localhost:59873->localhost:eforward (ESTABLISHED)如果9092處于LISTEN,說明kafka(單機版)已經可以工作了
3. 創建一個 topic
topic 是一個話題,如果從rpc架構理解有點像接口名,通過kafka-topics.sh這個腳本就可以創建topic了,直接運行可以看到幫助
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-topics.sh Create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions, replica assignment, and/or configuration for the topic. --config <name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. --create Create a new topic. --delete Delete a topic --delete-config <name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). --describe List details for the given topics. --disable-rack-aware Disable rack aware replica assignment --force Suppress console prompts --help Print usage information. --if-exists if set when altering or deleting topics, the action will only execute if the topic exists --if-not-exists if set when creating topics, the action will only execute if the topic does not already exist --list List all available topics. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected --replica-assignment A list of manual partition-to-broker <broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. --topic <topic> The topic to be create, alter or describe. Can also accept a regular expression except for --create option --topics-with-overrides if set when describing topics, only show topics that have overridden configs --unavailable-partitions if set when describing topics, only show partitions whose leader is not available --under-replicated-partitions if set when describing topics, only show under replicated partitions --zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.咱們使用如下命令創建一個測試的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test創建的時候需要指定 zookeeper 集群的地址,--topice是指定名字test
如果沒錯會顯示?Created topic "test". 說明創建成功
可以通過如下命令驗證
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0可以看到test topic已經創建成功
4. 通過Kafka發送消息
kafka提供命令行輸入腳本來測試服務器是否工作正常,可以使用如下命令進行發送(同樣方式查看幫助)
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test kafka is cool--broker-list:指定kafka服務器的信息
5. 接收信息
可以啟動一個consumer(消費者,消息接收、處理者)來接收剛才producer(生產者,消息發送者)的消息
[root@localhost kafka_2.10-0.10.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning kafka is cool--bootstrap-server:指定kafka服務器配置
--from-beginning:說明接收所有消息(從開始起)
終于可以開心的玩耍了
?
為什么要用消息中間件?
1. 解耦
其實很多架構及編程思想都是解決解耦的問題的,其實這個思想我自認為應該來自與工業制造,假設一臺汽車如果沒有合理的對發動機、地盤等結構進行解耦,估計是沒有辦法進行獨立生產,然后進行組裝的,把這種思想用于軟件開發、架構設計里其實是一樣的,目的就是讓各個零部件(組件)可以單獨開發(生產),然后進行拼轉,當然如果符合某種制式(標準)的組件是可以隨意更換的(其實就是接口的用途)
2. 冗余
電子電路的發明,以及替代了很多機械結構的好處之一就是電子電路很容易做到冗余,傳說F22戰斗機一般的電子系統都是四路冗余,被擊干壞了一部分,并不影響整體的性能(服務),當然消息系統通過制定標準的接口,也可以簡單的實現冗余(所有的分布式系統都可以實現冗余好吧)
3. 擴展
說到擴展一般要求是水平擴展能力(加十臺服務器即可獲得十臺服務器的性能),并非垂直擴展(加十臺服務器可能只獲得兩臺服務器的能力提升),當然這種水平擴展能力一般都需要無狀態的組件設計(通常理解沒有SESSION,或者自己封裝SESSION),如果沒個幾百上千萬的用戶,還是不要勞民傷財了
4. 峰值處理能力
說道峰值處理能力(消峰),也是消息中間件在部分架構中優于RPC(遠程方法調用)的原因之一,因為消息中間件可以用一個列隊(內存Queue)之類的東西,把所有的業務請求緩存其中,等待業務服務器在列隊中自助取用,這個列隊(內存)就好比一個請求的緩沖墊,當然只要良好的監控這個內存緩沖墊(不要溢出),并且有良好的機制防止這塊內存crash(崩潰),這種方法還是挺安全的,當然還是有一個另外的缺點,就是由于有這個內存的列隊,很多請求處理都是異步的,是的如果你用過AJAX,你就會明白所有的業務請求都需要一個回調,同時還要處理這個回調,但JS是在瀏覽器運行與用戶及時交互,但后臺架構如果是異步+回調,同時還需要把響應反饋給前端(oh omg),還是不要勞民傷財了
5. 異步通訊
由于有這個內存,當然就可以異步通訊,如果業務沒必要需要處理這么大的數據,還是不要勞民傷財了????
?
常用的消息中間件有哪些?
1. RabbitMQ
Erlang語言,支持AMQP、XMPP、SMTP、STOMP,支持負載均衡、數據持久,支持P2P和發布/訂閱
2. Redis
基于鍵值對的NoSql數據庫(Map集合能干的事太多了,當然Map就是鍵值對),同時支持列隊服務(據說是輕量級),消息小于10k時性能比RabbitMQ好,大于的時候會比RabbitMQ差(據傳說)
3. ZeroMQ
沒有服務器中間件,應用程序集成需要通過庫(SDK)來集成服務器功能,很明顯開發工作量亞歷山大
4. ActiveMQ
Apache的又一產品線,據說是最為流行的中間件(為什么沒研究它呢?見后面),實現了JMS of J2EE,支持持久化、P2P,傳說支持XA事務(二段事務),這個有點牛,就是不知道性能怎么樣
5. Kafka
Apache維護的產品線(頂級項目),主要是為高性能而生,據說可以做到O1的性能,宣傳是下一代消息中間件(我們要研究下一代,嘿嘿),支持數據持久化、同時支持數據在線、離線處理,不支持XA事務(由于側重點不同)
?
Kafka架構簡介
1. Producer 生產者
一般理解生產數據的一方,有的時候也可以理解為發起請求需要服務的一方,Kafka中Producer直接訪問Kafka集群,并且是數據通過push(推送)的方式,壓入到Kafka的列隊中,這里采用push的方式可以確保數據及時到達列隊中
2. Broker 代理
topic是數據保存的名字(文件名),每一個topic對應一個Broker(Broker負責保存Topic的內容),一個Broker由多個Partition組成,每個Partition均勻分布在kafka集群之中(Partition分布于多臺服務器,致使Kafka具備冗余)
3. Consumer 消費者
一般消費者指的是提供服務的一方,記錄日志、處理業務邏輯之類的,Kafka的Consumer設計通過pull(拉)的方式,到Kafka集群中拉數據,這樣的設計可以降低Consumer的壓力(吃多少拿多少,當然降低壓力)
4. Kafka 與 Zookeeper
Kafka 經歷數次改版,老的版本消費者并非從Kafka集群,而是zookeeper集群中直接拿數據,新版本已經修復這個設計問題
?
結后語
通過對kafka的簡單了解,后面就準備開始開發Producer和Consumer拉,動手寫代碼的感覺總是很好 :D
同時感謝 郭俊 先生的指導,他的博客?http://www.jasongj.com/
?
參考資料
1.?http://kafka.apache.org/
轉載于:https://my.oschina.net/u/2279119/blog/774700
總結
以上是生活随笔為你收集整理的Kafka深入浅出(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Docker-核心篇(1)-CentOS
- 下一篇: 讯时新闻系统漏洞