Analysis of the Kafka Client Implementation Logic
This post analyzes the Kafka client implementation (the code walkthrough is based on the Perl Kafka client).
A Kafka client is divided into producers and consumers: producers send messages, consumers fetch them.
In the Kafka protocol, the four commands a client uses most are Fetch, Offsets, Produce, and Metadata: fetching messages, fetching offsets, sending messages, and fetching metadata, respectively. Most of the remaining protocol commands are used by the Kafka servers internally. Some language client APIs also expose the OffsetCommit command so that applications can commit offsets themselves, but the Perl implementation does not.
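For orientation, the commands above map to fixed ApiKey numbers in the Kafka wire protocol (the same numbers the Perl client's `$APIKEY_*` constants hold). A minimal Python sketch:

```python
# Kafka protocol ApiKey numbers for the requests discussed above.
# OffsetCommit/OffsetFetch exist in the protocol too, but the Perl
# client does not expose them.
API_KEYS = {
    "Produce": 0,       # send messages
    "Fetch": 1,         # fetch messages
    "Offsets": 2,       # fetch offsets
    "Metadata": 3,      # fetch cluster/topic metadata
    "OffsetCommit": 8,  # commit an offset (not exposed by the Perl client)
    "OffsetFetch": 9,   # read back a committed offset
}

def api_name(key: int) -> str:
    """Reverse lookup: ApiKey number -> request name."""
    for name, k in API_KEYS.items():
        if k == key:
            return name
    return "Unknown"
```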
First, look directly at the producer and consumer code.
```perl
my $request = {
    ApiKey        => $APIKEY_PRODUCE,
    CorrelationId => $self->{CorrelationId},
    ClientId      => $self->{ClientId},
    RequiredAcks  => $self->{RequiredAcks},
    Timeout       => $self->{Timeout} * 1000,
    topics        => [
        {
            TopicName  => $topic,
            partitions => [
                {
                    Partition  => $partition,
                    MessageSet => $MessageSet,
                },
            ],
        },
    ],
};

foreach my $message ( @$messages ) {
    push @$MessageSet, {
        Offset => $PRODUCER_ANY_OFFSET,
        Key    => $key,
        Value  => $message,
    };
}

return $self->{Connection}->receive_response_to_request( $request, $compression_codec );
```

This is not the complete code, but it is the core of the producer. The last line shows that everything ultimately goes through Connection::receive_response_to_request; the part above it builds the request: the message format and the data structure holding the message contents.
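The same request shape can be sketched language-neutrally. A hypothetical Python equivalent (the constant names mirror the Perl module; default values here are illustrative, not the module's defaults):

```python
APIKEY_PRODUCE = 0          # Kafka protocol ApiKey for Produce
PRODUCER_ANY_OFFSET = -1    # placeholder offset; the broker assigns the real one

def build_produce_request(topic, partition, messages, key=b"",
                          correlation_id=0, client_id="perl-style-client",
                          required_acks=1, timeout_ms=1500):
    """Build a Produce request dict shaped like the Perl client's $request."""
    message_set = [
        {"Offset": PRODUCER_ANY_OFFSET, "Key": key, "Value": m}
        for m in messages
    ]
    return {
        "ApiKey": APIKEY_PRODUCE,
        "CorrelationId": correlation_id,
        "ClientId": client_id,
        "RequiredAcks": required_acks,
        "Timeout": timeout_ms,
        "topics": [{
            "TopicName": topic,
            "partitions": [{"Partition": partition, "MessageSet": message_set}],
        }],
    }
```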
```perl
my $request = {
    ApiKey        => $APIKEY_FETCH,
    CorrelationId => $self->{CorrelationId},
    ClientId      => $self->{ClientId},
    MaxWaitTime   => $self->{MaxWaitTime},
    MinBytes      => $self->{MinBytes},
    topics        => [
        {
            TopicName  => $topic,
            partitions => [
                {
                    Partition   => $partition,
                    FetchOffset => $start_offset,
                    MaxBytes    => $max_size // $self->{MaxBytes},
                },
            ],
        },
    ],
};

my $response = $self->{Connection}->receive_response_to_request( $request );
```

This is the core of the consumer's message fetching. It ends the same way as the producer and the structure is similar: build the request data structure, then send it. The only addition is the response-handling code that follows, which is not shown in detail here; if you are interested, read the source on CPAN.
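And the matching hypothetical sketch of the Fetch request (again, default values are illustrative):

```python
APIKEY_FETCH = 1   # Kafka protocol ApiKey for Fetch

def build_fetch_request(topic, partition, start_offset,
                        max_bytes=1024 * 1024, max_wait_ms=100, min_bytes=1,
                        correlation_id=0, client_id="perl-style-client"):
    """Build a Fetch request dict shaped like the Perl client's $request."""
    return {
        "ApiKey": APIKEY_FETCH,
        "CorrelationId": correlation_id,
        "ClientId": client_id,
        "MaxWaitTime": max_wait_ms,
        "MinBytes": min_bytes,
        "topics": [{
            "TopicName": topic,
            "partitions": [{
                "Partition": partition,
                "FetchOffset": start_offset,   # the client says where to read from
                "MaxBytes": max_bytes,
            }],
        }],
    }
```

Note that the client, not the server, supplies FetchOffset; this detail is what the observations at the end of the post hinge on.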
Now look at the most important function.
```perl
sub receive_response_to_request {
    my ( $self, $request, $compression_codec ) = @_;

    local $Data::Dumper::Sortkeys = 1 if $self->debug_level;

    # The request type: produce, fetch, offsets or metadata.
    my $api_key = $request->{ApiKey};

    # WARNING: The current version of the module limited to the following:
    # supports queries with only one combination of topic + partition (first and only).
    my $topic_data = $request->{topics}->[0];
    my $topic_name = $topic_data->{TopicName};
    my $partition  = $topic_data->{partitions}->[0]->{Partition};

    # This check matters: if the client has no complete metadata yet,
    # fetch it with a Metadata request.
    if (
           !%{ $self->{_metadata} }    # the first request
        || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
    ) {
        # _update_metadata wraps a Metadata request sent to the Kafka broker.
        # Clients in other languages differ somewhat here: php-kafka supports
        # two approaches, this one and reading the metadata from ZooKeeper,
        # in which case a ZooKeeper address must be configured.
        $self->_update_metadata( $topic_name )    # hash metadata could be updated
            # FATAL error
            or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic = '%s'", $topic_name ) );
    }

    # Serialize the request into network byte order.
    my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec );

    my $CorrelationId = $request->{CorrelationId} // _get_CorrelationId;

    say STDERR sprintf( '[%s] compression_codec = %d, request: %s',
        scalar( localtime ),
        $compression_codec // '<undef>',
        Data::Dumper->Dump( [ $request ], [ 'request' ] )
    ) if $self->debug_level;

    my $attempts = $self->{SEND_MAX_ATTEMPTS};
    my ( $ErrorCode, $partition_data, $server );

    # Retry loop. The Java client's three send attempts follow the same logic.
    ATTEMPTS:
    while ( $attempts-- ) {
        REQUEST:
        {
            $ErrorCode = $ERROR_NO_ERROR;

            # Look up the leader of this topic partition; if it is known,
            # connect to the leader and send the request.
            if ( defined( my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader} ) ) {    # hash metadata could be updated
                # No server known for this leader: skip this attempt,
                # refresh the metadata, and try again.
                unless ( $server = $self->{_leaders}->{ $leader } ) {
                    $ErrorCode = $ERROR_LEADER_NOT_FOUND;
                    $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
                    # Leave the main block here and fall through to the
                    # post-block actions (backoff and metadata refresh).
                    last REQUEST;    # go to the next attempt
                }

                # Send a request to the leader
                if ( !$self->_connectIO( $server ) ) {    # connect to this partition's leader
                    $ErrorCode = $ERROR_CANNOT_BIND;
                } elsif ( !$self->_sendIO( $server, $encoded_request ) ) {    # send the request to the leader
                    $ErrorCode = $ERROR_CANNOT_SEND;
                }
                if ( $ErrorCode != $ERROR_NO_ERROR ) {    # did connect/send succeed?
                    $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition );
                    last REQUEST;    # go to the next attempt
                }

                # Handle the response. If this was a produce request that asks
                # for no acknowledgement, no response will arrive, so build an
                # empty one locally.
                my $response;
                if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) {
                    # Do not receive a response, self-forming own response
                    $response = {
                        CorrelationId => $CorrelationId,
                        topics        => [
                            {
                                TopicName  => $topic_name,
                                partitions => [
                                    {
                                        Partition => $partition,
                                        ErrorCode => 0,
                                        Offset    => $BAD_OFFSET,
                                    },
                                ],
                            },
                        ],
                    };
                } else {
                    # Read the response and decode it from network byte order.
                    my $encoded_response_ref;
                    unless ( $encoded_response_ref = $self->_receiveIO( $server ) ) {
                        if ( $api_key == $APIKEY_PRODUCE ) {
                            # WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier
                            # and there is no way to verify the delivery of data
                            $ErrorCode = $ERROR_SEND_NO_ACK;

                            # Should not be allowed to re-send data on the next attempt
                            # FATAL error
                            $self->_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error} );
                        } else {
                            $ErrorCode = $ERROR_CANNOT_RECV;
                            $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition );
                            last REQUEST;    # go to the next attempt
                        }
                    }
                    if ( length( $$encoded_response_ref ) > 4 ) {    # MessageSize => int32
                        $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref );
                        say STDERR sprintf( '[%s] response: %s',
                            scalar( localtime ),
                            Data::Dumper->Dump( [ $response ], [ 'response' ] )
                        ) if $self->debug_level;
                    } else {
                        $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED );
                    }
                }

                $response->{CorrelationId} == $CorrelationId
                    # FATAL error
                    or $self->_error( $ERROR_MISMATCH_CORRELATIONID );

                $topic_data     = $response->{topics}->[0];
                $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? 'PartitionOffsets' : 'partitions' }->[0];

                if ( ( $ErrorCode = $partition_data->{ErrorCode} ) == $ERROR_NO_ERROR ) {
                    return $response;
                } elsif ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
                    $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
                    last REQUEST;    # go to the next attempt
                } else {
                    # FATAL error
                    $self->_error( $ErrorCode, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) );
                }
            }
        }

        # Expect to possible changes in the situation, such as restoration of connection
        say STDERR sprintf( '[%s] sleeping for %d ms before making request attempt #%d (%s)',
            scalar( localtime ),
            $self->{RETRY_BACKOFF},
            $self->{SEND_MAX_ATTEMPTS} - $attempts + 1,
            $ErrorCode == $ERROR_NO_ERROR ? 'refreshing metadata' : "ErrorCode ${ErrorCode}",
        ) if $self->debug_level;

        sleep $self->{RETRY_BACKOFF} / 1000;

        # The most important piece of logic. Every failure above jumps out of
        # the REQUEST block straight to this refresh, and the next attempt runs
        # with the updated metadata. This is what handles dynamic leader
        # switching: when the current partition leader dies and another broker
        # takes over, the client can follow it.
        $self->_update_metadata( $topic_name )
            # FATAL error
            or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) );
    }

    # FATAL error
    if ( $ErrorCode ) {
        $self->_error( $ErrorCode, format_message( "topic = '%s'%s", $topic_data->{TopicName},
            $partition_data ? ", partition = ".$partition_data->{Partition} : q{} ) );
    } else {
        $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic = '%s', partition = %s", $topic_name, $partition ) );
    }

    return;
}
```
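Stripped of the wire-level detail, the control flow above is a bounded retry loop that refreshes metadata between attempts. A simplified Python sketch, where the cluster object is a hypothetical stand-in for the broker pool and metadata cache:

```python
import time

class RetriableError(Exception):
    """Errors worth retrying after a metadata refresh (e.g. the leader moved)."""

def send_with_retries(cluster, topic, partition, request,
                      max_attempts=3, retry_backoff_ms=0):
    """Bounded retry: on a retriable failure, back off, refresh metadata,
    then try again with the (possibly new) partition leader."""
    last_error = None
    for _attempt in range(max_attempts):
        try:
            leader = cluster.leader_for(topic, partition)  # may raise RetriableError
            return cluster.send(leader, request)           # may raise RetriableError
        except RetriableError as exc:
            last_error = exc
            time.sleep(retry_backoff_ms / 1000.0)
            cluster.update_metadata(topic)   # the key step: pick up the new leader
    raise last_error or RuntimeError("unknown topic or partition")

class FakeCluster:
    """Hypothetical stand-in: the leader is unknown until the first
    metadata refresh, mimicking a leader switch during an outage."""
    def __init__(self):
        self.refreshes = 0
    def leader_for(self, topic, partition):
        if self.refreshes == 0:
            raise RetriableError("leader not found")
        return "broker-1"
    def send(self, leader, request):
        return {"ok": True, "leader": leader}
    def update_metadata(self, topic):
        self.refreshes += 1
```

The first attempt fails, the metadata refresh "discovers" the leader, and the second attempt succeeds, which is exactly the failover path the Perl function implements.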
The above covers the core logic of the implementation. Two things stand out:
The consumer never manually commits an offset while consuming and never sets any groupId-related configuration. So the server is not actually enforcing consumption by group, nor does it automatically record the corresponding offset; it simply returns the messages at the submitted offset together with the next offset value. Group-based consumption in Kafka is therefore implemented by the individual client APIs. The new Java API exposes an auto-commit-offset option, and the old Java API ran an auto-commit thread that recorded the groupId and offset on the user's behalf.
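In other words, since the server only answers "give me messages starting at offset N", consuming reduces to remembering the next offset yourself. A minimal sketch, where fake_fetch is a hypothetical stand-in for the client's Fetch call:

```python
def consume_from(fetch, topic, partition, start_offset, batches=1):
    """Consume by tracking the offset client-side: each Fetch response
    yields messages plus the next offset to ask for."""
    offset = start_offset
    received = []
    for _ in range(batches):
        messages, offset = fetch(topic, partition, offset)
        received.extend(messages)
    # A group-aware client would persist `offset` here (via OffsetCommit
    # or an external store); this low-level client leaves that to the caller.
    return received, offset

# Hypothetical in-memory "partition log" to demonstrate the loop.
_log = [b"m0", b"m1", b"m2", b"m3"]

def fake_fetch(topic, partition, offset, max_messages=2):
    batch = _log[offset:offset + max_messages]
    return batch, offset + len(batch)
```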
Both producer and consumer requests must specify a topic partition, so at the lowest level the API does no load balancing across partitions. The load-balancing features found in clients for other languages are implemented inside those clients; they are not functionality provided by the Kafka server.
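Partition selection therefore lives entirely in the client. A sketch of the two common strategies higher-level clients typically layer on top (the crc32 choice here is illustrative; real clients use various hash functions):

```python
import itertools
import zlib

def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Keyed messages: a stable hash keeps one key on one partition."""
    return zlib.crc32(key) % num_partitions

def round_robin(num_partitions: int):
    """Keyless messages: cycle through partitions to spread load evenly."""
    return itertools.cycle(range(num_partitions))
```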
Summary

The wire protocol a client really needs is small: Produce, Fetch, Offsets, and Metadata. Everything beyond that, group-based offset tracking and partition load balancing in particular, is built by the client libraries on top of these primitives rather than by the Kafka server.