duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)
Flink總共有三種時間語義:Processing time(處理時間)、Event time(事件時間)以及Ingestion time(攝入時間)。關于這些時間語義的具體解釋,可以參考另一篇文章Flink的時間與watermarks詳解。本文主要講解Flink Table API & SQL中基于時間的算子如何定義時間語義。通過本文你可以了解到:
- 時間屬性的簡介
- 處理時間
- 事件時間
時間屬性簡介
Flink TableAPI&SQL中的基于時間的操作(如window),需要指定時間語義,表可以根據指定的時間戳提供一個邏輯時間屬性。
時間屬性是表schama的一部分,當使用DDL創建表時、DataStream轉為表時或者使用TableSource時,會定義時間屬性。一旦時間屬性被定義完成,該時間屬性可以看做是一個字段的引用,從而在基于時間的操作中使用該字段。
時間屬性像一個時間戳,可以被訪問并參與計算,如果一個時間屬性參與計算,那么該時間屬性會被霧化成一個常規的時間戳,常規的時間戳不能與Flink的時間與水位線兼容,不能被基于時間的操作所使用。
Flink TableAPI & SQL所需要的時間屬性可以通過Datastream程序中指定,如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默認// 可以選擇: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);處理時間
基于本地的機器時間,是一種最簡單的時間語義,但是不能保證結果一致性,使用該時間語義不需要提取時間戳和生成水位線。總共有三種方式定義處理時間屬性,具體如下
DDL語句創建表時定義處理時間
處理時間的屬性可以在DDL語句中被定義為一個計算列,需要使用PROCTIME()函數,如下所示:
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 聲明一個額外字段,作為處理時間屬性 ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分鐘的滾動窗口DataStream轉為Table的過程中定義處理時間
在將DataStream轉為表時,在schema定義中可以通過.proctime屬性指定時間屬性,并將其放在其他schema字段的最后面,具體如下:
DataStream<Tuple2<String, String>> stream = ...; // 聲明一個額外邏輯字段作為處理時間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));使用TableSource
自定義TableSource并實現DefinedProctimeAttribute 接口,如下:
// 定義個帶有處理時間屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 該字段會追加到schema中,作為第三個字段return "user_action_time";} }// 注冊table source tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));事件時間
基于記錄的具體時間戳,即便是存在亂序或者遲到數據也會保證結果的一致性。總共有三種方式定義處理時間屬性,具體如下
DDL語句創建表時定事件時間
事件時間屬性可以通過 WATERMARK語句進行定義,如下:
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 聲明user_action_time作為事件時間屬性,并允許5S的延遲 WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);DataStream轉為Table的過程中定義事件時間
當定義Schema時通過.rowtime屬性指定事件時間屬性,必須在DataStream中指定時間戳與水位線。例如在數據集中,事件時間屬性為event_time,此時Table中的事件時間字段中可以通過’event_time. rowtime‘來指定。
目前Flink支持兩種方式定義EventTime字段,如下:
// 方式1: // 提取timestamp并分配watermarks DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 聲明一個額外邏輯字段作為事件時間屬性 // 在table schema的末尾使用user_action_time.rowtime定義事件時間屬性 // 系統會在TableEnvironment中獲取事件時間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 從第一個字段提取timestamp并分配watermarks DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一個字段已經用來提取時間戳,可以直接使用對應的字段作為事件時間屬性 Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));使用TableSource
另外也可以在創建TableSource的時候,實現DefinedRowtimeAttributes接口來定義EventTime字段,在接口中需要實現getRowtimeAttributeDescriptors方法,創建基于EventTime的時間屬性信息。
// 定義帶有rowtime屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創建流,基于user_action_time屬性分配水位線DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 標記user_action_time字段作為事件時間屬性// 創建user_action_time描述符,用來標識時間屬性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;} }// register表 tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));小結
本文主要介紹了如何在Flink Table API和SQL中使用時間語義,可以使用兩種時間語義:處理時間和事件時間。分別對每種的時間語義的使用方式進行了詳細解釋。
往期精彩回顧
Flink Table API & SQL編程指南(1)
Flink Table API & SQL編程指南之動態表(2)
總結
以上是生活随笔為你收集整理的duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++PrimerPlus学习——第九章
- 下一篇: maya批量操作mel_MAYA对多个模