Flink Custom SQL Connectors
1. Why Define a Custom Connector
A common requirement: after aggregating metrics with Flink SQL, we want to write the results to a specific middleware, for example the OpenTSDB time-series database, only to find that Flink provides no official OpenTSDB connector. In that case we have to define the connector ourselves.
2. Steps to Define a Custom Connector
In outline (each step is detailed in the next section):
1. Implement a StreamTableSinkFactory that declares the connector's properties and creates the sink.
2. Implement a StreamTableSink (here an AppendStreamTableSink) that consumes the DataStream.
3. Implement a RichSinkFunction that performs the actual writes.
4. Implement a ConnectorDescriptorValidator that validates the connection properties.
5. Register the factory under META-INF/services so it can be discovered via SPI.
3. Explanation of Each Step
Here we explain each step from the sink side; the source side is analogous.
3.1 StreamTableSinkFactory
We create our own factory implementing StreamTableSinkFactory. The methods to focus on:
createStreamTableSink(Map): creates the StreamTableSink
requiredContext(): uniquely identifies this connector's type, i.e. connector.type
supportedProperties(): defines the configuration keys the connector supports
3.2 AppendStreamTableSink
This is an append-only stream sink; there are also UpsertStreamTableSink and RetractStreamTableSink, to be chosen according to your needs (their differences are beyond the scope of this article).
consumeDataStream(): the method we care about most. It consumes the records of the DataStream and, via addSink, hands them to a RichSinkFunction that writes them out.
3.3 RichSinkFunction
This is our own sink implementation, with three main methods (see the lifecycle sketch after this list):
invoke(Row, Context): the key code; we receive each record here and act on it
open(): usually for initialization, e.g. creating clients such as an httpClient or kafkaClient
close(): called on shutdown, usually to release resources, e.g. closing those clients
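None of the sample code later in this article actually implements open or close, so here is a minimal lifecycle sketch. The KafkaProducer and the sink_topic name are illustrative assumptions, not part of this project's code:

package com.cxc.flink.extend;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Lifecycle sketch only; the KafkaProducer stands in for whatever
 * client (httpClient, kafkaClient, ...) the real sink needs.
 */
public class LifecycleSketchSinkFunction extends RichSinkFunction<Row> {

    // transient: created in open() on the task manager, not serialized with the job graph
    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel instance before any invoke(): initialize clients here
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.245.11:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(Row value, Context context) {
        // Called once per record: act on the data here ("sink_topic" is a hypothetical topic)
        producer.send(new ProducerRecord<>("sink_topic", value.toString()));
    }

    @Override
    public void close() {
        // Called on shutdown: release what open() created
        if (producer != null) {
            producer.close();
        }
    }
}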
3.4 ConnectorDescriptorValidator
Validates the connector properties from the WITH clause before the sink is created; the implementation is shown in section 4.2.4.
4. Hands-On Code
4.1 Generating Data
First we push some data into Kafka.
Create a JavaBean:
public class KafkaTestBean {

    private Double bandWidth;
    private Long app_time;
    private Double packet;
    private String networkLineId;

    public KafkaTestBean() {
    }

    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getPacket() { return packet; }
    public void setPacket(Double packet) { this.packet = packet; }
    public Double getBandWidth() { return bandWidth; }
    public void setBandWidth(Double bandWidth) { this.bandWidth = bandWidth; }
    public Long getApp_time() { return app_time; }
    public void setApp_time(Long app_time) { this.app_time = app_time; }
    public String getNetworkLineId() { return networkLineId; }
    public void setNetworkLineId(String networkLineId) { this.networkLineId = networkLineId; }
}

Kafka message generator:
public class KafkaMessageGenerator {

    public static void main(String[] args) throws Exception {
        // Producer configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        // Serializers for message keys and values
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String, String> record = new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }
    }

    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) ((Math.random()) * 10));
        String s2 = String.valueOf((int) ((Math.random()) * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}

4.2 Implementing the Feature
The feature: receive data from Kafka, run an aggregation over it, and write the result to OpenTSDB.
4.2.1 Creating the TableFactory
package com.cxc.flink.extend;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {

    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        // Validate the connector properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema, format);
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return null;
    }

    @Override
    public Map<String, String> requiredContext() {
        // The connector type; Flink discovers exactly one connector by this value
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * The configuration keys this custom connector supports
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);
        supportProperties.add(CONNECTOR_ADDRESS);
        // schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }
}

4.2.2 Creating the StreamTableSink
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new CustomizedSinkFunction(this.address))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}

4.2.3 Creating the RichSinkFunction
package com.cxc.flink.extend;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    public CustomizedSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        // Just print for now
        System.out.println("send to " + address + "---" + value);
    }
}

4.2.4 Creating the ConnectorDescriptorValidator
package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {

    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void validate(DescriptorProperties properties) {
        // Validate the connection properties here
        logger.info("validating connector properties");
        super.validate(properties);
        logger.info("connector properties validated");
    }
}
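The validator above only delegates to super.validate(). As a sketch, required keys could also be enforced with DescriptorProperties' built-in helpers; the validateString calls below assume the helper available in the descriptor API of this Flink generation:

@Override
public void validate(DescriptorProperties properties) {
    super.validate(properties);
    // Sketch: fail fast when a required key is missing or empty
    // (false = not optional, 1 = minimum length)
    properties.validateString(CONNECTOR_JOB, false, 1);
    properties.validateString(CONNECTOR_METRICS, false, 1);
    properties.validateString(CONNECTOR_ADDRESS, false, 1);
}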
4.2.5 Creating META-INF/services
TableFactory implementations are discovered via Java SPI; see the source of the discoverFactories() method of TableFactoryService.
Under the resources directory, create the file META-INF/services/org.apache.flink.table.factories.TableFactory
with the following content:
com.cxc.flink.extend.CustomizedTableSourceSinkFactory

Without this file the ServiceLoader cannot find the factory, and using the connector fails with an error.
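To check the registration without launching a job, you can list what the JDK ServiceLoader discovers. This small diagnostic sketch is not part of the project; our CustomizedTableSourceSinkFactory should appear in its output:

import org.apache.flink.table.factories.TableFactory;

import java.util.ServiceLoader;

public class SpiCheck {
    public static void main(String[] args) {
        // Prints every TableFactory registered via META-INF/services files on the classpath
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}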
4.2.6 SQL Code
Now write a Flink SQL test program that runs the computation and sends the result to our custom sink:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // DDL for the input table
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(INPUT_SQL.toString());

        // DDL for the output table
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);
        env.execute("window sql job");
    }
}

5. Feature Extension
The code above achieves the small goal of printing to the console, but the printed rows are not valid JSON. In many scenarios we need the data as JSON, so how do we achieve that?
Browsing the Flink Kafka connector source turns up the SerializationSchema interface, which makes this easy: its implementation JsonRowSerializationSchema serializes rows to JSON with Jackson internally, so we can use it directly.
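As a quick standalone sketch of that API (assuming the flink-json module is on the classpath; the field names and values here are illustrative):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;

public class JsonSchemaDemo {
    public static void main(String[] args) {
        // Describe the row layout the serializer should expect
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[]{"metricset", "value"},
                Types.STRING, Types.DOUBLE);
        SerializationSchema<Row> schema =
                new JsonRowSerializationSchema.Builder(rowType).build();
        // Serialize one row; the bytes are UTF-8 JSON
        byte[] bytes = schema.serialize(Row.of("testmetric", 3.2));
        System.out.println(new String(bytes)); // {"metricset":"testmetric","value":3.2}
    }
}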
5.1 Modifying the RichSinkFunction
In the invoke method, serialize the record before handling it:
package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;
    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType, String address, String formatType) {
        this.address = address;
        if (formatType.equals("json")) {
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        } else {
            throw new RuntimeException("current custom format only support json serializer");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        // Serialize the row to JSON, then print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize);
        System.out.println("send to " + address + "---" + jsonValue);
    }
}
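The original goal was OpenTSDB, yet invoke still only prints. As a hedged sketch, assuming connector.address points at an OpenTSDB server, the JSON could be pushed to OpenTSDB's HTTP /api/put endpoint roughly like this; note that the payload must follow OpenTSDB's metric/timestamp/value/tags shape, so our row fields would still need mapping:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class OpenTsdbHttpSketch {

    /**
     * POSTs one JSON data point to OpenTSDB's /api/put endpoint;
     * 'address' is the host:port value from connector.address.
     */
    public static void putDataPoint(String address, String jsonBody) throws Exception {
        URL url = new URL("http://" + address + "/api/put");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
        }
        // OpenTSDB answers 204 No Content on success
        int code = conn.getResponseCode();
        if (code != 204) {
            throw new RuntimeException("OpenTSDB /api/put returned HTTP " + code);
        }
        conn.disconnect();
    }
}

invoke could then call putDataPoint(address, jsonValue) instead of printing; in a real sink the connection handling would move into open()/close() as described in section 3.3.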
5.2 Modifying the StreamTableSink
When creating the SinkFunction, pass the table's schema in so the serializer can build the JSON nodes:
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        // Pass the row type so the sink can build a JSON serializer for it
        return dataStream.addSink(new CustomizedSinkFunction(this.schema.toRowType(), this.address, formatType))
                .setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}