kafka0.9 java commit_0.9版本kafka优化及常见错误(转载)
Kafka設(shè)計的初衷是迅速處理短小的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON內(nèi)容,一個消息差不多有10-100M,這種情況下,Kakfa應(yīng)該如何處理?
針對這個問題,有以下幾個建議:
最好的方法是不直接傳送這些大的數(shù)據(jù)。如果有共享存儲,如NAS, HDFS,
S3等,可以把這些大的文件存放到共享存儲,然后使用Kafka來傳送文件的位置信息。
第二個方法是,將大的消息數(shù)據(jù)切片或切塊,在生產(chǎn)端將數(shù)據(jù)切片為10K大小,使用分區(qū)主鍵確保一個大消息的所有部分會被發(fā)送到同一個kafka分區(qū)(這樣每一部分的拆分順序得以保留),如此以來,當(dāng)消費(fèi)端使用時會將這些部分重新還原為原始的消息。
第三,Kafka的生產(chǎn)端可以壓縮消息,如果原始消息是XML,當(dāng)通過壓縮之后,消息可能會變得不那么大。在生產(chǎn)端的配置參數(shù)中使用compression.codec和commpressed.topics可以開啟壓縮功能,壓縮算法可以使用GZip或Snappy。
1. Broker配置
message.max.bytes
默認(rèn):1000000。broker能接收消息的最大字節(jié)數(shù),這個值應(yīng)該比消費(fèi)端的fetch.message.max.bytes更小才對,否則broker就會因為消費(fèi)端無法使用這個消息而掛起。
log.segment.bytes
默認(rèn):
1GB。kafka數(shù)據(jù)文件的大小,確保這個數(shù)值大于一個消息的長度。一般說來使用默認(rèn)值即可(一般一個消息很難大于1G,因為這是一個消息系統(tǒng),而不是文件系統(tǒng))。
replica.fetch.max.bytes
默認(rèn):
1MB。broker可復(fù)制的消息的最大字節(jié)數(shù)。這個值應(yīng)該比message.max.bytes大,否則broker會接收此消息,但無法將此消息復(fù)制出去,從而造成數(shù)據(jù)丟失。
2. Producer配置
producer方面沒有什么太多需要優(yōu)化的,最需要注意的一點是:不要使用0.8版本的Producer。
0.8版本的Producer在面對大量數(shù)據(jù)的寫入時,會導(dǎo)致producer端使用的直接內(nèi)存無法釋放,最終導(dǎo)致應(yīng)用被操作系統(tǒng)中斷掉。
3. Consumer配置
fetch.message.max.bytes
默認(rèn) 1MB。消費(fèi)者能讀取的最大消息。這個值應(yīng)該大于或等于message.max.bytes。
所以,如果你一定要選擇kafka來傳送大的消息,還有些事項需要考慮。要傳送大的消息,不是當(dāng)出現(xiàn)問題之后再來考慮如何解決,而是在一開始設(shè)計的時候,就要考慮到大消息對集群和主題的影響。
性能
根據(jù)前面提到的性能測試,kafka在消息為10K時吞吐量達(dá)到最大,更大的消息會降低吞吐量,在設(shè)計集群的容量時,尤其要考慮這點。
可用的內(nèi)存和分區(qū)數(shù)
Brokers會為每個分區(qū)分配replica.fetch.max.bytes參數(shù)指定的內(nèi)存空間,假設(shè)replica.fetch.max.bytes=1M,且有1000個分區(qū),則需要差不多1G的內(nèi)存,確保
分區(qū)數(shù)最大的消息不會超過服務(wù)器的內(nèi)存,否則會報OOM錯誤。同樣地,消費(fèi)端的fetch.message.max.bytes指定了最大消息需要的內(nèi)存空間,同樣,分區(qū)數(shù)最大需要內(nèi)存空間
不能超過服務(wù)器的內(nèi)存。所以,如果你有大的消息要傳送,則在內(nèi)存一定的情況下,只能使用較少的分區(qū)數(shù)或者使用更大內(nèi)存的服務(wù)器。
垃圾回收
到現(xiàn)在為止,我在kafka的使用中還沒發(fā)現(xiàn)過此問題,但這應(yīng)該是一個需要考慮的潛在問題。更大的消息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關(guān)注GC的日志和服務(wù)器的日志信息。如果長時間的GC導(dǎo)致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms參數(shù)為更大的超時時間。
4. 常見錯誤
4.1 kafka.common.MessageSizeTooLargeException
這個異常的原因比較明顯,單個消息太大了,到官網(wǎng)找找配置就可以解決。
如果是生產(chǎn)者報錯,修改Kafka
Broker的配置,在server.properties中配置message.max.bytes,默認(rèn)是1M(約)。
如果是消費(fèi)者報錯,修改消費(fèi)者中增加fetch.message.max.bytes的配置,這個配置的值要大于Broker的message.max.bytes配置。
修改了Broker的message.max.bytes,同時需要修改replica.fetch.max.bytes,并且replica.fetch.max.bytes要大于message.max.bytes。
4.2 Ierator is in failed state
這真的是一個神一般存在的錯誤提示。Consumer消費(fèi)時出現(xiàn)Iterator is in failed
state的錯誤提示,錯誤量很多,并且consumer不再消費(fèi)kafka了,造成消息堆積。具體報錯如下:
[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.
at
...
但這個錯誤并不是真正的錯誤,是因為MessageSizeTooLargeException導(dǎo)致的,發(fā)生MessageSizeTooLargeException異常會導(dǎo)致Iterator
is in failed
state錯誤的發(fā)生,但是MessageSizeTooLargeException只會打印一次,而Iterator is in
failed state錯誤會隨著讀取方法的調(diào)用不停的打,完全能將錯誤分析方向帶偏了。具體如下:
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic LogProd1 partition 27 at fetch offset 114. Increase the fetch size, or decrease the maximum message size the br
oker will allow.
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.
at
[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.
at
[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.
at
[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.
at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.
at
解決MessageSizeTooLargeException即可。
4.3 kafka.consumer.ConsumerTimeoutException
這個錯誤提示也比較直白,消費(fèi)者消費(fèi)數(shù)據(jù)時超時了。
默認(rèn)情況下是阻塞式消費(fèi)數(shù)據(jù),不會報這個錯誤。
如果consumer設(shè)置consumer.timeout.ms,則消費(fèi)者消費(fèi)數(shù)據(jù)時超時就會報錯。
消費(fèi)者消費(fèi)數(shù)據(jù)超時的一種情況就是所有數(shù)據(jù)均被消費(fèi)完了。
4.4 java.io.IOException: Broken pipe
4.5 java.io.IOException: Connection reset by peer
4.6
org.apache.kafka.clients.consumer.CommitFailedException: Commit
cannot be completed due to group rebalance
這個錯誤提示比較直白,意思是消費(fèi)者消費(fèi)了數(shù)據(jù),但在規(guī)定時間內(nèi)沒有commit,所以kafka認(rèn)為這個consumer掛掉了,這時對consumer的group進(jìn)行再平衡。
解決方法有三種:
增加消費(fèi)超時時間。消費(fèi)超時時間通過heartbeat.interval.ms設(shè)置,heartbeat.interval.ms的大小不能超過session.timeout.ms,session.timeout.ms必須在[group.min.session.timeout.ms,
group.max.session.timeout.ms]范圍內(nèi)。
減少消息處理時間;由后端處理決定。
減少一次消費(fèi)的消息量。max.partition.fetch.bytes決定容量,max.poll.records決定數(shù)量。max.partition.fetch.bytes規(guī)定了一個partition一次pull獲取的獲取的數(shù)據(jù)大小。max.poll.records規(guī)定一次pull獲取的消息數(shù)量。
轉(zhuǎn)載地址:http://blog.csdn.net/weitry/article/details/53009134
總結(jié)
以上是生活随笔為你收集整理的kafka0.9 java commit_0.9版本kafka优化及常见错误(转载)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java语言程序设计你_清华大学出版社-
- 下一篇: 阈值Java_OpenCV简单阈值