Notes on librdkafka, the Kafka C++ Client Library
Contents
1. Preface
2. Abbreviations
3. Configuration and Topics
3.1. Configuration and Topic Structures
3.1.1. Conf
3.1.2. ConfImpl
3.1.3. Topic
3.1.4. TopicImpl
4. Threads
5. Consumer
5.1. Consumer Structures
5.1.1. Handle
5.1.2. HandleImpl
5.1.3. ConsumeCb
5.1.4. RebalanceCb
5.1.5. EventCb
5.1.6. Consumer
5.1.7. KafkaConsumer
5.1.8. KafkaConsumerImpl
5.1.9. rd_kafka_message_t
5.1.10. rd_kafka_msg_s
5.1.11. rd_kafka_msgq_t
5.1.12. rd_kafka_toppar_t
6. Producer
6.1. Producer Structures
6.1.1. DeliveryReportCb
6.1.2. PartitionerCb
6.1.3. Producer
6.1.4. ProducerImpl
6.2. Producer Startup, Part 1
6.3. Producer Startup, Part 2
6.4. The Produce Path
7. The poll Process
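1. Preface
librdkafka provides an asynchronous produce interface, an asynchronous consume interface and a synchronous consume interface; it has no synchronous produce interface.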
2. Abbreviations
Abbreviation | Full name | Example or note |
rd | Rapid Development | rd.h |
rk | RdKafka | |
toppar | Topic Partition | struct rd_kafka_toppar_t { }; |
rep | Reply | struct rd_kafka_t { rd_kafka_q_t *rk_rep; }; |
msgq | Message Queue | struct rd_kafka_msgq_t { }; |
rkb | RdKafka Broker | Kafka broker |
rko | RdKafka Operation | Kafka operation |
rkm | RdKafka Message | Kafka message |
payload | | The message data stored on Kafka (also called the log) |
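3. Configuration and Topics
3.1. Configuration and Topic Structures
3.1.1. Conf
The configuration interface. There are two kinds of configuration: global and per-topic.
3.1.2. ConfImpl
The configuration implementation.
3.1.3. Topic
The topic interface.
3.1.4. TopicImpl
The topic implementation.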
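4. Threads
Programming with RdKafka involves three kinds of threads:
1) application threads, which run the business code;
2) Kafka Broker threads (rd_kafka_broker_thread_main), which handle communication with the brokers; there are several of them;
3) the Kafka Handler thread (rd_kafka_thread_main); one Handler thread is created for every consumer or producer instance.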
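5. Consumer
5.1. Consumer Structures
5.1.1. Handle
Defines interfaces such as poll; its implementation is HandleImpl.
5.1.2. HandleImpl
Implements poll and the other methods shared by consumers and producers. poll serves two purposes:
1) for producers, it invokes the callbacks that report each message's delivery result;
2) for both producers and consumers, it invokes the event callbacks.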
class Handle {
 public:
  /**
   * @brief Polls the provided kafka handle for events.
   *
   * Events will trigger application provided callbacks to be called.
   *
   * The \p timeout_ms argument specifies the maximum amount of time
   * (in milliseconds) that the call will block waiting for events.
   * For non-blocking calls, provide 0 as \p timeout_ms.
   * To wait indefinitely for events, provide -1.
   *
   * Events:
   *   - delivery report callbacks (if an RdKafka::DeliveryReportCb is configured) [producer]
   *   - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
   *
   * @remark  An application should make sure to call poll() at regular
   *          intervals to serve any queued callbacks waiting to be called.
   *
   * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,
   *          use its RdKafka::KafkaConsumer::consume() instead.
   *
   * @returns the number of events served.
   */
  virtual int poll(int timeout_ms) = 0;
};
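To make the timeout contract above concrete, here is a minimal self-contained model of a callback queue served by a poll(timeout_ms) method. It is a sketch under our own assumptions, not librdkafka's implementation: the class CallbackQueue and its methods are hypothetical, and std::condition_variable stands in for the library's internal queue and condition variable (compare the cnd_timedwait_ms frame in the Handler thread call stack in section 6.2).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical model of the queue served by Handle::poll(); not librdkafka code.
class CallbackQueue {
 public:
  // Posting side: e.g. a background thread queuing a delivery report.
  void push(std::function<void()> cb) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      q_.push_back(std::move(cb));
    }
    cv_.notify_one();
  }

  // Same contract as the comment above: 0 = do not block, -1 = wait until
  // something is queued, >0 = wait at most timeout_ms milliseconds.
  // Returns the number of callbacks served.
  int poll(int timeout_ms) {
    std::deque<std::function<void()>> ready;
    {
      std::unique_lock<std::mutex> lock(mu_);
      auto has_work = [this] { return !q_.empty(); };
      if (timeout_ms < 0) {
        cv_.wait(lock, has_work);
      } else if (timeout_ms > 0) {
        (void)cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms), has_work);
      }
      ready.swap(q_);  // take everything queued so far
    }
    // Run the callbacks on the polling thread, without holding the lock.
    for (auto& cb : ready) cb();
    return static_cast<int>(ready.size());
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> q_;
};
```

Callbacks run on whichever thread calls poll(), which is why queued callbacks only fire if the application keeps polling, as the @remark above says.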
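5.1.3. ConsumeCb
A callback that applies to consumers only; it receives consumed messages.
5.1.4. RebalanceCb
A callback that applies to consumers only; it is invoked when the consumer group assigns or revokes partitions.
5.1.5. EventCb
Both consumers and producers can set an EventCb, for example: _global_conf->set("event_cb", &_event_cb, errmsg);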
/**
 * @brief Event callback class
 *
 * Events are a generic interface for propagating errors, statistics, logs, etc
 * from librdkafka to the application.
 *
 * @sa RdKafka::Event
 */
class RD_EXPORT EventCb {
 public:
  /**
   * @brief Event callback
   *
   * @sa RdKafka::Event
   */
  virtual void event_cb (Event &event) = 0;

  virtual ~EventCb() { }
};

/**
 * @brief Event object class as passed to the EventCb callback.
 */
class RD_EXPORT Event {
 public:
  /** @brief Event type */
  enum Type {
    EVENT_ERROR,     /**< Event is an error condition */
    EVENT_STATS,     /**< Event is a statistics JSON document */
    EVENT_LOG,       /**< Event is a log message */
    EVENT_THROTTLE   /**< Event is a throttle level signaling from the broker */
  };
};
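An application implements EventCb by subclassing it and switching on the event type. To keep the example compilable without librdkafka, the sketch below uses hypothetical stand-ins DemoEvent and DemoEventCb that mirror only the four event types listed above; in the real API the event is read through accessor methods rather than public fields, and the callback object is registered with the set("event_cb", ...) call shown in 5.1.5.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for RdKafka::Event / RdKafka::EventCb (not the real
// headers): just enough to show how an application dispatches on the type.
struct DemoEvent {
  enum Type { EVENT_ERROR, EVENT_STATS, EVENT_LOG, EVENT_THROTTLE };
  Type type;
  std::string str;  // error text, statistics JSON, log line, ...
};

class DemoEventCb {
 public:
  virtual void event_cb(DemoEvent& event) = 0;
  virtual ~DemoEventCb() {}
};

// Application-side callback: count errors, keep the latest statistics JSON,
// collect log lines and ignore throttle notifications.
class AppEventCb : public DemoEventCb {
 public:
  int errors = 0;
  std::string last_stats;
  std::vector<std::string> logs;

  void event_cb(DemoEvent& event) override {
    switch (event.type) {
      case DemoEvent::EVENT_ERROR:
        ++errors;
        break;
      case DemoEvent::EVENT_STATS:
        last_stats = event.str;
        break;
      case DemoEvent::EVENT_LOG:
        logs.push_back(event.str);
        break;
      case DemoEvent::EVENT_THROTTLE:
        break;  // nothing to do in this sketch
    }
  }
};
```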
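5.1.6. Consumer
The simple (legacy) consumer. It is generally not used; KafkaConsumer is used instead.
5.1.7. KafkaConsumer
Both the consumer and the producer are built with multiple inheritance: KafkaConsumer is the consumer interface and KafkaConsumerImpl is the consumer implementation.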
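The interface/implementation split can be sketched as a diamond with virtual inheritance: the consumer interface and the shared HandleImpl both derive from Handle, and the implementation class combines them so that poll() is implemented only once. The classes below are hypothetical, heavily simplified stand-ins that only illustrate this pattern; we assume, without reproducing the real headers, that the C++ wrapper is organized along these lines.

```cpp
#include <cassert>
#include <string>

// Hypothetical, simplified mirror of the wrapper's class layout.
class Handle {
 public:
  virtual int poll(int timeout_ms) = 0;
  virtual std::string name() const = 0;
  virtual ~Handle() {}
};

// Public consumer interface: adds consumer-only methods.
class KafkaConsumer : public virtual Handle {
 public:
  virtual std::string consume(int timeout_ms) = 0;
};

// Shared implementation of the Handle methods (reused by the producer too).
class HandleImpl : public virtual Handle {
 public:
  int poll(int /*timeout_ms*/) override { return 0; }
  std::string name() const override { return "demo-consumer"; }
};

// Consumer implementation: interface from KafkaConsumer, Handle methods from
// HandleImpl. Virtual inheritance leaves exactly one Handle subobject.
class KafkaConsumerImpl : public virtual KafkaConsumer, public virtual HandleImpl {
 public:
  std::string consume(int /*timeout_ms*/) override { return "message"; }
};
```

Without the virtual keyword there would be two Handle subobjects, and HandleImpl's poll() would not satisfy the pure virtual inherited through KafkaConsumer.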
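5.1.8. KafkaConsumerImpl
KafkaConsumerImpl is the consumer implementation.
5.1.9. rd_kafka_message_t
The message structure exposed to the application.
5.1.10. rd_kafka_msg_s
The internal message structure. The message data itself is stored in the embedded rd_kafka_message_t member (rkm_rkmessage). Its layout is roughly as follows: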
struct rd_kafka_msg_s {
  rd_kafka_message_t rkm_rkmessage;  // public part: payload, key, partition, offset, ...
  struct {
    rd_kafka_msg_s* tqe_next;
    rd_kafka_msg_s** tqe_prev;
  } rkm_link;                        // queue linkage (TAILQ entry)
  int64_t rkm_timestamp;
  rd_kafka_timestamp_type_t rkm_tstype;
  // ... other fields (e.g. rkm_flags) omitted
};
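Keeping rd_kafka_message_t as the first member means code that only holds the public message pointer can get back to the enclosing internal message. The sketch below demonstrates that first-member technique with hypothetical stand-in structs (demo_message_t, demo_msg_s) and a hypothetical helper message2msg; it assumes, rather than shows, that librdkafka relies on this layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins for the two structures above.
struct demo_message_t {        // plays the role of rd_kafka_message_t
  void* payload;
  size_t len;
  int32_t partition;
};

struct demo_msg_s {            // plays the role of struct rd_kafka_msg_s
  demo_message_t rkm_rkmessage;  // must stay the FIRST member
  int64_t rkm_timestamp;
};

// Both structs are standard-layout, so the object and its first member share
// an address and the pointer can be converted back to the enclosing struct.
static_assert(offsetof(demo_msg_s, rkm_rkmessage) == 0,
              "public message must be the first member");

inline demo_msg_s* message2msg(demo_message_t* rkmessage) {
  return reinterpret_cast<demo_msg_s*>(rkmessage);
}
```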
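5.1.11. rd_kafka_msgq_t
A message queue that holds messages. Messages produced by a producer are not written directly to a broker socket; they are first placed in this queue. Its layout is roughly as follows: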
struct rd_kafka_msgq_t {
  struct {
    rd_kafka_msg_s* tqh_first;   // head of the queue
    rd_kafka_msg_s** tqh_last;   // tail: address of the last element's tqe_next
  } rkmq_msgs;

  // number of messages
  rd_atomic32_t rkmq_msg_cnt;
  // total size of all messages, in bytes
  rd_atomic64_t rkmq_msg_bytes;
};
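The tqh_first/tqh_last pair is the BSD TAILQ head layout: tqh_last points at the last element's tqe_next field, so appending is O(1) and needs no special case for an empty queue. Below is a minimal model of such a queue that also keeps the message count and byte total up to date. The types and functions (Msg, MsgQ, msgq_init, msgq_enq, msgq_pop) are hypothetical; librdkafka uses sys/queue.h-style macros and atomic counters instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical, simplified model of a TAILQ-based message queue.
struct Msg {
  size_t len;                  // payload size in bytes
  struct {
    Msg* tqe_next;             // next element, or nullptr
    Msg** tqe_prev;            // address of the previous tqe_next (or of tqh_first)
  } rkm_link;
};

struct MsgQ {
  struct {
    Msg* tqh_first;            // head element
    Msg** tqh_last;            // address of the last element's tqe_next
  } rkmq_msgs;
  int32_t rkmq_msg_cnt;        // plain counters here; librdkafka uses atomics
  int64_t rkmq_msg_bytes;
};

void msgq_init(MsgQ* q) {
  q->rkmq_msgs.tqh_first = nullptr;
  q->rkmq_msgs.tqh_last = &q->rkmq_msgs.tqh_first;  // empty: tail slot is the head
  q->rkmq_msg_cnt = 0;
  q->rkmq_msg_bytes = 0;
}

// Append at the tail: tqh_last already points at the slot to fill.
void msgq_enq(MsgQ* q, Msg* m) {
  m->rkm_link.tqe_next = nullptr;
  m->rkm_link.tqe_prev = q->rkmq_msgs.tqh_last;
  *q->rkmq_msgs.tqh_last = m;
  q->rkmq_msgs.tqh_last = &m->rkm_link.tqe_next;
  q->rkmq_msg_cnt++;
  q->rkmq_msg_bytes += static_cast<int64_t>(m->len);
}

// Remove and return the head element, or nullptr if the queue is empty.
Msg* msgq_pop(MsgQ* q) {
  Msg* m = q->rkmq_msgs.tqh_first;
  if (!m) return nullptr;
  Msg* next = m->rkm_link.tqe_next;
  if (next)
    next->rkm_link.tqe_prev = &q->rkmq_msgs.tqh_first;
  else
    q->rkmq_msgs.tqh_last = &q->rkmq_msgs.tqh_first;  // queue became empty
  q->rkmq_msgs.tqh_first = next;
  q->rkmq_msg_cnt--;
  q->rkmq_msg_bytes -= static_cast<int64_t>(m->len);
  return m;
}
```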
5.1.12. rd_kafka_toppar_t
The topic-partition object (toppar), a very complex structure. Part of it is shown below:
// Topic + Partition combination
typedef struct rd_kafka_toppar_s {
  struct {
    rd_kafka_toppar_s* tqe_next;
    rd_kafka_toppar_s** tqe_prev;
  } rktp_rklink;

  struct {
    rd_kafka_toppar_s* tqe_next;
    rd_kafka_toppar_s** tqe_prev;
  } rktp_rkblink;

  struct {
    rd_kafka_toppar_s* cqe_next;
    rd_kafka_toppar_s* cqe_prev;
  } rktp_fetchlink;

  struct {
    rd_kafka_toppar_s* tqe_next;
    rd_kafka_toppar_s** tqe_prev;
  } rktp_rktlink;

  struct {
    rd_kafka_toppar_s* tqe_next;
    rd_kafka_toppar_s** tqe_prev;
  } rktp_cgrplink;

  rd_kafka_itopic_t* rktp_rkt;
  int32_t rktp_partition;
  int32_t rktp_leader_id;
  rd_kafka_broker_t* rktp_leader;
  rd_kafka_broker_t* rktp_next_leader;
  rd_refcnt_t rktp_refcnt;
  rd_kafka_msgq_t rktp_msgq;  // application->rdkafka queue
} rd_kafka_toppar_t;
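Judging by the field names, the separate link members let one toppar sit on several lists at once (for example its topic's partition list via rktp_rktlink and its leader broker's list via rktp_rkblink), so moving a partition to a new leader only touches the broker lists. The sketch below models two such memberships with hypothetical types (Toppar, Link, TopparList) and uses a pointer-to-member to select which link to operate on; it illustrates intrusive multi-list membership and is not librdkafka code.

```cpp
#include <cassert>
#include <cstdint>

struct Toppar;

struct Link {                  // one TAILQ-style entry
  Toppar* tqe_next = nullptr;
  Toppar** tqe_prev = nullptr;
};

struct Toppar {
  int32_t rktp_partition;
  Link rktp_rktlink;           // membership in the owning topic's list
  Link rktp_rkblink;           // membership in the leader broker's list
};

struct TopparList {            // not copyable in spirit: tqh_last points into *this
  Toppar* tqh_first = nullptr;
  Toppar** tqh_last = &tqh_first;
  int cnt = 0;
};

// The pointer-to-member picks which embedded link (i.e. which list) is used.
void list_insert_tail(TopparList& l, Toppar* t, Link Toppar::*link) {
  Link& e = t->*link;
  e.tqe_next = nullptr;
  e.tqe_prev = l.tqh_last;
  *l.tqh_last = t;
  l.tqh_last = &e.tqe_next;
  l.cnt++;
}

void list_remove(TopparList& l, Toppar* t, Link Toppar::*link) {
  Link& e = t->*link;
  if (e.tqe_next)
    (e.tqe_next->*link).tqe_prev = e.tqe_prev;
  else
    l.tqh_last = e.tqe_prev;
  *e.tqe_prev = e.tqe_next;
  l.cnt--;
}
```

Usage: a leader change for partition 0 is a list_remove from the old broker's list plus a list_insert_tail into the new one, both on rktp_rkblink; the topic list is never touched.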
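6. Producer
6.1. Producer Structures
6.1.1. DeliveryReportCb
Invoked once for every produced message with its final delivery result (success or failure); applies to producers only.
6.1.2. PartitionerCb
A callback that computes the partition number for a message; applies to producers only.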
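A partitioner maps a message key to a partition number in [0, partition_cnt). The hypothetical function below uses a 32-bit FNV-1a hash purely for illustration; it is not the algorithm of librdkafka's built-in partitioners, and its signature is simplified compared with the real PartitionerCb interface, which, like this function, receives the key and the partition count. Returning -1 when there are no partitions is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// 32-bit FNV-1a hash: deterministic, so the same key always maps to the
// same partition as long as the partition count does not change.
uint32_t fnv1a_32(const std::string& key) {
  uint32_t h = 2166136261u;
  for (unsigned char c : key) {
    h ^= c;
    h *= 16777619u;
  }
  return h;
}

// Hypothetical key-hash partitioner: result is in [0, partition_cnt).
int32_t partition_for_key(const std::string& key, int32_t partition_cnt) {
  if (partition_cnt <= 0) return -1;  // no partition available (sketch convention)
  return static_cast<int32_t>(fnv1a_32(key) % static_cast<uint32_t>(partition_cnt));
}
```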
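6.1.3. Producer
Producer is the producer interface; its implementation is ProducerImpl.
6.1.4. ProducerImpl
ProducerImpl is the producer implementation.
6.2. Producer Startup, Part 1
Startup creates two kinds of threads: Broker threads (rd_kafka_broker_thread_main, several of them), which are the network I/O threads that talk to the brokers; and the Handler thread (rd_kafka_thread_main, one per instance): every call to RdKafka::Producer::create or rd_kafka_new creates one Handler thread.
Handler thread call stack: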
(gdb) t 17
[Switching to thread 17 (Thread 0x7ff7059d3700 (LWP 16765))]
#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
(gdb) bt
#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00000000005b4d2f in cnd_timedwait_ms (cnd=0x1517748, mtx=0x1517720, timeout_ms=898) at tinycthread.c:501
#2  0x0000000000580e16 in rd_kafka_q_serve (rkq=0x1517720, timeout_ms=898, max_cnt=0, cb_type=RD_KAFKA_Q_CB_CALLBACK, callback=0x0, opaque=0x0) at rdkafka_queue.c:440
#3  0x000000000054ee9b in rd_kafka_thread_main (arg=0x1516df0) at rdkafka.c:1227
#4  0x00000000005b4e0f in _thrd_wrapper_function (aArg=0x15179d0) at tinycthread.c:624
#5  0x00007ff7091e2e25 in start_thread () from /lib64/libpthread.so.0
#6  0x00007ff7082d135d in clone () from /lib64/libc.so.6
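6.3. Producer Startup, Part 2
This step creates the network I/O threads. Consumer startup is similar; the only difference is that the producer path calls rd_kafka_broker_producer_serve(rkb) while the consumer path calls rd_kafka_broker_consumer_serve(rkb).
The I/O threads both send and receive: sending is ultimately done with sendmsg and receiving with recvmsg (on MSVC, send and recv are used instead).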
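sendmsg and recvmsg take an array of iovec buffers, so several separate buffers (for example a request header and a message payload) can be written with one system call. The POSIX-only sketch below shows such a gather write over a local socketpair; the helper sendmsg_roundtrip is hypothetical and only illustrates the system calls, not how librdkafka builds its requests.

```cpp
#include <cassert>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

// Hypothetical helper: gather-write a header and a body with one sendmsg()
// call, read them back with recvmsg(), and return the bytes received
// ("" on error). A real I/O loop must also handle partial sends and
// receives; that is omitted here because the data is tiny.
std::string sendmsg_roundtrip(const std::string& header, const std::string& body) {
  int sv[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return "";

  // Two separate buffers, one system call, no copy into a combined buffer.
  struct iovec out[2];
  out[0].iov_base = const_cast<char*>(header.data());
  out[0].iov_len = header.size();
  out[1].iov_base = const_cast<char*>(body.data());
  out[1].iov_len = body.size();

  struct msghdr smsg = {};
  smsg.msg_iov = out;
  smsg.msg_iovlen = 2;
  ssize_t sent = sendmsg(sv[0], &smsg, 0);

  // Receive side: recvmsg could scatter into several buffers; one is enough here.
  char buf[256];
  struct iovec in;
  in.iov_base = buf;
  in.iov_len = sizeof(buf);

  struct msghdr rmsg = {};
  rmsg.msg_iov = &in;
  rmsg.msg_iovlen = 1;
  ssize_t got = sent > 0 ? recvmsg(sv[1], &rmsg, 0) : -1;

  close(sv[0]);
  close(sv[1]);
  return got > 0 ? std::string(buf, static_cast<size_t>(got)) : std::string();
}
```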
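6.4. The Produce Path
Messages produced by the producer are not written directly to a broker socket; they are placed in an rd_kafka_msgq_t queue, which the Broker thread (rd_kafka_broker_thread_main) consumes.
The Broker thread has to watch both its network connection to the broker and whether the queue has data. How does it do both? The queue is bound to a pipe, specifically to the pipe's write end (rktp->rktp_msgq_wakeup_fd = rkb->rkb_toppar_wakeup_fd; rkb->rkb_toppar_wakeup_fd = rkb->rkb_wakeup_fd[1]). Writing to that end wakes the Broker thread, which waits on its socket and on the pipe's read end together.
This way the Broker thread can listen for network data and pipe data at the same time.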
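The wakeup-pipe idea can be reproduced in isolation with pipe() and poll(): the producing side writes a byte to the pipe's write end, and the I/O thread waits on its socket and on the pipe's read end in a single poll() call. In the sketch below, WakeupPipe and wait_io are hypothetical names, and in the test a second pipe stands in for the broker socket; librdkafka's own code differs in detail.

```cpp
#include <cassert>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical wakeup pipe (not librdkafka code). fds[1] plays the role of
// the write end stored with the queue (cf. rkb_wakeup_fd[1]); fds[0] is
// polled by the I/O thread.
struct WakeupPipe {
  int fds[2] = {-1, -1};

  WakeupPipe() = default;
  WakeupPipe(const WakeupPipe&) = delete;
  WakeupPipe& operator=(const WakeupPipe&) = delete;
  ~WakeupPipe() {
    if (fds[0] >= 0) close(fds[0]);
    if (fds[1] >= 0) close(fds[1]);
  }

  // Create the pipe with both ends non-blocking so neither side can stall.
  bool create() {
    if (pipe(fds) != 0) return false;
    for (int fd : fds) fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
    return true;
  }

  // Producer side: called after a message was appended to the queue.
  void signal() {
    char one = 1;
    // If the pipe is full a wakeup is already pending, so a failed write is harmless.
    ssize_t n = write(fds[1], &one, 1);
    (void)n;
  }

  // I/O-thread side: discard pending wakeup bytes before serving the queue.
  void drain() {
    char buf[64];
    while (read(fds[0], buf, sizeof(buf)) > 0) {
    }
  }
};

// Wait for socket data and/or a queue wakeup.
// Returns a bitmask: 1 = socket readable, 2 = queue wakeup, 0 = timeout.
int wait_io(int sock_fd, int wakeup_rfd, int timeout_ms) {
  struct pollfd pfd[2] = {};
  pfd[0].fd = sock_fd;
  pfd[0].events = POLLIN;
  pfd[1].fd = wakeup_rfd;
  pfd[1].events = POLLIN;
  if (poll(pfd, 2, timeout_ms) <= 0) return 0;
  int r = 0;
  if (pfd[0].revents & POLLIN) r |= 1;
  if (pfd[1].revents & POLLIN) r |= 2;
  return r;
}
```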
// int rd_kafka_msg_partitioner(rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm, int do_lock)
(gdb) p *rkm
$7 = {rkm_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0,
    _private = 0x0}, rkm_link = {tqe_next = 0x5b5d47554245445b, tqe_prev = 0x6361667265746e69}, rkm_flags = 196610, rkm_timestamp = 1524829399009,
  rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME, rkm_u = {producer = {ts_timeout = 16074575505526, ts_enq = 16074275505526}}}
(gdb) p rkm->rkm_rkmessage
$8 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0, _private = 0x0}
(gdb) p rkm->rkm_rkmessage->payload
$9 = (void *) 0x7f48c4001260
(gdb) p (char*)rkm->rkm_rkmessage->payload
$10 = 0x7f48c4001260 "{\"p\":\"f\",\"o\":1,\"d\":\"m\",\"d\":\"m\",\"i\":\"f2\",\"ip\":\"127.0.0.1\",\"pt\":2018,\"sc\":0,\"fc\":1,\"tc\":0,\"acc\":395,\"mcc\":395,\"cd\":\"test\",\"cmd\":\"tester\",\"cf\":\"main\",\"cp\":\"1.49.16.9"...
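7. The poll Process
poll's job is to trigger callbacks. A producer's messages are sent even if it never calls poll, but unless poll is called to trigger the callbacks, the application cannot find out each message's delivery status (success, failure, etc.).
poll consumes the queue rd_kafka_t->rk_rep; rk_rep is the reply queue, of type rd_kafka_q_t (struct rd_kafka_q_s).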
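The consequence described above (messages leave even without poll, but their results are only seen through it) can be modelled in a few lines. In this single-threaded sketch, ToyProducer, DeliveryReport and broker_thread_iteration are hypothetical: one deque stands in for the message queue, another for the rk_rep reply queue, and poll() drains the latter into the delivery callback.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical single-threaded model (not librdkafka code).
struct DeliveryReport {
  std::string payload;
  bool ok;  // stand-in for the per-message error code
};

class ToyProducer {
 public:
  using DrCb = std::function<void(const DeliveryReport&)>;
  explicit ToyProducer(DrCb dr_cb) : dr_cb_(std::move(dr_cb)) {}

  // Asynchronous produce: only appends to the outgoing queue.
  void produce(const std::string& payload) { outq_.push_back(payload); }

  // Stand-in for the Broker thread: "sends" every queued message and posts one
  // report per message to the reply queue. Runs whether or not poll() is called.
  void broker_thread_iteration(bool fail_all = false) {
    while (!outq_.empty()) {
      rep_.push_back(DeliveryReport{outq_.front(), !fail_all});
      outq_.pop_front();
      ++sent_;
    }
  }

  // Serve the reply queue: the only place the delivery callback is invoked.
  int poll() {
    int served = 0;
    while (!rep_.empty()) {
      dr_cb_(rep_.front());
      rep_.pop_front();
      ++served;
    }
    return served;
  }

  int sent() const { return sent_; }

 private:
  DrCb dr_cb_;
  std::deque<std::string> outq_;    // plays the role of rd_kafka_msgq_t
  std::deque<DeliveryReport> rep_;  // plays the role of rk_rep
  int sent_ = 0;
};
```

In the model, as in the text above, a message can already be "sent" while its report is still waiting in the reply queue; only poll() turns it into a callback.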
Reposted from: https://www.cnblogs.com/aquester/p/9891483.html