kakfa从入门到放弃(四): 分区和副本机制、高级与低级API、 kafka-eagle、原理、数据清理、限速
文章目錄
- 一、分區和副本機制:
- 1. 生產者分區寫入策略:
- 1.1 輪詢分配策略:
- 1.2 隨機策略(不用):
- 1.3 按key分配策略:
- 1.4 亂序問題:
- 1.5 自定義分區策略:
- 2. 消費者組Rebalance機制:
- 2.1 Rebalance 再平衡:
- 2.2 rebalance的不良影響:
- 3. 消費者分區消費策略:
- 3.1 Range范圍分配策略:
- 3.2 RoundRobin輪詢策略:
- 3.3 Stricky粘性分配策略:
- 4. 副本機制:
- 4.1 producer的ACKs參數
- 4.2 ack配置為0
- 4.3 ack配置為1
- 4.4 ack配置為-1或all
- 二、高級API和低級API
- 1. 高級API
- 1.1 優點:
- 1.2 缺點:
- 2. 低級API
- 三、監控工具: kafka-eagle
- 1. kafka-eagle介紹:
- 2. 安裝Kafka-Eagle
- 2.1 開啟Kafka JMX端口
- 2.2 安裝Kafka-Eagle
- 3 Kafka度量指標
- 3.1 topic list
- 四、kafka原理
- 1. 副本:
- 1.1 副本的Leader與Follower:
- 1.2 AR、ISR、OSR
- 1.3 Leader選舉
- 1.3.1 Controller介紹
- 1.3.2 controller的選舉:
- 1.3.3 找到當前Kafka集群的controller
- 1.3.4 controller選舉partition leader
- 1.4 leader的負載均衡
- 2. kafka生產、消費數據工作流程
- 2.1 kafka數據寫入的流程
- 2.2 kafka數據消費流程
- 2.2.1 兩種消費模式
- 2.2.2 Kafka消費數據流程
- 3. kafka的數據存儲形式:
- 3.1 存儲日志
- 3.2 讀取消息
- 3.3 刪除消息
- 4. 消息不丟失機制:
- 4.1 broker數據不丟失:
- 4.2 生產者數據不丟失:
- 4.3 消費者數據不丟失:
- 4.3.1 一般消費丟失情況:
- 4.3.2 重復消費:
- 5. 數據積壓:
- 5.1 使用工具查看積壓情況
- 5.1.1 kafka Tools
- 5.1.2 kafka-Eagle:
- 5.2 解決數據積壓問題:
- 五、kafka中數據清理(Log Deletion):
- 1. 日志刪除
- 1.1 定時日志刪除任務
- 1.2基于時間的保留策略
- 1.3 刪除日志分段:
- 1.3.1 設置 topic 5秒刪除一次:
- 1.4 基于日志大小的保留策略
- 1.5 基于日志起始偏移量保留策略
- 2. 日志壓縮(Log Compaction)
- 六、Kafka配額限速機制(Quotas)
- 1. 限制producer端速率
- 2. 限制consumer端速率
- 3. 取消Kafka的Quota配置
一、分區和副本機制:
1. 生產者分區寫入策略:
生產者寫入消息到topic, kafak將依據不同的策略將數據分配到不同的分區中:
- 輪詢分區策略;
- 隨機分區策略;
- 按key分區分配策略;
- 自定義分區策略;
1.1 輪詢分配策略:
- 默認的策略, 也是使用最多的策略, 可以最大限度的保證所有消息平均分配到一個分區;
- 如果在生產消息時, key為null, 則使用輪詢算法均衡的分配分區;
1.2 隨機策略(不用):
隨機策略, 每次都隨機地將消息分配到每個分區; 在較早的版本, 默認的分區策略師隨機策略; 也是為了將消息均衡的寫入每個分區; 但后續輪詢策略表現更佳, 所以基本上很少使用隨機策略;
1.3 按key分配策略:
按key分配策略, 有可能出現 數據傾斜; 例如: 某個key包含大量的數據, 因為key值一樣, 所以, 所有的數據都將分配到一個分區中, 造成改分區的消息數量遠大于其他分區;
1.4 亂序問題:
輪詢策略, 隨機策略都會產生一個問題, 生產到kafka中的數據是亂序存儲的, 而按key分區可以在一定程度上實現數據有序存儲–也就是局部有序, 但這又可能導致數據傾斜, 所以在實際生產環境中要結合實際情況來取舍; 即: kafka中的消息是全局亂序, 局部partition是有序的; 如果要實現消息總是有序的, 可以將連續的消息放到一個partition, 但kafka失去了分布式的意義;
1.5 自定義分區策略:
實現步驟:
- 創建自定義分區器;
- 在kafka生產者中, 自定的使用自定義分區器;
2. 消費者組Rebalance機制:
2.1 Rebalance 再平衡:
kafka中rebalance稱之為"再平衡’, 是kafka中確保Consumer Group下所有的consumer如何達成一致, 分配訂閱的topic的每個分區的機制;
Rebalance觸發時機有:
- 消費者組中consumer的個數發生變化; 例如: 有新的消費者加入, 或者是某個consumer停止;
- 訂閱的topic個數發生變化:
消費者可以訂閱多個主題, 假設當前的消費者組訂閱了三個topic, 但有一個topic突然被刪除了, 此時需要再平衡;
- 訂閱的topic分區數發生變化
2.2 rebalance的不良影響:
- 發生rebalance時, consumer group下的所有consumer都會協調在一起共同參與, kafka使用分配策略盡可能達到公平分配;
- rebalance過程會對consumer group產生非常嚴重的影響, rebalance的過程中所有的consumer都將停止工作, 直到rebalance完成;
3. 消費者分區消費策略:
3.1 Range范圍分配策略:
Range范圍分配策略是kafka默認的分配策略, 它可以確保每個消費者消費的分區數量是均衡的;
注意: Range范圍分配策略是針對每個topic的
配置
配置消費者的partition.assignment.strategy為RangeAssignor
算法公式
n = 分區數量 / 消費者數量
m = 分區數量 % 消費者數量
前m個消費者消費n+1, 剩余消費者消費n個
3.2 RoundRobin輪詢策略:
RoundRobinAssignor輪詢策略是將消費者組所有消費者以及消費者所訂閱的所有的topic的partition按照字典順序排序(topic和partition的hashcode進行排序), 然后通過輪詢方式逐個將分區一次分配給每個消費者;
配置
配置消費者的partition.assignment.strategy為RoundRobinAssignor
3.3 Stricky粘性分配策略:
從kafak 0.11.x開始, 引入此類分配策略; 主要目的:
- 分區分配盡可能均勻;
- 在發生rebalance的時候, 分區的分配盡可能與上一次分配保持相同;
- 沒有發生rebalance時, stricky粘性分配策略和RoundRobin分配策略類似;
如果consumer2崩潰了, 此時需要進行rebalance; 如果是Range分配和輪詢分配都會重新進行分配;
此時, consumer0和consumer1原來消費的分區大多發生了改變;
粘性分配方式:
粘性分配策略, 保留rebalance之前的分配結果, 只是將原先的consumer2負責的兩個分區再均勻分配給consumer0、consumer1; 這樣可以明顯減少系統資源的浪費, 例如: 之前consumer0、consumer1之前正在消費某幾個分區, 但由于rebalance發生, 導致consumer0和consumer1需要重新消費之前正在處理的分區, 導致不必要的系統開銷;
4. 副本機制:
副本的目的就是冗余備份, 當某個Broker上的分區數據丟失時, 依然可以保障數據可用; 因為在其他的broker上的副本是可用的;
4.1 producer的ACKs參數
對副本關系較大的就是, producer配置的acks參數;
acks參數表示當生產者生產消息的時候, 寫入到副本的要求嚴格程度; 它決定了生產者如何在性能和可靠性之間的取舍:
配置
conf := sarama.NewConfig() conf.Producer.RequiredAcks = sarama.WaitForAll const (// NoResponse doesn't send any response, the TCP ACK is all you get.NoResponse RequiredAcks = 0// WaitForLocal waits for only the local commit to succeed before responding.WaitForLocal RequiredAcks = 1// WaitForAll waits for all in-sync replicas to commit before responding.// The minimum number of in-sync replicas is configured on the broker via// the `min.insync.replicas` configuration key.WaitForAll RequiredAcks = -1 )4.2 ack配置為0
4.3 ack配置為1
當生產者的ACK配置為1時, 生產者會等待leader副本確認接收后, 才會發送下一條數據, 性能中等;
4.4 ack配置為-1或all
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=all| 吞吐量 | 16.5W/s | 9.3W/s | 7.3W/s |
| 吞吐速率 | 158.19 MB/sec | 88.78 MB/sec | 70.18 MB/sec |
| 平均延遲時間 | 192.43 ms | 346.62 ms | 438.77 ms |
| 最大延遲時間 | 670.00 ms | 1003.00 ms | 1884.00 ms |
二、高級API和低級API
1. 高級API
1.1 優點:
- 不需要執行管理offset, 直接通過zk管理, 也不需要管理分區、副本, 由kafka統一管理;
- 消費者會自動根據上一次在zk中保存的offset去接著獲取數據;
- 在zk中, 不同的消費者組, 同一個topic記錄不同的offset, 這樣不同程序讀取同一個topic, 不會受到offset的影響;
1.2 缺點:
- 不能控制offset, 例如: 從指定的位置讀取數據;
- 不能細化控制分區、副本、zk等;
2. 低級API
通過使用低級API, 可以自己來控制offset, 想從哪讀, 就從哪讀; 而且, 可以自己控制連接分區, 對分區自定義負載均衡;
之前offset是自動保存在ZK中, 使用低級API, 可以將offset不一定使用zk存儲, 可以自己來存儲offset, 例如: 存儲文件、Mysql、內存等;
但是低級API比較復雜, 需要執行控制offset連接到那個分區, 并找到分區的leader;
三、監控工具: kafka-eagle
1. kafka-eagle介紹:
在開發工作中, 當業務前提不復雜時, 可以使用Kafka命令來進行一些集群的管理工作; 但如果業務變得復雜, 例如: 需要增加group、topic分區, 此時, 再使用命令行就感覺很不方便, 此時, 如果使用一個可視化的工具幫助完成日常的管理工作, 將會大大提高對于Kafka集群管理的效率,而且使用工具來監控消費者在Kafka中消費情況;
早期, 要監控Kafka集群我們可以使用Kafka Monitor以及Kafka Manager, 但隨著我們對監控的功能要求、性能要求的提高, 這些工具已經無法滿足;
Kafka Eagle是一款結合了目前大數據Kafka監控工具的特點, 重新研發的一塊開源免費的Kafka集群優秀的監控工具; 它可以非常方便的監控生產環境中的offset、lag變化、partition分布、owner等;
官網地址: https://www.kafka-eagle.org/
2. 安裝Kafka-Eagle
2.1 開啟Kafka JMX端口
JMX接口
JMX(Java Management Extensions)是一個為應用程序植入管理功能的框架;
JMX是一套標準的代理和服務, 實際上, 用戶可以在任何Java應用程序中使用這些代理和服務實現管理; 很多的一些軟件都提供了JMX接口, 來實現一些管理、監控功能;
在啟動Kafka的腳本前, 添加:
cd ${KAFKA_HOME} export JMX_PORT=9988 nohup bin/kafka-server-start.sh config/server.properties &PS: 如果是docker啟動, 需要添加 -e JMX_PORT=9988
2.2 安裝Kafka-Eagle
- 安裝JDK, 并配置好JAVA_HOME;
- 將kafka_eagle上傳, 并解壓到 /export/server 目錄中;
- 配置 kafka_eagle: 使用vi打開conf目錄下的system-config.properties
- 配置 kafka_eagle 環境變量;
- 配置JAVA_HOME
操作系統配置JAVA_HOME的環境變量
PS:
- 配置kafka_eagle和JAVA_HOME均可在bin/ke.sh腳本中完成
- 修改Kafka eagle可執行權限
- 啟動 kafka_eagle:
- 訪問Kafka eagle, 默認用戶為admin, 密碼為: 123456
http://localhost:8048
3 Kafka度量指標
3.1 topic list
點擊Topic下的List菜單, 就可以展示當前Kafka集群中的所有topic;
| Brokers Spread | broker使用率 |
| Brokers Skew | 分區是否傾斜 |
| Brokers Leader Skew | leader partition是否存在傾斜 |
四、kafka原理
1. 副本:
1.1 副本的Leader與Follower:
在kafka中, 每個topic都可以配置多個分區以及多個副本; 每個分區都有一個leader副本以及0個及以上個follower副本, 在創建topic時, kafak會將每個分區的leader副本均勻的分配在每個borker上;
正常使用kafka時, 是感覺不到leader、follower存在的, 但其實, 所有的讀寫操作都是由leader處理的, 而所有的follower都復制leader的日志數據文件;
如果leader出現故障, follower就會被選舉為新leader;
所以:
- kafak中leader負責處理讀寫操作, 而follower只負責副本數據同步;
- 如果leader出現故障, 其他follower會被重新選舉為leader;
- follower像一個consumer一樣, 拉取leader對應分區的數據, 并保存到日志數據文件中;
kafka會自動將leader均勻地分配在不同的borker中; 如果一個borker有多個分區的leader, 就會出現不均衡的情況, 應該盡量讓leader均勻分配;
- topic: topic name;
- partition: 當前行表示的partition ID;
- Log Size: 數據條數;
- leader: 當前行表示的partition的leader所在的replica ID;
- replicas: 當前行表示的partition的replica ID列表;
- In Sync Replicas: 可用的副本
- Preferred Leader: 當前為首選leader(沒有發生borker掛掉的情況)
- Under Replicated: 正在同步副本;
1.2 AR、ISR、OSR
在實際環境中, leader有可能會出現一些故障, 所以, kafak一定會選舉出新的leader; kafak中, 把follower按照不同的狀態分為三類: AR、ISR、OSR;
- AR: 分區的所有副本; (Assigned Replicas – 已分配的副本);
- ISR: 所有與leader副本保持一定程度同步的副本(包括leader); (In Sync Replicas – 正在同步的副本)
- OSR: 與leader副本同步滯后過多的副本(不包括leader); (Out-Of-Sync Replicas)
- AR = ISR + OSR
- 正常情況下, 所有的follower副本都應該與leader副本保持同步, 即AR = ISR, OSR為空
當第一個borker掛掉之后;
重啟第一個borker:
此時. 發現, leader并沒有恢復為推薦配置;
1.3 Leader選舉
leader對于消息的寫入以及讀取是非常關鍵的, 此時有2個問題:
- kafak如何確定某個partition的副本是leader, 哪個是follower呢?
- 某個leader崩潰了, 如何快速確定另一個是leader呢? 因為kafak的吞吐量很高、延遲很低; 所以選舉leader必須很快
1.3.1 Controller介紹
- kafak啟動時, 會在所有的broker中選擇一個controller, controller是高可用的;
- leader和follower是針對partition副本的, 而controller是針對broker的;
- 創建topic, 添加分區, 修改副本數據量之類的任務管理都是由controller完成的;
- kafka分區副本keader的選舉, 也是有controller決定的;
1.3.2 controller的選舉:
- 在kafka集群啟動的時候, 每個broker都會嘗試去Zookeeper上注冊稱為COntroller(zk臨時節點);
- 但只有一個競爭成功, 其他的broker會注冊該節點的監視器;
- 一個節點狀態發生變化, 就可以進行相應的處理;
- controller也是高可用的, 一旦某個broler崩潰, 其他的broker會重新注冊為controller;
1.3.3 找到當前Kafka集群的controller
- 點擊Kafka Tools的「Tools」菜單, 找到「ZooKeeper Brower…」;
- 點擊左側樹形結構的controller節點, 就可以查看到哪個broker是controller了;
1.3.4 controller選舉partition leader
- 所有partition的leader選舉都由controller決定的;
- controller會將leader的改變直接通過RPC的方式通知需要為此做出響應的broker;
- controller讀取到當前分區ISR, 只要有一個Replica還幸存,就選擇這個作為leader, 否則, 則任意選擇一個replica作為leader;
- 如果該partition的所有replica都已經宕機, 則新的leader為-1;
為什么不能通過ZK的方式選舉partition的leader?
- 如果業務很多, kafka集群會有很多partiton;
- 假設某個broker宕機, 就會出現很多個partition都需要重新選舉leader;
- 如果使用zookeeper選舉leader, 會給zookeeper帶來巨大的壓力; 所以, kafka中leader的選舉不能使用zk實現;
1.4 leader的負載均衡
- kafka中引入[preferred-replica]的概念, 即: 優先的replic;
- 在ISR列表中, 第一個replica就是preferred-replica;
- 使用一下腳本可以將preferred-replica設置為leader, 均勻分配每個分區副本的leader;
- --partition: 指定需要重新分配leader的partition編號;
如果某個broker crash之后, 就可能會導致副本的leader分布不均勻, 就是一個broker上存一個topic下不同partition的leader副本;
2. kafka生產、消費數據工作流程
2.1 kafka數據寫入的流程
- 生產者先從 zookeeper 的 "/brokers/topics/主題名/partitions/分區名/state"節點找到該 partition 的leader
- 生產者在ZK中找到該ID找到對應的broker
- broker進程上的leader將消息寫入到本地log中;
- follower從leader上拉取消息, 寫入到本地log, 并向leader發送ACK;
- leader接收到所有的ISR中的Replica的ACK后, 并向生產者返回ACK;
2.2 kafka數據消費流程
2.2.1 兩種消費模式
- kafka采用拉取模型, 由消費者自己記錄消費狀態, 每個消費者互相獨立地順序拉取每個分區的消息;
- 消費者可以按照任意的順序消費消息; 比如, 消費者可以重置到舊的偏移量, 重新處理之前已經消費過的消息; 或者直接跳到最近的位置, 從當前的時刻開始消費;
2.2.2 Kafka消費數據流程
- 每個consumer都可以根據分配策略(默認RangeAssignor), 獲得要消費的分區;
- 獲取到consumer對應的offset(默認從ZK中獲取上一次消費的offset);
- 找到該分區的leader, 拉取數據;
- 消費者提交offset;
3. kafka的數據存儲形式:
- 一個topic由多個分區組成;
- 一個分區(partition)由多個segment(段)組成;
- 一個segment(段)由多個文件組成(log-數據、index-稀疏索引、timeindex);
3.1 存儲日志
- kafka中的數據是保存在kafka_xxx-xxx/data中;
- 消息是保存在以topic-partitionID 的文件家中
- 數據文件夾中包含一下內容:
這些分別對應:
| 00000000000000000000.index | 索引文件,根據offset查找數據就是通過該索引文件來操作的 |
| 00000000000000000000.log | 日志數據文件 |
| 00000000000000000000.timeindex | 時間索引 |
| leader-epoch-checkpoint | 持久化每個partition leader對應的LEO(log end offset、日志文件中下一條待寫入消息的offset) |
- 每個日志文件的文件名為起始偏移量, 因為每個分區的起始偏移量是0, 所以, 分區的日志文件都以0000000000000000000.log開始;
- 默認的每個日志文件最大為log.segment.bytes =1024*1024*1024=1G
- 為了簡化根據offset查找消息, Kafka日志文件名設計為開始的偏移量
創建一個topic: test_10m, 該topic每個日志數據文件最大為10M
bin/kafka-topics.sh --create --zookeeper node1.cn:9092 --topic test_10m --replication-factor 2 --partitions 3 --config segment.bytes=104857603.2 讀取消息
kafka日志的存儲格式:
- 根據 offset, 首先需要找到存儲數據的segment段(ps: offset指分區的全局偏移量);
- 然后根據這個"全局分區offset"找到相對于文件的"segment段offset";
- 最后在根據 segment段offset"讀取消息;
- 為了提高查詢效率, 每個文件都會維護對應的范圍內存, 查找的時候就是使用簡單的二分查找;
3.3 刪除消息
- kafka中, 消息是會被定期清理的, 一次刪除一個segment段的日志文件;
- kafka的日志管理器, 會根據kafke的配置, 來決定那些文件可以被刪除;
4. 消息不丟失機制:
4.1 broker數據不丟失:
生產者通過分區的leader寫入數據后, 所有在ISR中follower都會從leader中賦值數據, 這樣, 可以確保及時leader崩潰了, 其他的follower的數據仍然是可用的;
4.2 生產者數據不丟失:
- 生產者連接leader寫入數據時, 可以通過ACK機制來確保數據已成功寫入;
- ACK機制有三個可選配置:
- -1: 表示所有的節點都收到數據(leader和follower);
- 1: 表示leader收到數據;
- 0: 生產者只負責生產數據, 不關心數據是否丟失(可能會丟失數據, 但是性能最好);
- 生產者可以采用同步和異步兩種方式發送結果:
- 同步: 發送一批數據給kafka后, 等待kafka返回結果;
- 異步: 發送一批數據給kafka后, 只是提供一個回調函數;
ps: 如果broker遲遲不給ACK, 而buffer又滿了, 可以設置是否直接清空buffer中的數據
4.3 消費者數據不丟失:
在消費者消費數據的時候, 只要每個消費者記錄好offset值即可, 就能保證數據不丟失;
4.3.1 一般消費丟失情況:
- 消費者從ZK中拉取offset, 開始讀取消息;
- 在業務程序中處理這條消息, 并將處理后的結果寫入到存儲中;
- 在寫入存儲的時候, 出現了故障, 導致寫入失敗;
- 但是, 消費者提交了offset到ZK中;
- 下一次消費就會從新的offset開始消費, 導致了數據丟失;
消息傳遞的語義性
- At-most once : 最多一次 (只管把數據消費到, 不管有沒有成功, 可能會有數據丟失);
- At-least once: 最少一次(有可能會出現重復消費)
- Exactly-Once: 僅有一次(事務性的保障, 保證消息有且僅被處理一次 )
4.3.2 重復消費:
- 根據offset來消費partition中的數據;
- 消費者業務程序處理數據, 并將結果成功寫入到DB中;
- 將offset提交到zookeeper, 但是是失敗的;
- 此時會出現重復消費;
使用kafka的事務沒法解決"只消費一次";
kafka的事務是針對當前kafka集群中的消費者、生產者操作的;
解決方案:
- 通過lowlevel API從mysql中讀取offset;
- 通過mysql的事務, 將寫入到mysql的數據和offset放在一個mysql事務里, 要么全部成功, 要么全部失敗, 就可以實現"只消費一次";
5. 數據積壓:
kafka消費者消費數據的速度是非常快的, 但如果由于處理kafka消息時, 由于有一些外部I/O或者網絡擁堵, 就會造成kafak中的數據積壓; 如果數據一致積壓, 會導致出來的數據的實時性受到較大影響;
5.1 使用工具查看積壓情況
5.1.1 kafka Tools
5.1.2 kafka-Eagle:
5.2 解決數據積壓問題:
當kafka出現數據積壓時, 首先要找到數據積壓的原因
常見的場景:
數據持久化時出錯;
消費者超時失敗, 導致數據消費緩慢; ? 將消費時間修改的大一些;
在實際生產中, 要有監控系統, 如果出現這種情況, 需要盡快處理;
雖然 Spark Streaming / Flink等流式處理中間件可以實現背壓, 但是數據積累太多一定會對實時系統的實時性造成影響;
五、kafka中數據清理(Log Deletion):
kafka的消息存儲在磁盤中, 為了控制磁盤占用空間, kafka需要不斷地對過去的一些消息進行清理; kafka的每個分區都有很多的日志文件, 這樣也是為了方便的進行日志清理;
在kafka中, 提供兩種日志清理方式:
- 日志刪除(Log Deletion): 按照指定的策略直接刪除不符合條件的日志;
- 日志壓縮(Log Compaction): 按照消息的key進行整合, 有相同key的但有不同value值, 只保留最后一個版本;
在kafka的broker或topic中配置
| log.cleaner.enable | true(默認) | 開啟自動清理日志功能 |
| log.cleanup.policy | delete(默認) | 刪除日志 |
| log.cleanup.policy | ompaction | 壓縮日志 |
| log.cleanup.policy | delete,compaction | 同時支持刪除和壓縮 |
1. 日志刪除
日志刪除是以段(segment日志)為單位來進行定期清理的;
1.1 定時日志刪除任務
kafka日志管理器中會有一個專門的日志刪除任務來定期檢測和刪除不符合保留條件的日志分段文件, 這個周期可以通過broker端參數log.retention.check.interval.ms來配置, 默認為 300 000, 即 5分鐘;
當前日志分段的保留策略有3中:
- 基于時間的保留策略;
- 基于日志大小的保留策略;
- 基于日志起始偏移量的保存策略;
1.2基于時間的保留策略
如果kafka中的消息超過指定的閾值, 就會將日志進行自動化清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中, 優先級為 log.retention.ms > log.retention.minutes > log.retention.hours;
默認情況, 在borker中, 配置為:
也就是, 默認日志的保留時間為168小時, 即7天;
1.3 刪除日志分段:
刪除日志分段是:
- 從日志文件對象中所維護日志分段的跳躍表中移除待刪除的日志分段, 以保證沒有線程對這些日志分段進行讀取操作;
- 將日志分段文件添加上.deleted的后綴(包括日志分段對應的索引文件)
- kafka的后臺定時任務會定期刪除這些.deleted為后綴的文件, 這個任務的延遲執行時間可以通過file.delete.delay.ms參數來設置, 默認為 60000, 即 1分鐘;
1.3.1 設置 topic 5秒刪除一次:
- 為了方便觀察, 設置段文件的大小為1M
key: segment.bytes
value: 1048576
- 設置topic的刪除策略
key: retention.ms
value: 5000
- 嘗試往topic中添加一些數據, 等待一會, 觀察日志的刪除情況; 發現, 日志會定期被標記為刪除, 然后被刪除;
1.4 基于日志大小的保留策略
日志刪除任務會檢查當前日志的大小是否超過設定的閾值來尋找可刪除的日志分段的文件集合;
可以通過broker端參數 log.retention.bytes來配置, 默認值為-1, 表示無窮大; 如果超過該大小, 會自動將超出部分刪除;
log.retention.bytes 配置的是日志文件的總大小, 而不是單個的日志分段的大小; 一個日志文件包含多個日志分段;
1.5 基于日志起始偏移量保留策略
每個segment日志都有它的起始偏移量, 如果起始偏移量小于 logStartOffset, 那么這些日志文件將會標記為刪除;
2. 日志壓縮(Log Compaction)
Log Compaction是默認的日志刪除之外的清理過時數據的方式; 它會將相同的key對應的數據只保留一個版本;
- Log Compaction執行后, offset將不再連續, 但依然可以查詢Segment;
- Log Compaction執行前后, 日志分段中的每條消息偏移量保持不變; Log Compaction會生成一個新的Segment文件;
- Log Compaction是針對key的, 在使用的時候注意每個消息的key不為空
- 基于Log Compaction可以保留key的最新更新, 可以基于Log Compaction來恢復消費者的最新狀態;
六、Kafka配額限速機制(Quotas)
生產者和消費者以極高的速度生產/消費大量數據或產生請求, 從而占用broker上的全部資源, 造成網絡IO飽和; 有了配額(Quotas)就可以避免這些問題;
Kafka支持配額管理, 從而可以對Producer和Consumer的produce&fetch操作進行流量限制, 防止個別業務壓爆服務器;
1. 限制producer端速率
為所有client id設置默認值, 以下為所有producer程序設置其TPS不超過1MB/s, 即1048576?/s;
命令如下:
運行基準測試, 觀察生產消息的速率:
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=12. 限制consumer端速率
為指定的topic進行限速, 以下為所有consumer程序設置topic速率不超過1MB/s, 即1048576/s;
命令如下:
運行基準測試:
bin/kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test --fetch-size 1048576 --messages 5000003. 取消Kafka的Quota配置
使用以下命令, 刪除Kafka的Quota配置:
取消生產者:
取消消費者
bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default總結
以上是生活随笔為你收集整理的kakfa从入门到放弃(四): 分区和副本机制、高级与低级API、 kafka-eagle、原理、数据清理、限速的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 飞桨2.0 PaddleDetectio
- 下一篇: ubuntu 常用软件