Kafka配置
服務器端
安裝相關(guān)
| 參數(shù) | 意義 |
| broker.id | broker 的編號,如果集群中有多個broker ,則每個broker 的編號需要設(shè)置的不同 |
| port | kafka監(jiān)聽端口,一般默認是9092,使用1024以前端口的話就需要root權(quán)限。E.g:開啟線程消費時用到的就是這個端口 |
| zookeeper.connect | 用于保存broker元數(shù)據(jù)的zookeeper地址是通過該參數(shù)指定的,當producer向kafka topic發(fā)布消息事,指定的就是zookeeper.connect這個路徑。多個用逗號分開,例如: debugo01:2181,debugo02,debugo03 最佳的實踐方式是再加一個chroot 路徑,這樣既可以明確指明該chroot 路徑下的節(jié)點是為Kafka所用的,也可以實現(xiàn)多個Kafka 集群復用一套ZooKeeper 集群,這樣可以節(jié)省更多的硬件資源。包含chroot 路徑的配置類似于localhost1:2181 , localhost2:2181, |
| zookeeper.connection.timeout.ms | 連接zk的超時時間 |
| zookeeper.sync.time.ms | ZooKeeper集群中l(wèi)eader和follower之間的同步實際 |
| log.dirs/log.dir | 日志存放目錄,多個目錄使用逗號分割。 Kafka是把所有消息都保存在在磁盤上,存放目錄即通過該參數(shù)指定的. 一般情況下,log.dir用來配置單個根目錄,而log.dirs用來配置多個根目錄(以逗號分隔〉,但是Kafka井沒有對此做強制性限制,也就是說,log.dir和log.dirs都可以用來配置單個或多個根目錄。log.dirs的優(yōu)先級比log.dir高,但是如果沒有配置log.dirs,則會以log.dir配置為準 |
| listeners | 客戶端要連接broker的入口地址列表。
? |
| advertised.listeners | 與此參數(shù)關(guān)聯(lián)的還有advertised.listeners,作用和listeners類似,默認值也為null。 不過advertised.listeners主要用于IaaS(InfrastructureasaService)環(huán)境,比如公有云上的機器通常配備有多塊網(wǎng)卡,即包含私網(wǎng)網(wǎng)卡和公網(wǎng)網(wǎng)卡,對于這種情況而言,可以設(shè)置advertised.listeners參數(shù)綁定公網(wǎng)IP供外部客戶端使用,而配置listeners參數(shù)來綁定私網(wǎng)IP地址供broker間通信使用。 |
| ? | ? |
運行相關(guān)
Producer端
| 參數(shù) | 意義 |
| buffer.memory | RecordAccumulator緩存的大小。默認值為33554432B,即32MB |
| max.block.ms | RecordAccumulator緩存空間不足時,KafkaProducer的send()方法調(diào)用要么被阻塞的時長。超時則拋出異常。默認值為60000,即60秒。 |
| batch.size | 客戶端發(fā)送消息時,BufferPool中可以放進去的ByteBuffer,大小。默認值為16384B,即16KB。 ProducerBatch創(chuàng)建時,如果不超過此值,則根據(jù)此值創(chuàng)建,并能夠通過BufferPool復用。如果大于此值,則以實際大小創(chuàng)建,不會被復用。 |
| acks | 這個參數(shù)用來指定分區(qū)中必須要有多少個副本收到這條消息,之后生產(chǎn)者才會認為這條消息是成功寫入的。acks 是生產(chǎn)者客戶端中一個非常重要的參數(shù),它涉及消息的可靠性和吞吐量之間的權(quán)衡。acks 參數(shù)有3 種類型的值(都是字符串類型): acks = 1 。默認值即為l 。生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader 副本成功寫入消息,那么它就會收到來自服務端的成功響應 |
| max.request.size | 這個參數(shù)用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認值為1048576B ,即1MB |
| retries | 配置生產(chǎn)者重試的次數(shù),默認值為0,即在發(fā)生異常的時候不進行任何重試動作 |
| retry.backoff.ms | 默認值為100 ,它用來設(shè)定兩次重試之間的時間間隔 |
| max.in.flight.requests.per.connection | 在需要保證消息順序的場合建議把參數(shù)配置為1 ,而不是把acks 配置為0 , 不過這樣也會影響整體的吞吐。 |
| compression.type | 這個參數(shù)用來指定消息的壓縮方式,默認值為“ none ”,即默認情況下,消息不會被壓縮。該參數(shù)還可以配置為“ gzip ” “ snappy ” 和“ lz4 ” 。 |
| connections.max.idle.ms | 指定在多久之后關(guān)閉限制的連接,默認值是540000 ( ms ) ,即9 分鐘。 |
| linger.ms | 指定生產(chǎn)者發(fā)送ProducerBatch 之前等待更多消息( ProducerRecord )加入 ProducerBatch 的時間,默認值為0。生產(chǎn)者客戶端會在ProducerBatch 被填滿或等待時間超過linger.ms 值時發(fā)迭出去 |
| receive.buffer.bytes | 設(shè)置Socket 接收消息緩沖區(qū)( SO_REVBUF )的大小,默認值為32768B,即32M。如果設(shè)置為-1 ,則使用操作系統(tǒng)的默認值。如果Producer與Kafka 處于不同的機房,則可以適地調(diào)大這個參數(shù)值。 |
| send.buffer.bytes | 設(shè)置Socket 發(fā)送消息緩沖區(qū)(SO_SNDBUF )的大小,默認值為131072B ,即128KB 。與receive.buffer.bytes參數(shù)一樣, 如果設(shè)置為l ,則使用操作系統(tǒng)的默認值。 |
| request.timeout.ms | 配置Producer 等待請求響應的最長時間,默認值為30000ms。請求超時之后可以選擇進行重試。注意這個參數(shù)需要比broker 端參數(shù)replica.lagtime.max.ms 的值要大,這樣可以減少因客戶端重試而引起的消息重復的概率。 |
Consumer端
| 參數(shù) | 意義 |
| partition.assignment.strategy | 分區(qū)分配策略,kafka又兩個默認策略,
分區(qū)策略默認是:org.apache.kafka.clients.consumer.RangeAssignor=>Range策略 org.apache.kafka.clients.consumer.RoundRobinAssignor=>Robin策略 ? |
| fetch.min.bytes | 消費者從服務器獲取記錄的最小字節(jié)數(shù),broker收到消費者拉取數(shù)據(jù)的請求的時候,如果可用數(shù)據(jù)量小于設(shè)置的值,那么broker將會等待有足夠可用的數(shù)據(jù)的時候才返回給消費者,這樣可以降低消費者和broker的工作負載,因為當主題不是很活躍的情況下,就不需要來來回回的處理消息,如果沒有很多可用數(shù)據(jù),但消費者的CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認值大。如果消費者的數(shù)量比較多,把該屬性的值設(shè)置得大一點可以降低broker 的工作負載。 |
| fetch.max.wait.ms | fetch.min.bytes設(shè)置了broker返回給消費者最小的數(shù)據(jù)量,而fetch.max.wait.ms設(shè)置的則是broker的等待時間,兩個屬性只要滿足了任何一條,broker都會將數(shù)據(jù)返回給消費者,也就是說舉個例子,fetch.min.bytes設(shè)置成1MB,fetch.max.wait.ms設(shè)置成1000ms,那么如果在1000ms時間內(nèi),如果數(shù)據(jù)量達到了1MB,broker將會把數(shù)據(jù)返回給消費者;如果已經(jīng)過了1000ms,但是數(shù)據(jù)量還沒有達到1MB,那么broker仍然會把當前積累的所有數(shù)據(jù)返回給消費者。 |
| max.partition.fetch.bytes | 該屬性指定了服務器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認值是lMB , 也 就是說,kafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節(jié)。如果一個主題有20 個分區(qū)和5 個消費者,那么每個消費者需要至少4MB 的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩憤,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比broker 能夠接收的最大消息的字節(jié)數(shù)(通過max.message.size 屬性配置)大, 否則消費者可能無法讀取這些消息,導致消費者一直掛起重試,例如,max.message.size設(shè)置為2MB,而該屬性設(shè)置為1MB,那么當一個生產(chǎn)者可能就會生產(chǎn)一條大小為2MB的消息,那么就會出現(xiàn)問題,消費者能從分區(qū)取回的最大消息大小就只有1MB,但是數(shù)據(jù)量是2MB,所以就會導致消費者一直掛起重試。在設(shè)置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁調(diào)用poll()方法 來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用poll () 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理,可能無怯及時進行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況, 可以把max.partitioin.fetch.bytes 值改小,或者延長會話過期時間。 |
| session.timeout.ms | 該屬性指定了當消費者被認為已經(jīng)掛掉之前可以與服務器斷開連接的時間。默認是3s,消費者在3s之內(nèi)沒有再次向服務器發(fā)送心跳,那么將會被認為已經(jīng)死亡。此時,協(xié)調(diào)器將會出發(fā)再均衡,把它的分區(qū)分配給其他的消費者,該屬性與heartbeat.interval.ms緊密相關(guān),該參數(shù)定義了消費者發(fā)送心跳的時間間隔,也就是心跳頻率,一般要同時修改這兩個參數(shù),heartbeat.interval.ms參數(shù)值必須要小于session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms設(shè)置成3min,那么heartbeat.interval.ms一般設(shè)置成1min,這樣,可以更快的檢測以及恢復崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡(有一種情況就是網(wǎng)絡延遲,本身消費者是沒有掛掉的,但是網(wǎng)絡延遲造成了心跳超時,這樣本不該發(fā)生再均衡,但是因為網(wǎng)絡原因造成了非預期的再均衡),把該屬性的值設(shè)置得大一些,可以減少意外的再均衡,不過檢測節(jié)點崩憤-需要更長的時間。 |
| auto.offset.reset | 該屬性指定了消費者在讀取一個沒有偏移量后者偏移量無效(消費者長時間失效當前的偏移量已經(jīng)過時并且被刪除了)的分區(qū)的情況下,應該作何處理,默認值是latest,也就是從最新記錄讀取數(shù)據(jù)(消費者啟動之后生成的記錄),另一個值是earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數(shù)據(jù)。 |
| enable.auto.commit | 指定了消費者是否自動提交偏移量,默認值是true,為了盡量避免重復數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,有自己控制合適提交偏移量,如果設(shè)置為true, 可以通過設(shè)置 auto.commit.interval.ms屬性來控制提交的頻率 |
| client.id | 消費者客戶端ID |
| max.poll.records | 控制單次調(diào)用call方法能夠返回的記錄數(shù)量,幫助控制在輪詢里需要處理的數(shù)據(jù)量。 |
| receive.buffer.bytes + send.buffer.bytes | socket 在讀寫數(shù)據(jù)時用到的TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為-1 ,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬 |
?Topic
?
?
?
?
?
總結(jié)
- 上一篇: 深入理解Kafka(1)
- 下一篇: 深入理解Kafka(2)-Produce