kafka配置_Kafka生产环境的几个重要配置参数
Kafka在彈性、容錯(cuò)性以及高吞吐量方面有著很大的優(yōu)勢。想要達(dá)到生產(chǎn)環(huán)境最優(yōu),發(fā)揮這些特性,需要我們進(jìn)行一系列的配置。Kafka提供了非常多的配置屬性,對(duì)于初學(xué)者而言,很容易陷入困惑。其實(shí),多數(shù)的配置已經(jīng)滿足了大部分的使用場景,本文分享總結(jié)了幾個(gè)比較重要的配置參數(shù),主要是針對(duì)producer端的配置,希望對(duì)你有所幫助。本文所討論的配置文件包括:
√acks√min.insync.replicas√replica.lag.time.max.ms√retries√enable.idempotence√max.in.flight.requests.per.connection √buffer.memory√max.block.ms√linger.ms√batch.size√compression.typeacks
acks參數(shù)指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才認(rèn)為該消息是寫入成功的,這個(gè)參數(shù)對(duì)于消息是否丟失起著重要作用,該參數(shù)的配置具體如下:
acks=0,表示生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來自服務(wù)器的響應(yīng). ?換句話說,一旦出現(xiàn)了問題導(dǎo)致服務(wù)器沒有收到消息,那么生產(chǎn)者就無從得知,消息也就丟失了. 改配置由于不需要等到服務(wù)器的響應(yīng),所以可以以網(wǎng)絡(luò)支持的最大速度發(fā)送消息,從而達(dá)到很高的吞吐量。
acks=1,表示只要集群的leader分區(qū)副本接收到了消息,就會(huì)向生產(chǎn)者發(fā)送一個(gè)成功響應(yīng)的ack,此時(shí)生產(chǎn)者接收到ack之后就可以認(rèn)為該消息是寫入成功的. 一旦消息無法寫入leader分區(qū)副本(比如網(wǎng)絡(luò)原因、leader節(jié)點(diǎn)崩潰),生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng),當(dāng)生產(chǎn)者接收到該錯(cuò)誤響應(yīng)之后,為了避免數(shù)據(jù)丟失,會(huì)重新發(fā)送數(shù)據(jù).這種方式的吞吐量取決于使用的是異步發(fā)送還是同步發(fā)送.
尖叫提示:如果生產(chǎn)者收到了錯(cuò)誤響應(yīng),即便是重新發(fā)消息,還是會(huì)有可能出現(xiàn)丟數(shù)據(jù)的現(xiàn)象. 比如,如果一個(gè)沒有收到消息的節(jié)點(diǎn)成為了新的Leader,消息就會(huì)丟失.
acks =all,表示只有所有參與復(fù)制的節(jié)點(diǎn)(ISR列表的副本)全部收到消息時(shí),生產(chǎn)者才會(huì)接收到來自服務(wù)器的響應(yīng). 這種模式是最高級(jí)別的,也是最安全的,可以確保不止一個(gè)Broker接收到了消息. 該模式的延遲會(huì)很高.
min.insync.replicas
上面提到,當(dāng)acks=all時(shí),需要所有的副本都同步了才會(huì)發(fā)送成功響應(yīng)到生產(chǎn)者. 其實(shí)這里面存在一個(gè)問題:如果Leader副本是唯一的同步副本時(shí)會(huì)發(fā)生什么呢?此時(shí)相當(dāng)于acks=1.所以是不安全的.
Kafka的Broker端提供了一個(gè)參數(shù)min.insync.replicas,該參數(shù)控制的是消息至少被寫入到多少個(gè)副本才算是"真正寫入",該值默認(rèn)值為1,生產(chǎn)環(huán)境設(shè)定為一個(gè)大于1的值可以提升消息的持久性. 因?yàn)槿绻礁北镜臄?shù)量低于該配置值,則生產(chǎn)者會(huì)收到錯(cuò)誤響應(yīng),從而確保消息不丟失.
replica.lag.time.max.ms
In-sync replica(ISR)稱之為同步副本,ISR中的副本都是與Leader進(jìn)行同步的副本,所以不在該列表的follower會(huì)被認(rèn)為與Leader是不同步的. 那么,ISR中存在是什么副本呢?首先可以明確的是:Leader副本總是存在于ISR中. 而follower副本是否在ISR中,取決于該follower副本是否與Leader副本保持了“同步”.
尖叫提示:對(duì)于"follower副本是否與Leader副本保持了同步"的理解如下:
(1)上面所說的同步不是指完全的同步,即并不是說一旦follower副本同步滯后與Leader副本,就會(huì)被踢出ISR列表.
(2)Kafka的broker端有一個(gè)參數(shù)**replica.lag.time.max.ms**, 該參數(shù)表示follower副本滯后與Leader副本的最長時(shí)間間隔,默認(rèn)是10秒. ?這就意味著,只要follower副本落后于leader副本的時(shí)間間隔不超過10秒,就可以認(rèn)為該follower副本與leader副本是同步的,所以哪怕當(dāng)前follower副本落后于Leader副本幾條消息,只要在10秒之內(nèi)趕上Leader副本,就不會(huì)被踢出出局.
(3)如果follower副本被踢出ISR列表,等到該副本追上了Leader副本的進(jìn)度,該副本會(huì)被再次加入到ISR列表中,所以ISR是一個(gè)動(dòng)態(tài)列表,并不是靜態(tài)不變的。
retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))。在這種情況下, retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待100ms ,可以通過retry.backoff.ms 參數(shù)來配置時(shí)間間隔。
比如,設(shè)置了acks=all和min.insync.replicas=2。由于某種原因,所有follower都掛了,由于min.insync.replicas=2,所以生產(chǎn)者無法收到來自Broker端的ack。
此時(shí)我們會(huì)從Producer端收到一個(gè)錯(cuò)誤消息:"Broker: Not enough in-sync replicas"。這就意味著Kafka不能在Broker上追加生產(chǎn)的消息(數(shù)據(jù))了,因?yàn)榇藭r(shí)的ISR的數(shù)量不夠。此時(shí)在Broker端會(huì)有如下的錯(cuò)誤消息:
org.apache.kafka.common.errors.NotEnoughReplicasException:?The?size?of?the?current?ISR?Set(0)?is?insufficient?to?satisfy?the?min.isr?requirement?of?2?for?partition默認(rèn)情況下,Producer不會(huì)對(duì)此錯(cuò)誤進(jìn)行處理,這就會(huì)造成消息丟失,即**at-most-once **語義。我們可以通過配置重試次數(shù)來讓生產(chǎn)者重新發(fā)送消息。比如配置retries=3,默認(rèn)為0
enable.idempotence
在某些情況下,實(shí)際上已將消息提交給了所有同步副本,但是由于網(wǎng)絡(luò)問題,Broker無法向Producer發(fā)送確認(rèn)ack。由于我們?cè)O(shè)置retries=3,所以producer將重新發(fā)送消息3次,這可能會(huì)導(dǎo)致topic中消息重復(fù)。
比如有一個(gè)producer向該topic發(fā)送1M消息,并且在提交消息之后但在生產(chǎn)者收到所有確認(rèn)ack之前,broker失敗了。在這種情況下,由于重試機(jī)制,最終可能在該topic上收到超過1M的消息,這也稱為at-lease-once語義。
當(dāng)然,我們想要實(shí)現(xiàn)的是exactly-once語義,即:即便生產(chǎn)者重新發(fā)送消息,消費(fèi)者也應(yīng)該只收到一次相同的消息。
此時(shí)需要進(jìn)行冪等操作,所謂冪等,即指一次執(zhí)行一個(gè)操作或多次執(zhí)行一個(gè)操作具有相同的效果。配置冪等很簡單,通過配置enable.idempotence=true即可,默認(rèn)為false。
那么,冪等是如何實(shí)現(xiàn)的呢?由于消息是分batch(批次)發(fā)送的,每個(gè)batch都有一個(gè)序列號(hào)。在Broker端,會(huì)追蹤每個(gè)分區(qū)的最大序列號(hào)。如果出現(xiàn)序列號(hào)較小或相等的batch(批次),broker將不會(huì)將該batch寫入topic。這樣,除了保證了冪等性,還可以確保batch的順序。
max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器晌應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會(huì)占用越多的內(nèi)存,不過也會(huì)提升吞吐量。把它設(shè)為1可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
因?yàn)槿绻麑蓚€(gè)批次發(fā)送到單個(gè)分區(qū),并且第一個(gè)批次失敗并被重試,但是,接著第二個(gè)批次寫入成功,則第二個(gè)批次中的記錄可能會(huì)首先出現(xiàn),這樣就會(huì)發(fā)生亂序。
如果沒有啟用冪等功能,但仍然希望按順序發(fā)送消息,則應(yīng)將此設(shè)置配置為1。但是,如果已經(jīng)啟用了冪等,則無需顯式定義此配置。
buffer.memory
該參數(shù)用來設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果應(yīng)用程序發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候,send()方法調(diào)用要么被阻塞,要么拋出異常,取決于如何設(shè)置max.block.ms。
當(dāng)生產(chǎn)者調(diào)用時(shí)send(),消息并不會(huì)立即發(fā)送,而是會(huì)添加到內(nèi)部緩沖區(qū)中。默認(rèn)buffer.memory值為32MB。如果生產(chǎn)者發(fā)送消息的速度超過了將消息發(fā)送到broker的速度,或者存在網(wǎng)絡(luò)問題,send()方法調(diào)用會(huì)被阻塞max.block.ms參數(shù)配置的時(shí)常,默認(rèn)1分鐘。
max.block.ms
該參數(shù)指定了在調(diào)用send()方法或使用partitionsFor()方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間。當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數(shù)據(jù)時(shí),這些方法就會(huì)被阻塞。在阻塞時(shí)間達(dá)到max.block.ms時(shí),生產(chǎn)者會(huì)拋出超時(shí)異常。
linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。kafka生產(chǎn)者會(huì)在批次填滿或linger.ms達(dá)到上限時(shí)把批次發(fā)送出去。默認(rèn)情況下,只要有可用的線程,生產(chǎn)者就會(huì)把消息發(fā)送出去,就算批次里只有一個(gè)消息。把linger.ms設(shè)置成比0大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒,使更多的消息加入到這個(gè)批次。雖然這樣會(huì)增加延遲,但也會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息,每個(gè)消息的開銷就變小了)。
batch.size
當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過生產(chǎn)者井不一定都會(huì)等到批次被填滿才發(fā)送,這取決于linger.ms的配置,比如如果linger.ms時(shí)間到了,即便批次只包含一個(gè)消息,也會(huì)被立即發(fā)送。所以就算把批次大小設(shè)置得很大,也不會(huì)造成延遲,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息,會(huì)增加一些額外的開銷。
可以使用配置使用linger.ms和batch.size。linger.ms是準(zhǔn)備好發(fā)送批次之前的延遲時(shí)間,默認(rèn)值為0。這意味著即使批次中只有1條消息,批次也會(huì)立即發(fā)送。有時(shí),會(huì)增加linger.ms以減少請(qǐng)求數(shù)量并提高吞吐量。但這將導(dǎo)致更多消息保留在內(nèi)存中。batch.size是單個(gè)批次的最大大小,當(dāng)滿足這兩個(gè)要求中的任何一個(gè)時(shí),將發(fā)送批次。
compression.type
默認(rèn)情況下,消息發(fā)送時(shí)不會(huì)被壓縮。該參數(shù)可以設(shè)置為snappy 、gzip 或lz4 ,它指定了消息被發(fā)送給broker 之前使用哪一種壓縮算也進(jìn)行壓縮。使用壓縮可以降低網(wǎng)絡(luò)傳輸開銷和存儲(chǔ)開銷,而這往往是向Kafka 發(fā)送消息的瓶頸所在。
總結(jié)
本文主要分享了Kafka幾個(gè)比較重要的配置參數(shù),并對(duì)每個(gè)參數(shù)進(jìn)行了詳細(xì)解釋,通過配置這些參數(shù),可以充分發(fā)揮Kafka的優(yōu)良特性。希望本文對(duì)你有所幫助。
總結(jié)
以上是生活随笔為你收集整理的kafka配置_Kafka生产环境的几个重要配置参数的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 赛博义体成真,帕金森病患者植入脊椎电极后
- 下一篇: java 链表反转_剑指BAT:如何最优