storm-kafka源码分析
storm-kafka源碼分析
@(KAFKA)[kafka, 大數據, storm]
- storm-kafka源碼分析
- 一概述
- 一代碼結構
- 二orgapachestormkafka
- 三orgapachestormkafkatrident
- 1spout
- 2state
- 3metric
- 四其它說明
- 1線程與分區
- 二orgapachestormkafka
- 一基礎類
- 1Broker
- 2BrokerHosts
- 3Partition
- 4tridentGlobalPartitionInformation
- 5KafkaConfig
- 6SpoutConfig
- 7ZkState
- 8DynamicBrokersReader
- 9tridentZkBrokerReader
- 10ZkCoordinator
- 11PartitionManager
- 12DynamicPartitionConnections
- 13KafkaUtils
- 1calculatePartitionsForTask
- 二KafkaSpout
- 1open
- 2nextTuple
- 一基礎類
- 三trident
- 一tridentspout的主要流程
- 1主要調用流程回顧
- 2指定spout
- 二Coordinator
- 1Coordinator的實例化
- 2close與isReady
- 3getPartitionsForBatch
- 三Emitter TridentKafkaEmitter結構
- 1offset與nextOffset
- 1事務型的spout
- emitPartitionBatchNew
- emitPartitionBatch
- 2透明型的spout
- emitPartitionBatch
- 3公共方法
- 四透明型spout
- 1emitPartitionBatch
- 2emitNewPartitionBatch
- TODOImmutableMapof
- TODO如果獲取失敗哪里更新了新的分區信息是fetch里面作了處理嗎后面再看
- 3failFastEmitNewPartitionBatch
- 4doEmitNewPartitionBatch
- 1確定offset
- 2讀取消息
- 3發送消息并更新offset
- 4構建下一個meta并返回
- 五事務型spout
- 1emitPartitionBatchNew
- 2emitPartitionBatch
- 3reEmitPartitionBatch
- 六2種spout的公共方法
- 1refreshPartitions
- 2getOrderedPartitions
- 3close
- 4Partitions與Partition
- 七fetch消息的邏輯
- 1fetchMessages
- 2KafkaUtilfetchMessages
- 八KafkaOffsetMetric
- TODO還有其它metric吧
- 一tridentspout的主要流程
一、概述
storm-kafka是storm用于讀取kafka消息的連接器,本文主要對trident的實現部分作了解讀。
(一)代碼結構
storm-kafka中多7個package中,其中的org.apache.storm.kafka與org.apache.storm.kafka.trident中最核心的2個,分別用于處理storm-core與trident,其它package只是這2個的輔助。我們下面分別先簡單看一下這2個package的內容。
注:還有一個包org.apache.storm.kafka.bolt用于向kafka寫入數據,用得較少,暫不分析。
(二)org.apache.storm.kafka
org.apache.storm.kafka這個package包括了一些公共模塊,以及storm-core的spout處理。
(三)org.apache.storm.kafka.trident
trident這個package中的類按照其功能可大致分為3類:spout, state和metric。除此之外,trident還調用了一些org.apache.storm.kafka中的類用于處理相同的事務,如metric, exception, DynamicBrokerReader等
1、spout
spout指定了如何從kafka中讀取消息,根據trident的構架,它涉及的主要類為:
* OpaqueTridentKafkaSpout, TransactionalTridentKafkaSpout: 2種類型的spout
* Coordinator, TridentKafkaEmitter: 即Coordinator與Emitter的具體實現。
* GlobalPartitionInformation, ZkBrokerReader:2個重要的輔助類,分別記錄了partition的信息以及如何從zk中讀取kafka的狀態(還有一個靜態指定的,這里不分析)。
2、state
3、metric
主要涉及一個類:MaxMetric,其實還有其它metric,但在org.apache.storm.kafka中定義了。
(四)其它說明
1、線程與分區
注意,storm-kafka中的spout只是其中一個線程。
嚴格來說是每個partition只能由一個task負責,當然,一個task可以處理多個partition。但task和partition之間是怎么對應的呢?如何決定一個task處理哪些partition?
在trident拓撲中,多個batch會同時被處理(由MAX_SPOUT_PENDING決定),每個batch包含多個或者全部分區,每個batch讀取的消息大小由fetchSizeBytes決定。
二、org.apache.storm.kafka
(一)基礎類
這些基礎的功能類可以大致分為以下幾類:
* Bean類:表示某一種實體,包括Broker,BrokerHost, Partition 和trident.GlobalPartitionInformation
* 配置類: 包括KafkaConfig 和 SpoutConfig。
* zk讀寫類:包括獲取state內容的ZkState,以及讀取broker信息的DynamicBrokersReader和trident.ZkBrokerReader。
* 數據處理類:ZkCoordinator用于確定自已這個spout要處理哪些分區,以及某個分區對于的PartitionManager對象,而PartitionManager則真正的對某個分區進行處理了,DynamicPartitionConnections用于被PartitionManager調用以獲取分區對應的SimpleConsumer,
* KafkaUtils: 一些功能方法。
另外還有一些metric和錯誤處理的類等,暫不介紹。
1、Broker
Broker只有2個變量:
public String host; public int port;表示一臺kafka機器的地址與端口。
2、BrokerHosts
有2種實現:StaticHosts 與 ZkHost。
以ZkHost為例:
可以看出,這是記錄了kafka在zk中的位置(ip與路徑),以及多久刷新一下這個信息。默認為/kafka/brokers,有2個子目錄:
topic ids分別記錄了topic信息及broker信息。
3、Partition
Partition記錄了一個分區的具體信息,包括(所在的broker, 所屬的topic,partition號)。
Partition(Broker host, String topic, int partition)4、trident.GlobalPartitionInformation
GlobalPartitionInformation記錄的是某個topic的所有分區信息,其中分區信息以一個TreeMap的形式來保存。
public String topic; private Map<Integer, Broker> partitionMap;它有一個getOrderedPartitions()方法,返回的就是這個topic的所有分區信息:
public List<Partition> getOrderedPartitions() {List<Partition> partitions = new LinkedList<Partition>();for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));}return partitions; }注意,因為使用了TreeMap的數據結構,因此返回的結果就是有序的。
5、KafkaConfig
就是關于kafkaSpout的一些配置項,完整列表為:
public final BrokerHosts hosts; public final String topic; public final String clientId;public int fetchSizeBytes = 1024 * 1024; public int socketTimeoutMs = 10000; public int fetchMaxWait = 10000; public int bufferSizeBytes = 1024 * 1024; public MultiScheme scheme = new RawMultiScheme(); public boolean ignoreZkOffsets = false; public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); public long maxOffsetBehind = Long.MAX_VALUE; public boolean useStartOffsetTimeIfOffsetOutOfRange = true; public int metricsTimeBucketSizeInSecs = 60;6、SpoutConfig
SpoutConfig extends KafkaConfig加了幾個配置項:
public List<String> zkServers = null; public Integer zkPort = null; public String zkRoot = null; public String id = null;public String outputStreamId;// setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000;// Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail(). public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000;7、ZkState
ZkState記錄了每個partition的處理情況,它是通過讀寫zk來實現的,zk中的內容如下:
{"topology":{"id":"2e3226e2-ef45-4c53-b03f-aacd94068bc9","name":"ljhtest"},"offset":8066973,"partition":0,"broker":{"host":"gdc-kafka08-log.i.nease.net","port":9092},"topic":"ma30"}上面的信息分別為topoId,拓撲名稱,這個分區處理到的offset,分區號,這個分區在哪臺kafka機器,哪個端口,以及topic名稱。
ZkState只要提供了對這個zk信息的讀寫操作,如readJSON, writeJSON。
這些信息在zk中的位置通過構建KafkaConfig對象時的第3、4個參數指定,如下面的配置,則數據被寫在/kafka2/ljhtest下面。因此第4個參數必須唯一,否則不同拓撲會有沖突。
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/kafka2", "ljhtest");而trident的默認位置為/transactional/${topo}
8、DynamicBrokersReader
讀取zk中關于kafka的信息,如topic的分區等。
public List<GlobalPartitionInformation> getBrokerInfo()獲取所有topic的分區信息。
private int getNumPartitions(String topic)獲取某個topic的分區數量。
9、trident.ZkBrokerReader
trident.ZkBrokerReader大部分功能通過DynamicBrokersReader完成,關于與zk的連接,都是通過前者完成。同時增加了以下2個方法:
- getBrokerForTopic():返回某個topic的分區信息,返回的是GlobalPartitionInformation對象。這是由于可能同時讀取多個分區的情況。
- getAllBrokers():讀取所有的分區,不指定topic。因為支持正則topic,所以有可能有多個topic。
- refresh(): 這是一個private方法,每隔一段時間去refresh分區信息,在上面2個方法中被調用。
每次發送一個新的batch時,會通過DynamicPartitionConnections#register()方法調用上面的方法,當時間超過refreshFreqSecs時,即會刷新分區信息。
10、ZkCoordinator
ZkCoordinator implements PartitionCoordinator與之對應的還有個StaticCoordinator。
主要功能是讀取zk中的分區信息,然后計算自己這個task負責哪些分區。
PartitionCoordinator只有3個方法:
(1)主要方法為getMyManagedPartitions(),即計算自己這個spout應該處理哪些分區。
還有refresh是去刷新分區信息的。
(2)獲取PartitionManager對象:
PartitionManager getManager(Partition partition);(3)定期刷新分區信息
void refresh();11、PartitionManager
記錄了某個分區的連接信息,如:
Long _committedTo; LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>(); Partition _partition; SpoutConfig _spoutConfig; String _topologyInstanceId; SimpleConsumer _consumer; DynamicPartitionConnections _connections; ZkState _state;即這個分區的分區號,consumer等信息,還有用于發送消息的next()方法等,反正對某個分區的處理都在這個類中。
2個重點方法:
* fill()用于從kafka中獲取消息,寫到_waitingToEmit這個列表中。
* next()從上面準備的列表中讀取數據,通過emit()發送出去。
* 還有ack(),fail等方法。
PartitionManager持有一個DynamicPartitionConnections對象,通過這個對象的regist方法可以獲取到一個SimpleConsumer對象,從而對消息進行讀取。
12、DynamicPartitionConnections
DynamicPartitionConnections用于記錄broker—SimpleConsumber—-分區之間的關系。* 一個broker對應一個SimpleConsumber,但一個SimpleConsumer可以對應多個分區。尤其是spout的數量比分區數量少的時候*
主要用于創建SimpleConsumer,通過Partition信息,返回一個SimpleConsumer對象:
public SimpleConsumer register(Partition partition) {...}以及unRegister()方法,取消關聯。
Map<Broker, ConnectionInfo> _connections = new HashMap();這個變量記錄了一個broker的連接信息,其中ConnectionInfo有2個成員變量:
static class ConnectionInfo {SimpleConsumer consumer;Set<String> partitions = new HashSet<String>();public ConnectionInfo(SimpleConsumer consumer) {this.consumer = consumer;} }因此一個broker對應一個ConnectionInfo對象,而ConnectionInfo對象內有一個SimpleConsumber對象和其對應的多個分區。
13、KafkaUtils
很多公用方法,以后一個一個解釋:
(1)calculatePartitionsForTask
public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {計算某個task負責哪些分區。
注意,tridentSpout并未使用這個方法計算所負責的分區。TridentSpout的分區計算不在storm-kafka中實現,而是Trident機制自帶的。詳細的說是在OpaquePartitionedTridentSpoutExecutor的emitBatch()方法中計算。這就有個問題了,為什么在trident中,會自己計算負責的分區,而一般的storm需要自己來實現。
(二)KafkaSpout
在用戶代碼中,用戶通過使用KafKaConfig對象創建一個KafkaSpout,這是整個拓撲的起點:
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "ma30", "/test2", "ljhtest");kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());TopologyBuilder builder = new TopologyBuilder();builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);KafkaSpout繼承自BaseRichSpout,有open(), nextTuple(), ack(), fail()等方法。
下面我們詳細分析一下KafkaSpout這個類。
1、open()
KafkaSpout完成初始化的方法,當一個spout 被創建時,這個方法被調用。這個方法主要完成了以下幾個對象的初始化:
* _state : 獲取state目錄下的內容,詳見ZkState中的介紹。
* _connection:用于在每次發送消息(nextTuple方法法)時,獲取某個分區的SimpleConsumer對象。
* _coordinator:用于在每次必發送消息時獲取這個spout要處理哪些分區。
此外還有2個metric。
2、nextTuple()
//獲取這個task要處理哪些分區,然后對每個分區數據開始處理List<PartitionManager> managers = _coordinator.getMyManagedPartitions();for (int i = 0; i < managers.size(); i++) {// in case the number of managers decreased_currPartitionIndex = _currPartitionIndex % managers.size();//發送消息,下面慢慢分析。mitState state = managers.get(_currPartitionIndex).next(_collector);}只要就2個步驟:
* 獲取到這個spout要處理哪些分區
* 然后遍歷分區,對消息進行處理,處理的過程在ParitionManage中,稍后再詳細介紹。
三、trident
OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout
(一)tridentspout的主要流程
1、主要調用流程回顧
先說明一下,一個spout的組成分成三個部分,簡單的說就是消息是從MasterBatchCoordinator開始的,它是一個真正的spout,而TridentSpoutCoordinator與TridentSpoutExecutor都是bolt,MasterBatchCoordinator發起協調消息,最后的結果是TridentSpoutExecutor發送業務消息。而發送協調消息與業務消息的都是調用用戶Spout中BatchCoordinator與Emitter中定義的代碼。
MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady
|
|
v
TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]
|
|
v
TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)
對于分區是OpaquePartitionedTridentSpoutExecutor等
如果需要詳細了解這個過程,可參考:
http://blog.csdn.net/lujinhong2/article/details/49785077
我們先簡單介紹一下所有的相關類及其位置,然后分別介紹Coordinator與Emitter的實現。尤其是著重分析一下Emitter部分,因為它是實際讀取kafka消息,并向下游發送的過程。
2、指定spout
用戶在代碼中用以下語句指定使用哪個spout,如:
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);然后storm根據這個spout的代碼,找到對應的Coordinator與Emitter。我們看一下OpaqueTridentKafkaSpout的代碼。
這代碼很簡單,主要完成了:
(1)初始化一個Spout時,會要求傳遞一個TridentKafkaConfig的參數,指定一些配置參數。
(2)然后就分別指定了Coordinator與Emitter:
@Override public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> getEmitter(Map conf, TopologyContext context) {return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asOpaqueEmitter(); }@Override public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {return new org.apache.storm.kafka.trident.Coordinator(conf, _config); }(二)Coordinator
1、Coordinator的實例化
public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {config = tridentKafkaConfig;reader = KafkaUtils.makeBrokerReader(conf, config); }2、close()與isReady()
Coordinator通過TridentKafkaConfig傳入一個DefaultCoordinator的對象,Coordinator的close()及isReady()均是通過調用DefaultCoordinator的實現來完成的。
@Override public void close() {config.coordinator.close(); }@Override public boolean isReady(long txid) {return config.coordinator.isReady(txid); }我們接著看一下DefaultCoordinator的實現:
@Override public boolean isReady(long txid) {return true; }@Override public void close() { }很簡單,isReady()直接返回true,close()則不做任何事情。
3、getPartitionsForBatch()
這個方法的功能是在初始化一個事務時,去zk讀取最新的分區信息(當然是緩存超時后才讀)。
@Override public List<GlobalPartitionInformation> getPartitionsForBatch() {return reader.getAllBrokers(); }注釋為:
Return the partitions currently in the source of data. The idea is is that if a new partition is added and a prior transaction is replayed, it doesn’t emit tuples for the new partition because it knows what partitions were in that transaction.
由下面可以看出,getPartitionsForBatch()都是在初始化一個事務時被調用的。
透明型:
事務型:
@Overridepublic Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {if(currMetadata!=null) {return currMetadata;} else {return _coordinator.getPartitionsForBatch(); }}那我們繼續看看這個方法完成了什么功能:
@Override public List<GlobalPartitionInformation> getAllBrokers() {refresh();return cachedBrokers; }除了這個,還有一個使用靜態指定的,暫不管它。
private void refresh() {long currTime = System.currentTimeMillis();if (currTime > lastRefreshTimeMs + refreshMillis) {try {LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");cachedBrokers = reader.getBrokerInfo();lastRefreshTimeMs = currTime;} catch (java.net.SocketTimeoutException e) {LOG.warn("Failed to update brokers", e);}} }其它就是在超時的情況下去zk讀取broker的信息,并返回partitions的信息。返回的信息為GlobalPartitionInformation列表,即topic與其具體分區信息的map。
public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {List<String> topics = getTopics();List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();for (String topic : topics) {GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);try {int numPartitionsForTopic = getNumPartitions(topic);String brokerInfoPath = brokerPath();for (int partition = 0; partition < numPartitionsForTopic; partition++) {int leader = getLeaderFor(topic,partition);String path = brokerInfoPath + "/" + leader;try {byte[] brokerData = _curator.getData().forPath(path);Broker hp = getBrokerHost(brokerData);globalPartitionInformation.addPartition(partition, hp);} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {LOG.error("Node {} does not exist ", path);}}} catch (SocketTimeoutException e) {throw e;} catch (Exception e) {throw new RuntimeException(e);}LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);partitions.add(globalPartitionInformation);}return partitions; }以下內容均是對emitter的介紹
注意,在trident中,每個task負責哪些分區是在storm-core中計算好的,因此在emitter中只負責處理這個分區的消息就行了,具體來說是在OpaquePartitionedTridentSpoutExecutor.emitBatch()中計算分區的
(三)Emitter : TridentKafkaEmitter結構
TridentKafkaEmitter中有2個內部類,分別對應事務型與透明型的spout。事務型的spout重發batch時必須與上一批次相同,而透明型是沒這個需要的,可以從其它可能的分區中取一批新的數據。
1、offset與nextOffset
消息處理的metaData中保存了offset與nextOffset2個數據,其中后者一般通過MessageAndOffset#nextOffset()來獲取到。offset表示當前正在處理的消息的offset,nextOffset表示當前消息的下一個offset。舉個例子:
(offset)*這是一批消息**(nextOffset)
因此正常情況下,應該offset
1、事務型的spout
有5個方法,我們這里先討論其中2個核心方法。storm根據某個batch是否第一次發送來決定調用哪個方法。
emitPartitionBatchNew()
當某個batch是第一次發送時,調用此方法,這個方法的調用順序為:
emitPartitionBatchNew() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()
emitPartitionBatch()
當某個batch是重發時,調用此方法,這個方法的調用順序為:
emitPartitionBatch() —–> reEmitPartitionBatch()
2、透明型的spout
透明型的spout不需要保證重發的batch與上一批次是相同的,因此,對于每一次發送都是相同的邏輯即可,不需要管是否第一次發送,它只有一個發送方法。
emitPartitionBatch()
emitPartitionBatch() —–> emitNewPartitionBatch() —-> failFastEmitNewPartitionBatch() —–> doEmitNewPartitionBatch()
2種類型發送數據時只終均是調用doEmitNewPartitionBatch(),而透明型的spout在調用之前會先使用emitNewPartitionBatch()來捕獲FailedFetchException,重新獲取一份新的元數據,以準備讀取新的消息
3、公共方法
除了以上的發送數據方法以外,它們均還有以下3個方法,下面再詳細分析。
@Overridepublic void refreshPartitions(List<Partition> partitions) {refresh(partitions);}@Overridepublic List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {return orderPartitions(partitionInformation);}@Overridepublic void close() {clear();}(四)透明型spout
1、emitPartitionBatch()
/*** Emit a batch of tuples for a partition/transaction.** Return the metadata describing this batch that will be used as lastPartitionMeta* for defining the parameters of the next batch.*/@Overridepublic Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);}當需要發送一個新的batch時,storm會調用emitPartitionBatch方法,此方法直接調用emitNewPartitionBatch。
參數說明:
* transactionAttempt,只有2個成員變量,即long _txId和int _attemptId,即記錄了當前的事務id及已經嘗試的次數。
* tridentCollector,就是用于發送消息的collector。
* partition,表示一個分區,可以理解為kafka的一個分區,有2個成員變量,分別為Broker host和int partition,即kafka的機器與分區id。
* map,用于記錄這個事務的元數據,詳細見后面分析。
2、emitNewPartitionBatch()
private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {try {return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);} catch (FailedFetchException e) {LOG.warn("Failed to fetch from partition " + partition);if (lastMeta == null) {return null;} else {Map ret = new HashMap();ret.put("offset", lastMeta.get("nextOffset"));ret.put("nextOffset", lastMeta.get("nextOffset"));ret.put("partition", partition.partition);ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));ret.put("topic", _config.topic);ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));return ret;}} }很明顯,也只是簡單調用failFastEmitNewPartitionBatch,但如果獲取消息失敗的話,則會創建一個新元數據。
如果lastMeta為null的話,則會直接返回null,則會從其它地方(如zk)進行初始化(郵見下面的分析);如果不為空,則根據lastMeta的值,根據一個新的元數據。元數據包括以下幾個字段:
* offset:下一個需要處理的offset
* nextOffset:由于未開始處理batch,所以offset與nextOffset都是同一個值。注意,如果正在處理一個batch,則offset是正在處理的batch的offset,而nextOffset則是下一個需要處理的offset。
* partition:就是哪個分區了
* broker:哪臺kafka機器以及端口
* topic:哪個kafka topic
* topology:拓撲的名稱與id。
TODO:ImmutableMap.of()
ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId)TODO:如果獲取失敗,哪里更新了新的分區信息,是fetch里面作了處理嗎?后面再看。
3、failFastEmitNewPartitionBatch()
private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {SimpleConsumer consumer = _connections.register(partition);Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);_kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));return ret; }先根據partition信息注冊一個consumer,注意這里的分區信息包括了機器、端口還有分區id等。然后就調用doEmitNewPartitionBatch執行實際事務,最后的是metric的使用。
4、doEmitNewPartitionBatch()
(1)確定offset
簡單的說,就是
* 如果lastMeta為空,則從其它地方(如zk)獲取offset;
* 否則,如果當前topoid與之前的不同(表示拓撲重啟過)而且ignoreZkOffsets為true,則從指定的offset開始;
* 如果當前topoid與之前的相同(表示在持續處理消息中),或者ignoreZkOffsets為false,則從之前的位置繼續處理
(2)讀取消息
ByteBufferMessageSet msgs = null;try {msgs = fetchMessages(consumer, partition, offset);} catch (TopicOffsetOutOfRangeException e) {long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);offset = newOffset;msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);}如果TopicOffsetOutOfRangeException,則從最舊的消息開始讀。
(3)發送消息并更新offset
long endoffset = offset;for (MessageAndOffset msg : msgs) {emit(collector, msg.message());endoffset = msg.nextOffset();}每發送一條消息則將endoffset往后移一位,直到發送完時,endoffset就是下一個需要處理的offset。
(4)構建下一個meta并返回
Map newMeta = new HashMap();newMeta.put("offset", offset);newMeta.put("nextOffset", endoffset);newMeta.put("instanceId", _topologyInstanceId);newMeta.put("partition", partition.partition);newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));newMeta.put("topic", _config.topic);newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));return newMeta;關于metric的設置以及讀取kafka消息的實現,下面單獨介紹
(五)事務型spout
1、emitPartitionBatchNew()
當某個batch第一次發送時調用此方法,返回是這個batch相關的元數據,可用于重構這個batch。如果這個batch出錯需要重發,則調用emitPartitionBatch(),下面再介紹。
/*** Emit a batch of tuples for a partition/transaction that's never been emitted before.* Return the metadata that can be used to reconstruct this partition/batch in the future.*/@Overridepublic Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);}與透明型不同的是,它沒有捕獲FailedFetchException這個異常,因此出現獲取消息失敗時,會一直等待某個分區恢復。其它處理邏輯與透明型相同,參考上面的介紹即可。
2、emitPartitionBatch()
/*** Emit a batch of tuples for a partition/transaction that has been emitted before, using* the metadata created when it was first emitted.*/@Overridepublic void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);}當一個batch之前已經發送過,但失敗了,則調用此方法重試。
3、reEmitPartitionBatch()
重試發送消息的主要實現,邏輯也相對簡單。
直接去fetch消息。如果消息不為空的話,則判斷offset:
* 如果offset與nextoffset相等,則表示消息已經處理完了
* 如果offset>nextOffset,則出錯了,拋出以下運行時異常:
最后發送消息,并更新nextOffset。
完整代碼如下:
(六)2種spout的公共方法
1、refreshPartitions()
根據注釋可知,當處理一些新的分區時,管理到這些分區的連接信息。
/*** This method is called when this task is responsible for a new set of partitions. Should be used* to manage things like connections to brokers.*/@Overridepublic void refreshPartitions(List<Partition> partitions) {refresh(partitions);}2、getOrderedPartitions()
getOrderedPartitions()方法會在分區元數據發生變化(即Partitions發生變化)時被調用。該方法與refreshPartitions()方法調用時機相同,用來應對分區的變化。例如,建立并維護與新增加Partitions的連接時就可以使用這個方法。
3、close()
看下面的實現,其實refreshPartitions()和close()都只是簡單的清空了連接,而getOrderedPartitions是獲取分區信息。
private void clear() {_connections.clear(); }private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {return partitions.getOrderedPartitions(); }private void refresh(List<Partition> list) {_connections.clear();_kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list)); }4、Partitions與Partition
Partitions含義為分區的元數據,如一共存在多少個分區,分區所在的broker等,具體信息由用戶定義,不過這些信息一般是比較穩定的。在kafka中,是通過以下代碼指定的:
new ZkHosts(brokerHosts)看如何將zk中的信息導入Partitions的:
Partition則是某個具體的分區了。
在coordinator的getPartitionsForBatch()中指定。
(七)fetch消息的邏輯
_connection包括了一些連接信息,如broker,端口,分區id等,通過它可以獲取到一個simpleConsumer,下面重點分析這個獲取消息的過程。
msgs = fetchMessages(consumer, partition, offset);1、fetchMessages()
private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {long start = System.nanoTime();ByteBufferMessageSet msgs = null;msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);long end = System.nanoTime();long millis = (end - start) / 1000000;_kafkaMeanFetchLatencyMetric.update(millis);_kafkaMaxFetchLatencyMetric.update(millis);return msgs; }主要調用 KafkaUtils.fetchMessages(_config, consumer, partition, offset);其余代碼用于更新metric,統計獲取消息的平均時長以及最大時長。
2、KafkaUtil.fetchMessages()
邏輯很簡單,構建一個FetchRequest,然后得到FetchResponse。此外就是一些處理異常的代碼了
public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {ByteBufferMessageSet msgs = null;String topic = config.topic;int partitionId = partition.partition;FetchRequestBuilder builder = new FetchRequestBuilder();FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).clientId(config.clientId).maxWait(config.fetchMaxWait).build();FetchResponse fetchResponse;try {fetchResponse = consumer.fetch(fetchRequest);} catch (Exception e) {if (e instanceof ConnectException ||e instanceof SocketTimeoutException ||e instanceof IOException ||e instanceof UnresolvedAddressException) {LOG.warn("Network error when fetching messages:", e);throw new FailedFetchException(e);} else {throw new RuntimeException(e);}}if (fetchResponse.hasError()) {KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {String msg = "Got fetch request with offset out of range: [" + offset + "]";LOG.warn(msg);throw new TopicOffsetOutOfRangeException(msg);} else {String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";LOG.error(message);throw new FailedFetchException(message);}} else {msgs = fetchResponse.messageSet(topic, partitionId);}return msgs; }(八)KafkaOffsetMetric
TODO:還有其它metric吧
storm-kafka中定義了一個metric用來計算目前正在處理的offset與最新的offset之間有多少差距,即落后了多少條數據。
這個類定義在KafkaUtil中,主要有2個核心變量:
_partitionToOffset是一個hashMap,內容為(分區,正在處理的offset)
_partitions就是_partitionToOffset的key組成的一個集合。
這個metric只在2個地方被調用:
(1)第一次讀取一個分區時
(2)refresh時
_kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));refreshPartitions()時會調用refresh方法。This method is called when this task is responsible for a new set of partitions. Should be used to manage things like connections to brokers.
總結
以上是生活随笔為你收集整理的storm-kafka源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: storm-kafka编程指南
- 下一篇: kafka集群原理介绍