storm-kafka编程指南
storm-kafka編程指南
@(STORM)[kafka, 大數據, storm]
- storm-kafka編程指南
- 一原理及關鍵步驟介紹
- 一使用storm-kafka的關鍵步驟
- 1創建ZkHosts
- 2創建KafkaConfig
- 3設置MultiScheme
- 4創建Spout
- 5建立拓撲
- 二當拓撲出錯時如何從上一次的kafka位置繼續處理消息
- 關于storm-kafka開始時間點的總結
- 三結果寫回kafka
- 一使用storm-kafka的關鍵步驟
- 二完整示例
- 一簡介
- 二單詞拆分
- 三定義拓撲行為
- 1定義kafka的相關配置
- 2定義拓撲進行單詞統計后寫入一個分布式內存中
- 3從分布式內存中讀取結果并進行輸出
- 四state定義
- 1Factory類
- 3State類
- 三異常
- 1stormkafkaUpdateOffsetException
- 2storm Unsupported majorminor version 510
- 3kafka啟動后一段時間storm處理數據失敗
- 4storm拓撲啟動一段時間后卡住沒反應
一、原理及關鍵步驟介紹
storm中的storm-kafka組件提供了storm與kafka交互的所需的所有功能,請參考其官方文檔:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhosts
(一)使用storm-kafka的關鍵步驟
1、創建ZkHosts
當storm從kafka中讀取某個topic的消息時,需要知道這個topic有多少個分區,以及這些分區放在哪個kafka節點(broker)上,ZkHosts就是用于這個功能。
創建zkHosts有2種形式
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)(1)默認情況下,zk信息被放到/brokers中,此時可以使用第2種方式:
new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181”);(2)若zk信息被放置在/kafka/brokers中(我們的集群就是這種情形),則可以使用:
public ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181",“/kafka")或者直接:
new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181/kafka”)默認情況下,每60秒去讀取一次kafka的分區信息,可以通過修改host.refreshFreqSecs來設置。
(3)除了使用ZkHosts來讀取分析信息外,storm-kafka還提供了一種靜態指定的方法(不推薦此方法),如:
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092 Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string. GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0 partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1 partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2 StaticHosts hosts = new StaticHosts(partitionInfo);由此可以看出,ZkHosts完成的功能就是指定了從哪個kafka節點讀取某個topic的哪個分區。
2、創建KafkaConfig
(1)有2種方式創建KafkaConfig
public KafkaConfig(BrokerHosts hosts, String topic)public KafkaConfig(BrokerHosts hosts, String topic, String clientId)BrokerHosts就是上面創建的實例,topic就是要訂閱的topic名稱,clientId用于指定存放當前topic consumer的offset的位置,這個id 應該是唯一的,否則多個拓撲會引起沖突。
事實上,trident的offset并不保存在這個位置,見下面介紹。
真正使用時,有2種擴展,分別用于一般的storm以及trident。
(2)core storm
Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer’s offset. The id should uniquely identify your spout.
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
KafkaSpout 只接受 SpoutConfig作為參數
(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用參數
trident消費kafka的offset位置是在建立拓撲中指定,如:
則offset的位置為:
/transactional/test/coordinator/currtx(4)KafkaConfig的所有默認參數
public int fetchSizeBytes = 1024 * 1024; public int socketTimeoutMs = 10000; public int fetchMaxWait = 10000; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); public boolean ignoreZkOffsets = false; public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); public long maxOffsetBehind = Long.MAX_VALUE; public boolean useStartOffsetTimeIfOffsetOutOfRange = true; public int metricsTimeBucketSizeInSecs = 60;storm0.10以后:
public boolean forceFromStart = false;改為
KafkaConfig.ignoreZkOffsets = false;可以通過以下方式修改:
kafkaConfig.scheme =new SchemeAsMultiScheme(new StringScheme());3、設置MultiScheme
MultiScheme用于指定如何處理從kafka中讀取到的字節,同時它用于控制輸出字段名稱。
public Iterable<List<Object>> deserialize(byte[] ser);public Fields getOutputFields();默認情況下,RawMultiScheme讀取一個字段并返回一個字節數組,而發射的字段名稱為bytes。
可以通過SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改變這種默認行為:
上面的語句指定了將字節數組轉化為字符。
同時建立拓撲時:
會指定發射的字段名稱為str。
4、創建Spout
(1)core storm
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);(2)trident
OpaqueTridentKafkaSpoutkafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);5、建立拓撲:
(1)core-storm
builder.setSpout("kafka-reader",new KafkaSpout(spoutConf),12);kafka-reader指定了spout的名稱,12指定了并行度。
(2)trident
topology.newStream(“test", kafkaSpout). each(new Fields("str"), new FilterFunction(),new Fields("word”))….test指定了放置offset的位置,也就是txid的位置。str指定了spout發射字段的名稱。
完整示例:
Core Spout
Trident Spout
TridentTopology topology = new TridentTopology(); BrokerHosts zk = new ZkHosts("localhost"); TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);(二)當拓撲出錯時,如何從上一次的kafka位置繼續處理消息
1、KafkaConfig有一個配置項為KafkaConfig.startOffsetTime,它用于指定拓撲從哪個位置上開始處理消息,可取的值有3個:
(1)kafka.api.OffsetRequest.EarliestTime(): 從最早的消息開始
(2)kafka.api.OffsetRequest.LatestTime(): 從最新的消息開始,即從隊列隊伍最末端開始。
(3)根據時間點:
可以參閱 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的實現原理。
How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?
Kafka allows querying offsets of messages by time and it does so at segment granularity.The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.
For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling.
2、由于運行拓撲時,指定了offset在zk中保存的位置,當出現錯誤時,可以找出offset
當重新部署拓撲時,必須保證offset的保存位置不變,它才能正確的讀取到offset。
(1)對于core storm,就是
SpoutConfigspoutConf = new SpoutConfig(brokerHosts,topic, zkRoot,id);后2個參數不能變化
(2)對于trident而言,就是
第1個參數不能變化。
3、也就是說只要拓撲運行過一次KafkaConfig.startOffsetTime,之后重新部署時均可從offset中開始。
再看看這2個參數
如果將forceFromStart(0.10版本后是ignoreZkOffsets)設置為false(默認值),則每次拓撲重新啟動時,都會從之前的位置繼續處理。第一次從當前時間開始。
如果為true,則根據startOffsetTime的值來決定是從頭開始,從最新位置開始還是從某個時間點開始。
關于storm-kafka開始時間點的總結:
1、首先,讀取配置項:
kafkaConfig.forceFromStart (0.9.X)或者 KafkaConfig.ignoreZkOffsets (0.10)
如果是false的話,則繼續上一次拓撲的處理位置。如果是第一次呢?最早還是最晚?
如果是true的話,則看下面的配置。
2、
(1)kafka.api.OffsetRequest.EarliestTime(): 從最早的消息開始
(2)kafka.api.OffsetRequest.LatestTime(): 從最新的消息開始,即從隊列隊伍最末端開始。
(3)根據時間點
(三)結果寫回kafka
如果想把結果寫回kafka,并保證事務性,可以使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
以下是官方說明。
Writing to Kafka as part of your topology
You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.
You need to provide implementation of following 2 interfaces
TupleToKafkaMapper and TridentTupleToKafkaMapper
These interfaces have 2 methods defined:
as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, the implementation always looks for a field with field name “key” and “message” if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.
KafkaTopicSelector and trident KafkaTopicSelector
This interface has only one method
The implementation of this interface should return topic to which the tuple’s key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor.
Specifying kafka producer properties
You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs section “Important configuration properties for the producer”, in your storm topology config by setting the properties map with key kafka.broker.properties.
附帶2個官方的示例
For the bolt :
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");FixedBatchSpout spout = new FixedBatchSpout(fields, 4,new Values("storm", "1"),new Values("trident", "1"),new Values("needs", "1"),new Values("javadoc", "1"));spout.setCycle(true);builder.setSpout("spout", spout, 5);KafkaBolt bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("test")).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");Config conf = new Config();//set producer properties.Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");props.put("request.required.acks", "1");props.put("serializer.class", "kafka.serializer.StringEncoder");conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());For Trident:
Fields fields = new Fields("word", "count");FixedBatchSpout spout = new FixedBatchSpout(fields, 4,new Values("storm", "1"),new Values("trident", "1"),new Values("needs", "1"),new Values("javadoc", "1"));spout.setCycle(true);TridentTopology topology = new TridentTopology();Stream stream = topology.newStream("spout1", spout);TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory().withKafkaTopicSelector(new DefaultTopicSelector("test")).withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());Config conf = new Config();//set producer properties.Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");props.put("request.required.acks", "1");props.put("serializer.class", "kafka.serializer.StringEncoder");conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());二、完整示例
(一)簡介
1、本項目主要完成以下功能:
(1)從kafka中讀取一個topic的消息,然后根據空格拆分單詞,最后統計數據后寫入一個HazelCastState(一個分布式的內存存儲框架)。
(2)通過DRPC從上述的HazelCastState中讀取結果,并將結果輸出。
2、代碼可分為3部分:
(1)單詞拆分
(2)定義拓撲行為
(3)state定義
以下分為三部分分別介紹。
(二)單詞拆分
原理很簡單,就是通過空格將單詞進行拆分。
public class WordSplit extends BaseFunction {public void execute(TridentTuple tuple, TridentCollector collector) {String sentence = (String) tuple.getValue(0);if (sentence != null) {sentence = sentence.replaceAll("\r", "");sentence = sentence.replaceAll("\n", "");for (String word : sentence.split(" ")) {collector.emit(new Values(word));}}} }這里的wordsplit是一個function,它繼承自BaseFunction,最后,它將拆分出來的單詞逐個emit出去。
(三)定義拓撲行為
1、定義kafka的相關配置
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);(1)首先定義一個kafka相關的配置對象,第一個參數是zookeeper的位置,第二個參數是訂閱topic的名稱,第三個參數是一個clientId
(2)然后對配置進行一些設置,包括一些起始位置之類的,后面再補充具體的配置介紹。
(3)創建一個spout,這里的spout是事務型的,也就是保證每一個僅且只被處理一個
2、定義拓撲,進行單詞統計后,寫入一個分布式內存中
TridentTopology topology= new TridentTopology(); TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle().each(new Fields("str"), new WordSplit(), new Fields("word")).groupBy(new Fields("word")).persistentAggregate(new HazelCastStateFactory(), new Count(), new Fields("aggregates_words")).parallelismHint(2);(1)創建一個topo。
(2)首先定義一個輸入流,其中第一個參數定義了zk中放置這個topo元信息的信息,一般是/transactional/kafka
(3)對每個輸入的消息進行拆分:首先它的輸入是字段名稱為str的消息,然后經過WordSplit這個Function處理,最后,以字段名稱word發送出去
(4)將結果根據word字段的值進行分組,就是說word值相同的放在一起。
(5)將分組的結果分別count一下,然后以字段名稱aggregates_words寫入HazelCastStateFactory定義的state中,關于state請見下一部分的介紹。
3、從分布式內存中讀取結果并進行輸出
topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields("word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"), new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));(1)第三行定義了使用drpc需要處理的內容
(2)查詢分布式內存中的內容,查詢字段為word,然后以字段名count發送出去。
(3)將不需要統計的過濾掉。
(4)將結果進行聚合。
4、主函數
String kafkaZk = args[0];SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 2000);if (args != null && args.length > 1) {String name = args[1];String dockerIp = args[2];config.setNumWorkers(2);config.setMaxTaskParallelism(5);config.put(Config.NIMBUS_HOST, dockerIp);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(dockerIp));StormSubmitter.submitTopology(name, config, sentenceAggregationTopology.buildTopology());} else {LocalDRPC drpc = new LocalDRPC();config.setNumWorkers(2);config.setMaxTaskParallelism(2);LocalCluster cluster = new LocalCluster();cluster.submitTopology("kafka", config, sentenceAggregationTopology.buildTopology(drpc));while (true) {System.out.println("Word count: " + drpc.execute("words", "the"));Utils.sleep(1000);}}三個參數的含義為:
/*args[0]:kafkazk,如: 192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181,192.168.172.117:2181/kafka* args[1]:topo名稱* args[2]:niubus節點,如,192.168.172.98*/當參數數據大于1時,將拓撲提交到集群中,否則提交到本地。提交拓撲到集群的比較直觀,下面鄭重介紹一下drpc的查詢。
(1)首先定義一個本地的drpc對象,以及一個本地storm集群。
(2)然后將拓撲群提交到本地集群。
(3)最后,使用drpuc不停的循環查詢統計結果并輸出。
注意上面的拓撲定義了2個流,第一個流用于接收kafka消息,然后拆分統計后寫入內存,第二個流則接受drpc的輸入,將drpc的輸入拆分后,再統計需要查詢的每個單詞的統計結果。如在本例中,需要顯示單詞the的數量。
在本例中,drpc和kafka沒有本質的區別,它們都是一個用于向storm發送消息的集群,只是輸入數據的方式有些不同,kafka通過spout輸入,drpc則直接通過execute()進行輸入。
運行方式:
方式一:直接在eclipse右鍵運行,參數只填一個,如
192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181,192.168.172.117:2181/kafka。
只要保證kafka集群中有對應的topic,則會得到以下輸出:
Word count: [[2]] Word count: [[5]] Word count: [[10]] Word count: [[17]] Word count: [[28]]當然,統計結果根據輸入kafka的內容而不同。
(四)state定義
在定義拓撲的時候,最終的wordcount結果寫在了HazelCastState中:
persistentAggregate(new HazelCastStateFactory(),new Count(),new Fields(“aggregates_words”))
下面我們分析一下如何使用state來保存topo的處理結果,或者是中間處理結果。
注意,使用state除了可以保存最終的結果輸出,以保證事務型、透明事務型以外,還經常用于保存中間結果。比如blueprint第3章的一個例子中,用于統計疾病的發生數量,如果超過預警值,則向外發信息。如果統計結果成功,但向外發送信息失敗,則spout會重發數據,導致統計結果有誤,因此,此時可以通過state將結果保存下來。
1、Factory類
public class HazelCastStateFactory implements StateFactory {@Overridepublic State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {return TransactionalMap.build(new HazelCastState(new HazelCastHandler()));} }內容很簡單,就是返回一個state,它也是三個state相關的類中唯一對外的接口。
2、Handler類
public class HazelCastHandler implements Serializable {private transient Map<String, Long> state;public Map<String, Long> getState() {if (state == null) {state = Hazelcast.newHazelcastInstance().getMap("state");}return state;}使用單例模式返回一個map。
3、State類
真正處理業務邏輯的類。主要的方法有mutiPut和mutiGet,用于將結果放入state與取出state。
public class HazelCastState<T> implements IBackingMap<TransactionalValue<Long>> {private HazelCastHandler handler;public HazelCastState(HazelCastHandler handler) {this.handler = handler;}public void addKeyValue(String key, Long value) {Map<String, Long> state = handler.getState();state.put(key, value);}@Overridepublic String toString() {return handler.getState().toString();}@Overridepublic void multiPut(List<List<Object>> keys, List<TransactionalValue<Long>> vals) {for (int i = 0; i < keys.size(); i++) {TridentTuple key = (TridentTuple) keys.get(i);Long value = vals.get(i).getVal();addKeyValue(key.getString(0), value);//System.out.println("[" + key.getString(0) + " - " + value + "]");}}public List multiGet(List<List<Object>> keys) {List<TransactionalValue<Long>> result = new ArrayList<TransactionalValue<Long>>(keys.size());for (int i = 0; i < keys.size(); i++) {TridentTuple key = (TridentTuple) keys.get(i);result.add(new TransactionalValue<Long>(0L, MapUtils.getLong(handler.getState(), key.getString(0), 0L)));}return result;} }三、異常
1、storm.kafka.UpdateOffsetException
請先參考文章《關于kafka中的timestamp與offset的對應關系》
在代碼中指定從哪個時間點開始消費消息時出現以上異常:
kafkaConfig.startOffsetTime = new SimpleDateFormat("yyyy.MM.dd-HH:mm:ss").parse(startOffsetTime).getTime();原因很簡單,指定的時間太早了,kafka集群中沒有任何數據,所以拋出異常。而在storm 0.10以上版本修復了以上的問題,改為從最早的消息開始讀取,見https://issues.apache.org/jira/browse/STORM-586。
但為什么會出現這個錯誤呢?不是說kafka集群中沒有這個時間點的消息,而很有可能只是其中一個分區的消息很少導致的,而且這個分區中是有這個時間點的消息的,但由于kafka中處理時間點與offset的機制會導致上面的異常,詳見上述文章。
遇見這種異常時,看一下各個分區中的文件的最后修改時間,如果都晚于你指定的時間,則必然后出現這種異常。
如果實際數據是很少,那沒辦法,你從頭讀取吧。
如果實際數據是很多的,那很可以是在各個分區中分布不均所導致的,這需要均衡各個分區中的消息數量,詳見上面的文章。
2、storm Unsupported major.minor version 51.0
出現這個異常都是由于編譯時使用的jdk版本和運行環境中的jdk版本不一致。
(1)編譯環境
常用的編譯環境有eclipse, ant, maven等。
使用eclipse的話,改一下java compiler即可。
使用maven的話,修改$MAVEN_HOME/conf/settings.xml中的jdk設置。默認情況下是注釋掉的,把注釋去掉,并且改為1.7版本即可。
(2)storm運行環境
修改$STORM_HOME/conf/storm_evn.ini中的
JAVA_HOME,如
參數,
JAVA_HOME:/usr/lib/jvm/java-7-sun
3、kafka啟動后一段時間storm處理數據失敗
storm讀取kafka的信息進行處理,但處理一段時間后,拓撲卡住不動,查找storm的日志發現以下異常:
2016-09-19 10:52:08.688 k.c.ClientUtils$ [WARN] Fetching topic metadata with correlation id 13439 for topics [Set(streaming_g4_sdc)] from broker [id:2,host:ip,port:9092] failed java.net.SocketTimeoutExceptionat sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[?:1.7.0_67]at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.7.0_67]at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.7.0_67]at kafka.utils.Utils$.read(Utils.scala:380) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.network.Receive$class.readCompletely(Transmission.scala:56) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.producer.SyncProducer.send(SyncProducer.scala:113) ~[kafka_2.10-0.8.2.1.jar:?]at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) [kafka_2.10-0.8.2.1.jar:?]at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) [kafka_2.10-0.8.2.1.jar:?]at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) [kafka_2.10-0.8.2.1.jar:?]at kafka.utils.Utils$.swallow(Utils.scala:172) [kafka_2.10-0.8.2.1.jar:?]at kafka.utils.Logging$class.swallowError(Logging.scala:106) [kafka_2.10-0.8.2.1.jar:?]at kafka.utils.Utils$.swallowError(Utils.scala:45) [kafka_2.10-0.8.2.1.jar:?]at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) [kafka_2.10-0.8.2.1.jar:?]at kafka.producer.Producer.send(Producer.scala:77) [kafka_2.10-0.8.2.1.jar:?]at kafka.javaapi.producer.Producer.send(Producer.scala:33) [kafka_2.10-0.8.2.1.jar:?]at com.netease.sytopology.bolt.FilterFunction.backToKafka(FilterFunction.java:438) [stormjar.jar:?]at com.netease.sytopology.bolt.FilterFunction.filter(FilterFunction.java:180) [stormjar.jar:?]at com.netease.sytopology.bolt.FilterFunction.execute(FilterFunction.java:458) [stormjar.jar:?]at storm.trident.planner.processor.EachProcessor.execute(EachProcessor.java:65) [storm-core-0.10.0.jar:0.10.0]at storm.trident.planner.SubtopologyBolt$InitialReceiver.receive(SubtopologyBolt.java:206) [storm-core-0.10.0.jar:0.10.0]at storm.trident.planner.SubtopologyBolt.execute(SubtopologyBolt.java:146) [storm-core-0.10.0.jar:0.10.0]at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:132) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819) [storm-core-0.10.0.jar:0.10.0]at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.0.jar:0.10.0]at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]at java.lang.Thread.run(Thread.java:745) [?:1.7.0_67]以上表明storm從kafka中讀取數據出錯了,檢查過很多因素,包括IO/網絡/CPU/內存都沒有明顯異常,kafka也正常運行,之后分析kafka機器的GC狀態:
hadoop@gdc-kafka08-log:~/kafka/logs$ jstat -gcutil `jps | grep Kafka | awk '{print $1}'` 1000 1000S0 S1 E O P YGC YGCT FGC FGCT GCT11.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.34611.16 9.75 100.00 1.76 65.54 11 342.167 2 0.179 342.3460.00 9.79 9.13 1.75 65.54 11 434.141 2 0.179 434.3200.00 9.79 17.23 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.28 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.36 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.37 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.53 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.57 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.62 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 17.79 1.75 65.59 11 434.141 2 0.179 434.3200.00 9.79 18.08 1.75 65.60 11 434.141 2 0.179 434.3200.00 9.79 18.22 1.75 65.60 11 434.141 2 0.179 434.3200.00 9.79 18.32 1.75 65.60 11 434.141 2 0.179 434.3200.00 9.79 18.36 1.75 65.60 11 434.141 2 0.179 434.320發現young GC時間超過100秒了。這段時間由于kafka沒有向zk匯報,zk會認為kafka已經掛掉,從而進行leader切換。但在這個時間內storm讀取kafka信息出錯,從而導致出現以上錯誤。
解決方法:使用G1 GC。
4、storm拓撲啟動一段時間后卡住沒反應
拓撲啟動一段時間內卡住不動,沒有明顯異常,沒有網絡/IO/內存/CPU/GC等異常。后查看網絡端口狀態,發現以下內容
$ netstat -at tcp 0 123681 gdc-kafka06-log.i.:9092 gdc-storm13-storm:55069 ESTABLISHED發現9092端口的TCP緩沖區被占滿,從而無法再處理新數據。
原因分析:可能某些producer沒有釋放?不確認
解決辦法:重啟kafka
總結
以上是生活随笔為你收集整理的storm-kafka编程指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: storm集群操作指南
- 下一篇: storm-kafka源码分析