【1】flink-source读取数据
【README】
本文記錄了flink讀取不同數據源的編碼方式,數據源包括:
- 集合(元素列表);
- 文件
- kafka;
- 自定義數據源;
本文使用的flink為 1.14.4 版本;maven依賴如下:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.4</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

【1】從集合讀取數據
【1.1】代碼
/*** @Description flink從集合讀取數據 * @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class SourceTest1_Collection {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從集合讀取數據DataStream<SensorReading> sensorStream = env.fromCollection(Arrays.asList(new SensorReading("sensor_1", 12341561L, 36.1), new SensorReading("sensor_2", 12341562L, 33.5), new SensorReading("sensor_3", 12341563L, 39.9), new SensorReading("sensor_4", 12341564L, 31.2)));// 打印輸出sensorStream.print("sensor");// 從元素列表讀取數據DataStream<Integer> intStream = env.fromElements(1, 2, 3, 7, 8, 2, 100, 34, 3);intStream.print("intStream");// 執行env.execute("sensorJob");} } /*** @Description 傳感器溫度讀數* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class SensorReading {private String id;private Long timestamp;private double temperature;public SensorReading() {}public SensorReading(String id, Long timestamp, double temperature) {this.id = id;this.timestamp = timestamp;this.temperature = temperature;}打印結果:
intStream:6> 8
intStream:5> 7
intStream:7> 2
sensor:8> SensorReading{id='sensor_2', timestamp=12341562, temperature=33.5}
intStream:1> 34
sensor:1> SensorReading{id='sensor_3', timestamp=12341563, temperature=39.9}
intStream:3> 2
intStream:4> 3
intStream:2> 1
intStream:2> 3
sensor:7> SensorReading{id='sensor_1', timestamp=12341561, temperature=36.1}
intStream:8> 100
sensor:2> SensorReading{id='sensor_4', timestamp=12341564, temperature=31.2}
【2】 從文件讀取數據
【2.1】代碼
/*** @Description flink從文件讀取數據* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class SourceTest2_File {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 設置全局并行度為1// 從文件讀取數據DataStream<String> fileStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 打印輸出fileStream.print("sensor");// 執行env.execute("sensorJob");} }sensor.txt 如下:
sensor_1,12341561,36.1 sensor_2,12341562,33.5 sensor_3,12341563,39.9 sensor_4,12341564,31.2打印結果:
sensor> sensor_1,12341561,36.1
sensor> sensor_2,12341562,33.5
sensor> sensor_3,12341563,39.9
sensor> sensor_4,12341564,31.2
【3】從kafka讀取數據
1)引入maven依賴
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.4</version>
</dependency>

2)flink作為消費者連接到kafka
/*** @Description flink從kafka讀取數據* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class SourceTest3_kafka {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 設置全局并行度為1// 創建flink連接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// 打印輸出kafkaStream.print("kafkaStream");// 執行env.execute("kafkaStreamJob");} } public enum KafkaConsumerProps {_INS;/* 1.創建kafka生產者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;} }3)打開 kafka生產者命令行:
kafka-console-producer.sh --broker-list centos201:9092,centos202:9092,centos203:9092 --topic hello0415補充: 關于kafka集群,可以參見我的文章 :
kafka集群搭建_PacosonSWJTU的博客-CSDN博客
【4】自定義數據源
自定義數據源,可以用于自測 flinkjob 的場景中;
public class SourceTest4_UDF {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 設置全局并行度為1// 創建自定義數據源DataStream<SensorReading> udfStream = env.addSource(new SourceFunction<SensorReading>() {int i = 1;int mod = 1000;Random random = new Random();boolean runnable = true;@Overridepublic void run(SourceContext<SensorReading> sourceContext) throws Exception {while (runnable) {sourceContext.collect(new SensorReading(String.valueOf(i++ % mod + 1), System.currentTimeMillis(), 30 + random.nextGaussian()));if (i % 5 == 0) TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {runnable = false;}});// 打印輸出udfStream.print("udfStream");// 執行env.execute("udfStreamJob");} }打印結果:
udfStream:4> SensorReading{id='5', timestamp=1650030559865, temperature=31.015354380481117}
udfStream:1> SensorReading{id='2', timestamp=1650030559853, temperature=29.23797321841027}
udfStream:3> SensorReading{id='4', timestamp=1650030559865, temperature=31.148402161461384}
udfStream:2> SensorReading{id='3', timestamp=1650030559865, temperature=30.082462570224305}
總結
以上是生活随笔為你收集整理的【1】flink-source读取数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: es6 dsl与sql对比
- 下一篇: 华为手机如何删除app软件