Flink, Kafka consumer offsets, and checkpoints
There is a job in production with very simple logic: it reads data from Kafka and, using the Hive catalog, writes it in real time to HBase, Hive, and Redis. The Flink version is 1.11.1.
To keep the number of files written to Hive under control, I set the checkpoint interval to 30 minutes:
```java
env.enableCheckpointing(1000 * 60 * 30); // 1000 * 60 * 30 => 30 minutes
```

The effect is that one file is generated every 30 minutes, as shown below:
```
hive> dfs -ls /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/ ;
Found 10 items
-rw-r--r--   3 hdfs hive        0 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/_SUCCESS
-rw-r--r--   3 hdfs hive   248895 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10911
-rw-r--r--   3 hdfs hive   306900 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10912
-rw-r--r--   3 hdfs hive   208227 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-0-10913
-rw-r--r--   3 hdfs hive   263586 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10911
-rw-r--r--   3 hdfs hive   307723 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10912
-rw-r--r--   3 hdfs hive   196777 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-1-10913
-rw-r--r--   3 hdfs hive   266984 2020-10-18 00:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10911
-rw-r--r--   3 hdfs hive   338992 2020-10-18 00:50 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10912
-rw-r--r--   3 hdfs hive   216655 2020-10-18 01:20 /opt/user/hive/warehouse/dw.db/ods_analog_zgyp/dt_day=2020-10-18/dt_hour=00/part-5b7b6d44-993a-4af7-b7ee-1a8ab64d3453-2-10913
hive>
```

However, I also noticed that the lag and the per-minute consumption rate of the Kafka consumer group belonging to this job show a clearly periodic pattern with consumption peaks.
For example, looking at any 30-minute interval, the "messages consumed per minute" metric is 0 for the first 25 minutes and then around 300k per minute for the last 5 minutes. The consumer-group lag behaves the same way: it keeps growing and then drops to 0 at the 30-minute mark. See the figures below.
Figure: consumer group, messages consumed per minute
Figure: consumer group, lag
In reality, though, watching HBase, Hive, and Redis shows that the data is written continuously in real time; it is not the case that nothing is consumed during the first 25 minutes.
Looking into it, I learned that Flink maintains its own copy of the Kafka offsets and only commits them back to Kafka when a checkpoint completes.
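For reference, the two connector settings that control this behavior can be made explicit on the consumer. This is only a minimal sketch against the Flink 1.11 FlinkKafkaConsumer API (the helper class and method name are made up for illustration); both calls shown are in fact the defaults:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetConfigSketch {

    // Builds a consumer whose offset handling matches the behavior described above.
    public static FlinkKafkaConsumer<String> buildConsumer(Properties properties) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

        // With checkpointing enabled, commit the checkpointed offsets back to Kafka
        // only when a checkpoint completes (the default; this is what produces the
        // "stepped" lag curve in the Kafka monitoring).
        consumer.setCommitOffsetsOnCheckpoints(true);

        // Start position when the job is NOT restored from a checkpoint/savepoint:
        // resume from the consumer group's committed offsets in Kafka (also the default).
        consumer.setStartFromGroupOffsets();

        return consumer;
    }
}
```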
To verify the claim that "Flink only commits the consumed Kafka offsets back to Kafka at checkpoint time", and at the same time to observe whether the savepoint mechanism causes Kafka data to be consumed twice, I wrote a small program whose logic is trivial: read from topic "test" and write to topic "test2". Note that this job's checkpoint interval is 1 minute.
```java
package com.econ.powercloud.jobsTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import javax.annotation.Nullable;
import java.util.Properties;

public class TestKafkaOffsetCheckpointJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000 * 60);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String bootstrapServers = parameterTool.get("bootstrap.servers") == null
                ? "localhost:9092" : parameterTool.get("bootstrap.servers");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "prod-econ-flink-TestKafkaOffsetCheckpointJob-local");
        properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 5));

        String topic = "test";
        FlinkKafkaConsumer<String> stringFlinkKafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        DataStreamSource<String> stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);

        String producerTopic = "test2";
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                producerTopic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        return new ProducerRecord<>(producerTopic, element.getBytes());
                    }
                },
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        stringDataStreamSource.addSink(kafkaProducer);

        env.execute("TestKafkaOffsetCheckpointJob");
    }
}
```
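A note on the producer settings above, as far as I understand the connector: with FlinkKafkaProducer.Semantic.EXACTLY_ONCE the sink writes through Kafka transactions, so the producer's transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms (15 minutes by default) and should be comfortably larger than the checkpoint interval; the 5-minute value here satisfies both for this 1-minute-checkpoint job.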
Submit the job:

```
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6
[econ@dev-hadoop-node-c ~]$
```

Use "kafka-console-producer.sh" to produce the message "a1" into topic "test":
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
>a1
>
```

Use "kafka-console-consumer.sh" to consume topic "test2":
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
```

This shows that the job logic itself is correct: it does read from topic "test" and write to topic "test2".
使用"kafka-consumer-groups.sh"觀察消費組"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的積壓數量,重點觀察指標"LAG",可以看到LAG為1 :
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          3               3               0    -            -     -
test   0          3               3               0    -            -     -
test   2          5               6               1    -            -     -
2020-10-18 (Sunday) 20:09:45 CST
RdeMacBook-Pro:kafka r$
```

This shows that after Flink consumes data from Kafka, it does not immediately commit the offsets back to Kafka.
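The same check can also be scripted instead of running kafka-consumer-groups.sh by hand. Below is a minimal sketch using the Kafka AdminClient (the class name is illustrative; the group id and bootstrap address are the ones used in this test). It prints the committed offset per partition, i.e. the CURRENT-OFFSET column above:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // The offsets committed by the consumer group, i.e. what Flink wrote
            // back to Kafka on the last completed checkpoint.
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("prod-econ-flink-TestKafkaOffsetCheckpointJob-local")
                    .partitionsToOffsetAndMetadata()
                    .get();

            committed.forEach((tp, offset) ->
                    System.out.println(tp + " -> " + offset.offset()));
        }
    }
}
```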
Stop the job:
```
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6
Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.
Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd
[econ@dev-hadoop-node-c ~]$
```

Start the job again, but without using the savepoint generated above:
```
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83
[econ@dev-hadoop-node-c ~]$
```

Watching topic "test2", the same message "a1" is produced into it again:
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1
```

This shows that when Flink is started without a savepoint, it takes the Kafka offsets from Kafka itself (the consumer group's committed offsets), so the record whose offset had not yet been committed is consumed again.
Looking more closely at the consumer-group lag for topic "test": at 20:10:05 the lag was still 1, but by 20:10:08 all lags were 0.
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          3               3               0    -            -     -
test   0          3               3               0    -            -     -
test   2          5               6               1    -            -     -
2020-10-18 (Sunday) 20:10:05 CST

RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          3               3               0    -            -     -
test   0          3               3               0    -            -     -
test   2          6               6               0    -            -     -
2020-10-18 (Sunday) 20:10:08 CST
RdeMacBook-Pro:kafka r$
```

This is because a checkpoint completed at 20:10:06, which committed the offsets back to Kafka.
Flink Checkpoint History
Next, let's test whether Flink re-consumes Kafka data when the job is started from a savepoint.
使用"kafka-console-producer.sh"往topic "test"生成消息"a2":
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
>a1
>a2
>
```

Use "kafka-console-consumer.sh" to consume topic "test2":
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1
a2
```

Stop the job:
```
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179
Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.
Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60
[econ@dev-hadoop-node-c ~]$
```

Check the consumer-group lag for topic "test"; the LAG is still 1:
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          3               4               1    -            -     -
test   0          3               3               0    -            -     -
test   2          6               6               0    -            -     -
2020-10-18 (Sunday) 20:28:39 CST
RdeMacBook-Pro:kafka r$
```

Start the job from the savepoint; note the "-s" parameter:
```
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092
Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb
[econ@dev-hadoop-node-c ~]$
```

Watching "kafka-console-consumer.sh" on topic "test2", no new message is printed:
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092
a1
a1
a2
```

Check the consumer-group lag again; all LAG values are now 0.
```
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date;
Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          4               4               0    -            -     -
test   0          3               3               0    -            -     -
test   2          6               6               0    -            -     -
2020-10-18 (Sunday) 20:31:43 CST
RdeMacBook-Pro:kafka r$
```

This shows that when the job is started from a savepoint, Flink does not re-consume Kafka data, and it still commits the Kafka offsets correctly.
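As a side note related to the "no savepoint" case above: retained (externalized) checkpoints can provide a similar restore point without taking an explicit savepoint. Below is a sketch of the relevant configuration, assuming the same 1-minute-checkpoint test job (the class name is illustrative):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000 * 60); // 1 minute, as in the test job

        // Keep the last completed checkpoint on the checkpoint storage even after the
        // job is cancelled, so its path can be passed to `flink run -s ...` just like
        // a savepoint path.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // ... build the same source/sink pipeline as above, then env.execute(...)
    }
}
```

With this in place, restoring from the latest retained checkpoint avoids the re-consumption that was observed when the job was restarted without a savepoint.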
To restate, the experiments above show:

- Flink keeps the Kafka offsets in its own checkpointed state and only commits them back to Kafka when a checkpoint completes, which is why the lag reported for the consumer group rises and falls with the checkpoint interval even though the data is processed continuously.
- When the job is started without a savepoint, the Kafka source resumes from the consumer group's committed offsets in Kafka, so records consumed since the last completed checkpoint are consumed again.
- When the job is started from a savepoint, the offsets are restored from the savepoint, so no Kafka data is re-consumed, and the offsets continue to be committed back to Kafka correctly.

Summary: the periodic consumption and lag pattern seen in the Kafka monitoring is an artifact of the offset-commit timing, not a sign that the job only consumes data every 30 minutes.