Flink SQL JSON Format Source Code Analysis
Parsing JSON-formatted data with Flink SQL is very simple: just set the format to json in the DDL statement, like the following:
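(A minimal illustrative example -- the table name, fields, topic, and broker address are placeholders; the two json.* options shown here come up again later in this article.)

```sql
CREATE TABLE kafka_source (
    name STRING,
    age  INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
);
```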
Have you ever wondered how this is implemented under the hood? This article walks you through the implementation details.
When you submit a SQL statement, Flink runs it through several important steps: parsing, validation, optimization, and translation. Since the earlier steps are fairly involved, we will skip them for now and jump straight to the key code. During the conversion of the SqlNode into a RelNode, execution reaches CatalogSourceTable#createDynamicTableSource. This class translates Calcite's RelOptTable into Flink's TableSourceTable object.
createDynamicTableSource source code
```java
private DynamicTableSource createDynamicTableSource(
        FlinkContext context, ResolvedCatalogTable catalogTable) {
    final ReadableConfig config = context.getTableConfig().getConfiguration();
    return FactoryUtil.createTableSource(
            schemaTable.getCatalog(),
            schemaTable.getTableIdentifier(),
            catalogTable,
            config,
            Thread.currentThread().getContextClassLoader(),
            schemaTable.isTemporary());
}
```

This is what creates the streaming table for the Kafka source; it then calls FactoryUtil#createTableSource.
createTableSource source code
```java
public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog,
        ObjectIdentifier objectIdentifier,
        ResolvedCatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        // Get the matching factory -- here it is KafkaDynamicTableFactory
        final DynamicTableSourceFactory factory =
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        // Create the dynamic table
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}
```

Two important things happen in this method: first the matching factory object is obtained, then the DynamicTableSource instance is created. getDynamicTableFactory actually delegates to discoverFactory, which, as the name suggests, discovers factories.
discoverFactory source code
```java
public static <T extends Factory> T discoverFactory(
        ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
    final List<Factory> factories = discoverFactories(classLoader);

    final List<Factory> foundFactories =
            factories.stream()
                    .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                    .collect(Collectors.toList());

    if (foundFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factories that implement '%s' in the classpath.",
                        factoryClass.getName()));
    }

    final List<Factory> matchingFactories =
            foundFactories.stream()
                    .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                    .collect(Collectors.toList());

    if (matchingFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
                                + "Available factory identifiers are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        foundFactories.stream()
                                .map(Factory::factoryIdentifier)
                                .distinct()
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }
    if (matchingFactories.size() > 1) {
        throw new ValidationException(
                String.format(
                        "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
                                + "Ambiguous factory classes are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        matchingFactories.stream()
                                .map(f -> f.getClass().getName())
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }
    return (T) matchingFactories.get(0);
}
```

This code is simple enough that no extra comments are needed, and the logic is very clear: all factories are loaded via the SPI mechanism, then filtered by factoryIdentifier down to the one that matches -- in our case the kafka connector -- followed by checks for the various error cases.
discoverFactories source code
```java
private static List<Factory> discoverFactories(ClassLoader classLoader) {
    try {
        final List<Factory> result = new LinkedList<>();
        ServiceLoader.load(Factory.class, classLoader)
                .iterator()
                .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for factories.", e);
        throw new TableException("Could not load service provider for factories.", e);
    }
}
```

This code should look familiar -- an earlier article covered it. It loads every Factory on the classpath and returns them as a list.
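For reference, ServiceLoader finds these implementations through service descriptor files that each connector/format jar ships on the classpath. A sketch of what such a file looks like -- the entries below are the factory classes shipped by flink-json and the Kafka connector in the Flink 1.12/1.13 era, and the exact class names vary by version:

```
# Contents of META-INF/services/org.apache.flink.table.factories.Factory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
```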
Now for today's main topic.
KafkaDynamicTableFactory#createDynamicTableSource source code
```java
public DynamicTableSource createDynamicTableSource(Context context) {
    TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig tableOptions = helper.getOptions();
    Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);
    // The format handling starts here
    DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
            getValueDecodingFormat(helper);
    helper.validateExcept(new String[] {"properties."});
    KafkaOptions.validateTableSourceOptions(tableOptions);
    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
    StartupOptions startupOptions = KafkaOptions.getStartupOptions(tableOptions);
    Properties properties =
            KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions());
    // -9223372036854775808L is Long.MIN_VALUE (decompiled constant),
    // which means partition discovery is disabled when the option is absent
    properties.setProperty(
            "flink.partition-discovery.interval-millis",
            String.valueOf(
                    tableOptions
                            .getOptional(KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(-9223372036854775808L)));
    DataType physicalDataType =
            context.getCatalogTable().getSchema().toPhysicalRowDataType();
    int[] keyProjection =
            KafkaOptions.createKeyFormatProjection(tableOptions, physicalDataType);
    int[] valueProjection =
            KafkaOptions.createValueFormatProjection(tableOptions, physicalDataType);
    String keyPrefix =
            (String) tableOptions.getOptional(KafkaOptions.KEY_FIELDS_PREFIX).orElse((Object) null);
    return this.createKafkaTableSource(
            physicalDataType,
            (DecodingFormat) keyDecodingFormat.orElse((Object) null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}
```

The getValueDecodingFormat method eventually calls discoverOptionalFormatFactory.
discoverOptionalDecodingFormat and discoverOptionalFormatFactory source code
```java
public <I, F extends DecodingFormatFactory<I>>
        Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
                Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
            .map(
                    formatFactory -> {
                        String formatPrefix = formatPrefix(formatFactory, formatOption);
                        try {
                            return formatFactory.createDecodingFormat(
                                    context, projectOptions(formatPrefix));
                        } catch (Throwable t) {
                            throw new ValidationException(
                                    String.format(
                                            "Error creating scan format '%s' in option space '%s'.",
                                            formatFactory.factoryIdentifier(), formatPrefix),
                                    t);
                        }
                    });
}

private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
        Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    final String identifier = allOptions.get(formatOption);
    if (identifier == null) {
        return Optional.empty();
    }
    final F factory =
            discoverFactory(context.getClassLoader(), formatFactoryClass, identifier);
    String formatPrefix = formatPrefix(factory, formatOption);
    // log all used options of other factories
    consumedOptionKeys.addAll(
            factory.requiredOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    consumedOptionKeys.addAll(
            factory.optionalOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    return Optional.of(factory);
}
```

The discoverFactory call here simply filters out the matching format; it is the same method already shown in the discoverFactory listing above, so its body is not repeated. The logic mirrors the connector loading we just saw: all formats are loaded via SPI, then filtered by factoryIdentifier -- in our case json. Once the formatFactory is returned, format creation begins, which leads into JsonFormatFactory#createDecodingFormat, where an actual DecodingFormat object is created.
createDecodingFormat source code
```java
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
        DynamicTableFactory.Context context, ReadableConfig formatOptions) {
    // Validate the format-related options
    FactoryUtil.validateFactoryOptions(this, formatOptions);
    // Validate json.fail-on-missing-field and json.ignore-parse-errors
    validateDecodingFormatOptions(formatOptions);
    // Read json.fail-on-missing-field and json.ignore-parse-errors
    final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
    final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
    // Read timestamp-format.standard
    TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

    return new DecodingFormat<DeserializationSchema<RowData>>() {
        @Override
        public DeserializationSchema<RowData> createRuntimeDecoder(
                DynamicTableSource.Context context, DataType producedDataType) {
            final RowType rowType = (RowType) producedDataType.getLogicalType();
            final TypeInformation<RowData> rowDataTypeInfo =
                    context.createTypeInformation(producedDataType);
            return new JsonRowDataDeserializationSchema(
                    rowType,
                    rowDataTypeInfo,
                    failOnMissingField,
                    ignoreParseErrors,
                    timestampOption);
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }
    };
}
```

The logic here is also simple: first the format-related options are validated, including json.fail-on-missing-field and json.ignore-parse-errors, and then the returned DecodingFormat's createRuntimeDecoder creates the JsonRowDataDeserializationSchema object.
JsonRowDataDeserializationSchema source code
```java
public JsonRowDataDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> resultTypeInfo,
        boolean failOnMissingField,
        boolean ignoreParseErrors,
        TimestampFormat timestampFormat) {
    if (ignoreParseErrors && failOnMissingField) {
        throw new IllegalArgumentException(
                "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
    }
    this.resultTypeInfo = checkNotNull(resultTypeInfo);
    this.failOnMissingField = failOnMissingField;
    this.ignoreParseErrors = ignoreParseErrors;
    this.runtimeConverter =
            new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                    .createConverter(checkNotNull(rowType));
    this.timestampFormat = timestampFormat;

    boolean hasDecimalType =
            LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
    if (hasDecimalType) {
        objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
    }
    objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
}
```

The most important thing the constructor does is create the JsonToRowDataConverter. Quite a few methods are involved in that call chain, so only the key ones are covered here.
createRowConverter source code
```java
public JsonToRowDataConverter createRowConverter(RowType rowType) {
    final JsonToRowDataConverter[] fieldConverters =
            rowType.getFields().stream()
                    .map(RowType.RowField::getType)
                    .map(this::createConverter)
                    .toArray(JsonToRowDataConverter[]::new);
    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

    return jsonNode -> {
        ObjectNode node = (ObjectNode) jsonNode;
        int arity = fieldNames.length;
        GenericRowData row = new GenericRowData(arity);
        for (int i = 0; i < arity; i++) {
            String fieldName = fieldNames[i];
            JsonNode field = node.get(fieldName);
            Object convertedField = convertField(fieldConverters[i], fieldName, field);
            row.setField(i, convertedField);
        }
        return row;
    };
}
```

Because the data is in JSON format, it maps to a ROW type, so a row-level JsonToRowDataConverter is created first. Then a fieldConverter is created for each field: depending on the type you declared for the field in the DDL, a different conversion method is used. For example, a field declared as STRING is handled by convertToString.
convertToString source code
```java
private StringData convertToString(JsonNode jsonNode) {
    if (jsonNode.isContainerNode()) {
        return StringData.fromString(jsonNode.toString());
    } else {
        return StringData.fromString(jsonNode.asText());
    }
}
```

Note that string data must be returned as StringData (not java.lang.String), otherwise you will get a type-cast error at runtime. If you are interested, take a look at how the other types are handled.
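To see the two branches of convertToString concretely, here is a small standalone sketch using plain jackson (the demo class is ours, not Flink code):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConvertToStringDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree("{\"name\":\"flink\",\"tags\":[\"sql\",\"json\"]}");

        // Scalar node: asText() yields the raw text -> flink
        System.out.println(root.get("name").asText());

        // Container node: toString() yields the serialized JSON -> ["sql","json"]
        // This is why a field declared STRING in the DDL can hold nested JSON.
        System.out.println(root.get("tags").toString());
    }
}
```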
At this point the JsonRowDataDeserializationSchema object is fully constructed. What remains is optimization and translation down to the StreamGraph; from there on, the job behaves exactly like one developed with the DataStream API.
When the job actually starts consuming data, JsonRowDataDeserializationSchema#deserialize is invoked to deserialize each record.
deserialize source code
```java
@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        return convertToRowData(deserializeToJsonNode(message));
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        throw new IOException(
                format("Failed to deserialize JSON '%s'.", new String(message)), t);
    }
}
```

The raw bytes are first deserialized into a JsonNode object.
deserializeToJsonNode source code
```java
public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
    return objectMapper.readTree(message);
}
```

As you can see, Flink internally uses jackson to parse the data. The JsonNode is then converted into RowData.
convertToRowData source code
```java
public RowData convertToRowData(JsonNode message) {
    return (RowData) runtimeConverter.convert(message);
}
```

The runtimeConverter invoked here is exactly the converter lambda that was built when JsonRowDataDeserializationSchema was constructed:
```java
return jsonNode -> {
    ObjectNode node = (ObjectNode) jsonNode;
    int arity = fieldNames.length;
    GenericRowData row = new GenericRowData(arity);
    for (int i = 0; i < arity; i++) {
        String fieldName = fieldNames[i];
        JsonNode field = node.get(fieldName);
        Object convertedField = convertField(fieldConverters[i], fieldName, field);
        row.setField(i, convertedField);
    }
    return row;
};
```

The final result is a GenericRowData, which is effectively RowData since it is an implementation of that interface. The deserialized row is then emitted downstream.
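To tie the whole chain together, here is a hedged, standalone sketch that builds a JsonRowDataDeserializationSchema by hand and feeds it one record. It assumes Flink 1.12/1.13-era APIs (the five-argument constructor shown above, InternalTypeInfo, and the org.apache.flink.formats.json.TimestampFormat import path, which moved in later versions); JsonRowDataDeserializationSchema is an internal class, so this is for illustration only, not something to rely on in production code:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.nio.charset.StandardCharsets;

public class JsonDeserializationDemo {
    public static void main(String[] args) throws Exception {
        // ROW<name STRING, age INT>, mirroring a two-column DDL schema
        RowType rowType =
                RowType.of(
                        new LogicalType[] {new VarCharType(Integer.MAX_VALUE), new IntType()},
                        new String[] {"name", "age"});
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);

        // failOnMissingField = false, ignoreParseErrors = true,
        // matching the json.* options from the DDL example at the top
        JsonRowDataDeserializationSchema schema =
                new JsonRowDataDeserializationSchema(
                        rowType, typeInfo, false, true, TimestampFormat.SQL);

        byte[] message = "{\"name\":\"flink\",\"age\":10}".getBytes(StandardCharsets.UTF_8);
        RowData row = schema.deserialize(message);

        System.out.println(row.getString(0)); // flink
        System.out.println(row.getInt(1));    // 10
    }
}
```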
Summary
This article analyzed the source code behind Flink SQL's JSON format, from constructing JsonRowDataDeserializationSchema through to deserializing data in deserialize. For reasons of length, only the most important code in each step was shown, and many details were skipped. If you are interested, try debugging through the code yourself. More implementation details may follow in future updates.
Recommended reading
Best practices for real-time Flink job monitoring
Best practices for real-time log collection with Flink on YARN
Flink 1.14.0: the brand-new Kafka connector
Flink 1.14.0: a custom deserialization class for consuming Kafka data
If you found this article helpful, a like and a share would be much appreciated -- your support is the biggest motivation for my writing.