Apache Flink for Beginners (Part 20): Flink Kafka Connector
Built-in sources and sinks
Flink's built-in sources can read from files, directories, sockets, and from collections or iterators. The built-in sinks include writing to files, printing to the console, and writing to sockets.
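To make that list concrete, here is a minimal Scala sketch (not from the original post) exercising a few of the built-in sources and sinks; the file paths, host, and port are placeholders:

```scala
import org.apache.flink.streaming.api.scala._

object BuiltInSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: read each line of a text file (placeholder path)
    val fileStream: DataStream[String] = env.readTextFile("/tmp/input.txt")

    // Source: read from an in-memory collection
    val collectionStream: DataStream[Int] = env.fromCollection(List(1, 2, 3))

    // Source: read text lines from a socket (placeholder host/port)
    val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

    // Sink: print to the console
    fileStream.print()

    // Sink: write back out as a text file (placeholder path)
    collectionStream.map(_.toString).writeAsText("/tmp/output.txt")

    env.execute("BuiltInSourceSinkApp")
  }
}
```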
Built-in connectors
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
HDFS Connector
This connector provides a sink that writes partitioned files to any filesystem supported by Hadoop FileSystem.
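The original post gives no code for this connector. As a hedged sketch, assuming the flink-connector-filesystem dependency is on the classpath, its BucketingSink can be wired up roughly like this; the HDFS path, bucket format, and batch size are placeholder choices:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

object HdfsSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999) // placeholder source

    // Bucket files by hour and roll them at ~64 MB; the base path is a placeholder
    val sink = new BucketingSink[String]("hdfs:///tmp/flink-output")
    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
    sink.setWriter(new StringWriter[String]())
    sink.setBatchSize(64 * 1024 * 1024)

    data.addSink(sink)
    env.execute("HdfsSinkApp")
  }
}
```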
Kafka Connector
Add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

Download and install ZooKeeper.
Download and install Kafka.
- Likewise, edit the server.properties file under kafka_2.11-2.0.1/config and change:
    log.dirs=/home/vincent/tmp/kafka-logs
    zookeeper.connect=localhost:2181
- Run Kafka (a typical command sequence for these steps is sketched after this list).
- Run jps; if a Kafka process shows up, the broker started successfully.
- Create a topic.
- Once it is created, list all topics to verify.
- Start a console producer.
- Start a console consumer.
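The original post omits the concrete commands. A hedged sketch, assuming a standalone ZooKeeper installation, the kafka_2.11-2.0.1 distribution configured as above, and the topic name mytest used by the code below:

```bash
# Start the standalone ZooKeeper (run from the ZooKeeper install directory)
bin/zkServer.sh start

# Start the Kafka broker (run from the Kafka install directory)
bin/kafka-server-start.sh config/server.properties

# Create the topic used by the examples below
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic mytest

# List all topics to confirm it was created
bin/kafka-topics.sh --list --zookeeper localhost:2181

# Console producer and consumer for manual testing
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
```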
Connecting Flink to Kafka
Source: reading data from Kafka
Scala:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    // Consume the "mytest" topic as plain strings and print each record
    env.addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties)).print()

    env.execute("KafkaConnectorConsumerApp")
  }
}

Java:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JavaKafkaConsumerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.227.128:9092");
        properties.setProperty("group.id", "test");

        // Consume the "mytest" topic as plain strings and print each record
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("mytest", new SimpleStringSchema(), properties));
        stream.print();

        env.execute("JavaKafkaConsumerApp");
    }
}

Sink: writing data to Kafka
Scala:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object KafkaConnectorProducerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("192.168.227.128", 9999)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    // Write every record to the "mytest" topic as a plain string
    val kafkaSink = new FlinkKafkaProducer[String]("mytest",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), properties)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducerApp")
  }
}

Java:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class JavaKafkaProducerApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Receive data from a socket and sink it to Kafka through Flink
        DataStreamSource<String> data = env.socketTextStream("192.168.227.128", 9999);

        // Write every record to the "mytest" topic as a plain string
        data.addSink(new FlinkKafkaProducer<String>("192.168.227.128:9092", "mytest", new SimpleStringSchema()));

        env.execute("JavaKafkaProducerApp");
    }
}

The default start position of the Flink Kafka consumer is setStartFromGroupOffsets (the default behaviour): it resumes from the offsets committed for the consumer group, i.e. from the records that have not yet been consumed.
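The start position can be overridden with setters on the consumer before it is handed to addSource. A minimal Scala sketch (the object name is made up for illustration; the broker address, group id, and topic match the examples above; the timestamp is a placeholder):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaStartPositionApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.227.128:9092")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), properties)

    // Pick exactly one start position; the last call before execution wins.
    consumer.setStartFromGroupOffsets()               // default: resume from committed group offsets
    // consumer.setStartFromEarliest()                // read the topic from the beginning
    // consumer.setStartFromLatest()                  // only read records produced after the job starts
    // consumer.setStartFromTimestamp(1546300800000L) // start from an epoch-millis timestamp (placeholder)

    env.addSource(consumer).print()
    env.execute("KafkaStartPositionApp")
  }
}
```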
Summary

This post introduced Flink's built-in sources, sinks, and connectors, then walked through setting up Kafka and reading from and writing to it with FlinkKafkaConsumer and FlinkKafkaProducer.