生活随笔
收集整理的這篇文章主要介紹了
mac系统下使用flink消费docker运行的kafka
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
版本
flink 1.12.0
scala 2.11
java 1.8
kafka 2.0.2
首先使用maven創建一個新的工程
mvn archetype:generate -DarchetypeGroupId
=org.apache.flink -DarchetypeArtifactId
=flink-quickstart-java -DarchetypeVersion
=1.12.0 -DgroupId
=learn.flink -DartifactId
=flink-java -Dversion
=1.0 -Dpackage
=com.flink -DinteractiveMode
=false
創建完之后打開kafka的pom注釋
之后就是flink程序:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class streamkafka
{public static void main(String[] args
) throws Exception {StreamExecutionEnvironment env
= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());Properties properties
= new Properties();properties
.setProperty("bootstrap.servers", "kafka:9090");properties
.setProperty("auto.offset.reset", "earliest");properties
.setProperty("group.id", "g3");properties
.setProperty("enable.auto.commit", "true");FlinkKafkaConsumer<String> kafkaConsumer
= new FlinkKafkaConsumer<>("logsTopic", new SimpleStringSchema(), properties
);DataStreamSource<String> lines
= env
.addSource(kafkaConsumer
);lines
.print();env
.execute();}
}
下面開始準備docker啟動kafka。
安裝docker不多說了,下面就是使用docker compose生成 zookeeper和kafka的運行容器:
version: "3.3"
services:zookeeper:image: zookeeper
:3.5.5
restart: always
container_name: zookeeper
ports:- "2181:2181"expose:- "2181"environment:- ZOO_MY_ID=1
kafka:image: wurstmeister/kafka
:2.11
-2.0.1
restart: always
container_name: kafka
environment:- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT
://kafka
:9090- KAFKA_ZOOKEEPER_CONNECT=zookeeper
:2181- KAFKA_MESSAGE_MAX_BYTES=2000000
ports:- "9090:9090"depends_on:- zookeeper
docker-compose -f docker-compose-kafka.yml up -d
運行成功后啟動創建kafka。
注意 :使用docker需要再hosts配置宿主機的映射
使用docker創建kafka topic
docker run -it --rm --network
host wurstmeister/kafka:2.11-2.0.1
\bash /opt/kafka/bin/kafka-topics.sh
\--zookeeper kafka:2181
\--create --topic flink --partitions
1 --replication-factor
1
創建成功后,啟動kafka生產者生產數據:
docker run -it --rm --network
host wurstmeister/kafka:2.11-2.0.1
\bash /opt/kafka/bin/kafka-console-producer.sh
\--broker-list kafka:9090 --topic flink
回車后退出,在使用消費者測試數據是否可以消費:
docker run -it --rm --network
host wurstmeister/kafka:2.11-2.0.1
\bash /opt/kafka/bin/kafka-console-consumer.sh
\--bootstrap-server kafka:9090 --topic flink --from-beginning
可以看到消費者正常消費,后面就是啟動flink程序;
可以看到flink消費到了剛剛kafka生產者中生產的數據,這一波串聯成功。
總結
以上是生活随笔為你收集整理的mac系统下使用flink消费docker运行的kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。