2021年大数据Flink(四十八):扩展阅读 Streaming File Sink
目錄
擴展閱讀? Streaming File Sink
介紹
場景描述
Bucket和SubTask、PartFile
案例演示
擴展閱讀? 配置詳解
PartFile
PartFile序列化編碼
桶分配策略
滾動策略? ?
擴展閱讀? Streaming File Sink
介紹
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html
場景描述
StreamingFileSink是Flink1.7中推出的新特性,是為了解決如下的問題:
大數據業務場景中,經常有一種場景:外部數據發送到kafka中,flink作為中間件消費kafka數據并進行業務處理;處理完成之后的數據可能還需要寫入到數據庫或者文件系統中,比如寫入hdfs中。
StreamingFileSink就可以用來將分區文件寫入到支持?Flink?FileSystem?接口的文件系統中,支持Exactly-Once語義。
這種sink實現的Exactly-Once都是基于Flink checkpoint來實現的兩階段提交模式來保證的,主要應用在實時數倉、topic拆分、基于小時分析處理等場景下。
Bucket和SubTask、PartFile
- Bucket
StreamingFileSink可向由Flink FileSystem抽象支持的文件系統寫入分區文件(因為是流式寫入,數據被視為無界)。該分區行為可配,默認按時間,具體來說每小時寫入一個Bucket,該Bucket包括若干文件,內容是這一小時間隔內流中收到的所有record。
- PartFile
每個Bukcket內部分為多個PartFile來存儲輸出數據,該Bucket生命周期內接收到數據的sink的每個子任務至少有一個PartFile。
而額外文件滾動由可配的滾動策略決定,默認策略是根據文件大小和打開超時(文件可以被打開的最大持續時間)以及文件最大不活動超時等決定是否滾動。
Bucket和SubTask、PartFile關系如圖所示
?
?
案例演示
- 需求
編寫Flink程序,接收socket的字符串數據,然后將接收到的數據流式方式存儲到hdfs
- 開發步驟
- 初始化流計算運行環境
- 設置Checkpoint(10s)周期性啟動
- 指定并行度為1
- 接入socket數據源,獲取數據
- 指定文件編碼格式為行編碼格式
- 設置桶分配策略
- 設置文件滾動策略
- 指定文件輸出配置
- 將streamingfilesink對象添加到環境
- 執行任務
- 實現代碼
package cn.lanson.extend;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.util.concurrent.TimeUnit;public class StreamFileSinkDemo {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));env.setStateBackend(new FsStateBackend("file:///D:/ckp"));//2.sourceDataStreamSource<String> lines = env.socketTextStream("node1", 9999);//3.sink//設置sink的前綴和后綴//文件的頭和文件擴展名//prefix-xxx-.txtOutputFileConfig config = OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".txt").build();//設置sink的路徑String outputPath = "hdfs://node1:8020/FlinkStreamFileSink/parquet";//創建StreamingFileSinkfinal StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(outputPath),new SimpleStringEncoder<String>("UTF-8"))/*** 設置桶分配政策* DateTimeBucketAssigner --默認的桶分配政策,默認基于時間的分配器,每小時產生一個桶,格式如下yyyy-MM-dd--HH* BasePathBucketAssigner :將所有部分文件(part file)存儲在基本路徑中的分配器(單個全局桶)*/.withBucketAssigner(new DateTimeBucketAssigner<>())/*** 有三種滾動政策* ?CheckpointRollingPolicy* ?DefaultRollingPolicy* ?OnCheckpointRollingPolicy*/.withRollingPolicy(/*** 滾動策略決定了寫出文件的狀態變化過程* 1. In-progress :當前文件正在寫入中* 2. Pending :當處于 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態* 3. Finished :在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態** 觀察到的現象* 1.會根據本地時間和時區,先創建桶目錄* 2.文件名稱規則:part-<subtaskIndex>-<partFileIndex>* 3.在macos中默認不顯示隱藏文件,需要顯示隱藏文件才能看到處于In-progress和Pending狀態的文件,因為文件是按照.開頭命名的**/DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //設置滾動間隔.withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //設置不活動時間間隔.withMaxPartSize(1024 * 1024 * 1024) // 最大尺寸.build()).withOutputFileConfig(config).build();lines.addSink(sink).setParallelism(1);env.execute();}
}
?
擴展閱讀? 配置詳解
PartFile
前面提到過,每個Bukcket內部分為多個部分文件,該Bucket內接收到數據的sink的每個子任務至少有一個PartFile。而額外文件滾動由可配的滾動策略決定。
關于順序性
對于任何給定的Flink子任務,PartFile索引都嚴格增加(按創建順序),但是,這些索引并不總是順序的。當作業重新啟動時,所有子任務的下一個PartFile索引將是max PartFile索引+ 1,其中max是指在所有子任務中對所有計算的索引最大值。
return?new?Path(bucketPath,?outputFileConfig.getPartPrefix()?+?'-'?+?subtaskIndex +?'-'?+?partCounter +?outputFileConfig.getPartSuffix());
?
- PartFile生命周期
輸出文件的命名規則和生命周期。由上圖可知,部分文件(part file)可以處于以下三種狀態之一:
1. In-progress :
當前文件正在寫入中
2. Pending :
當處于 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態
3. Finished :
在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態,處于 Finished 狀態的文件不會再被修改,可以被下游系統安全地讀取。
注意:?
使用 StreamingFileSink 時需要啟用 Checkpoint ,每次做 Checkpoint 時寫入完成。如果 Checkpoint 被禁用,部分文件(part file)將永遠處于 'in-progress' 或 'pending' 狀態,下游系統無法安全地讀取。
?
- PartFile的生成規則
在每個活躍的Bucket期間,每個Writer的子任務在任何時候都只會有一個單獨的In-progress PartFile,但可有多個Peding和Finished狀態文件。
一個Sink的兩個Subtask的PartFile分布情況實例如下:
初始狀態,兩個inprogress文件正在被兩個subtask分別寫入
└──?2021-05-17--12
????├──?part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
????└──?part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
當part-1-0因文件大小超過閾值等原因發生滾動時,變為Pending狀態等待完成,但此時不會被重命名。注意此時Sink會創建一個新的PartFile即part-1-1:
└──?2020-05-17--12
????├──?part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
????├──?part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
????└──?part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
待下次checkpoint成功后,part-1-0完成變為Finished狀態,被重命名:
└──?2021-05-17--12
????├──?part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
????├──?part-1-0
????└──?part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
下一個Bucket周期到了,創建新的Bucket目錄,不影響之前Bucket內的的in-progress文件,依然要等待文件RollingPolicy以及checkpoint來改變狀態:
└──?2021-05-17--12
????├──?part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
????├──?part-1-0
????└──?part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
└──?2021-05-17--13
????└──?part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
?
- PartFile命名設置
默認,PartFile命名規則如下:
- In-progress / Pending
part--.inprogress.uid - Finished
part--
比如part-1-20表示1號子任務已完成的20號文件。
可以使用OutputFileConfig來改變前綴和后綴,代碼示例如下:
OutputFileConfig config =?OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build()StreamingFileSink sink =?StreamingFileSink.forRowFormat(new?Path(outputPath),?new?SimpleStringEncoder<String>("UTF-8")).withBucketAssigner(new?KeyBucketAssigner()).withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(config).build()
得到的PartFile示例如下
└──?2021-05-17--12
????├──?prefix-0-0.ext
????├──?prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
????├──?prefix-1-0.ext
????└──?prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
?
PartFile序列化編碼
StreamingFileSink 支持行編碼格式和批量編碼格式,比如 Apache Parquet 。這兩種變體可以使用以下靜態方法創建:
?
- Row-encoded sink:?
StreamingFileSink.forRowFormat(basePath, rowEncoder)
//行
StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<T>()).withBucketAssigner(new PaulAssigner<>()) //分桶策略.withRollingPolicy(new PaulRollingPolicy<>()) //滾動策略.withBucketCheckInterval(CHECK_INTERVAL) //檢查周期.build();
StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)Bulk-encoded sink:
?
//列 parquet
StreamingFileSink.forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(clazz)).withBucketAssigner(new PaulBucketAssigner<>()).withBucketCheckInterval(CHECK_INTERVAL).build();
這兩種寫入格式除了文件格式的不同,另外一個很重要的區別就是回滾策略的不同:創建行或批量編碼的 Sink 時,我們需要指定存儲桶的基本路徑和數據的編碼
- forRowFormat行寫可基于文件大小、滾動時間、不活躍時間進行滾動,
- forBulkFormat列寫方式只能基于checkpoint機制進行文件滾動,即在執行snapshotState方法時滾動文件,如果基于大小或者時間滾動文件,那么在任務失敗恢復時就必須對處于in-processing狀態的文件按照指定的offset進行truncate,由于列式存儲是無法針對文件offset進行truncate的,因此就必須在每次checkpoint使文件滾動,其使用的滾動策略實現是OnCheckpointRollingPolicy。
forBulkFormat只能和 `OnCheckpointRollingPolicy` 結合使用,每次做 checkpoint 時滾動文件。
?
- Row Encoding
此時,StreamingFileSink會以每條記錄為單位進行編碼和序列化。
必須配置項:
- 輸出數據的BasePath
- 序列化每行數據寫入PartFile的Encoder
使用RowFormatBuilder可選配置項:
- 自定義RollingPolicy
默認使用DefaultRollingPolicy來滾動文件,可自定義
- bucketCheckInterval
默認1分鐘。該值單位為毫秒,指定按時間滾動文件間隔時間
例子如下:
import?org.apache.flink.api.common.serialization.SimpleStringEncoderimport?org.apache.flink.core.fs.Pathimport?org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink// 1. 構建DataStreamDataStream input ?=?...// 2. 構建StreamingFileSink,指定BasePath、Encoder、RollingPolicyStreamingFileSink sink ?=?StreamingFileSink.forRowFormat(new?Path(outputPath),?new?SimpleStringEncoder[String]("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15)).withInactivityInterval(TimeUnit.MINUTES.toMillis(5)).withMaxPartSize(1024?*?1024?*?1024).build()).build()// 3. 添加Sink到InputDataSteam即可input.addSink(sink)
以上例子構建了一個簡單的擁有默認Bucket構建行為(繼承自BucketAssigner的DateTimeBucketAssigner)的StreamingFileSink,每小時構建一個Bucket,內部使用繼承自RollingPolicy的DefaultRollingPolicy,以下三種情況任一發生會滾動PartFile:
- PartFile包含至少15分鐘的數據
- 在過去5分鐘內沒有接收到新數據
- 在最后一條記錄寫入后,文件大小已經達到1GB
除了使用DefaultRollingPolicy,也可以自己實現RollingPolicy接口來實現自定義滾動策略。
?
- Bulk Encoding
要使用批量編碼,請將StreamingFileSink.forRowFormat()替換為StreamingFileSink.forBulkFormat(),注意此時必須指定一個BulkWriter.Factory而不是行模式的Encoder。BulkWriter在邏輯上定義了如何添加、fllush新記錄以及如何最終確定記錄的bulk以用于進一步編碼。
需要注意的是,使用Bulk Encoding時,Filnk1.9版本的文件滾動就只能使用OnCheckpointRollingPolicy的策略,該策略在每次checkpoint時滾動part-file。
?
Flink有三個內嵌的BulkWriter:
- ParquetAvroWriters
有一些靜態方法來創建ParquetWriterFactory。
- SequenceFileWriterFactory
- CompressWriterFactory
Flink有內置方法可用于為Avro數據創建Parquet writer factory。
要使用ParquetBulkEncoder,需要添加以下Maven依賴:
<!--?streaming?File?Sink所需要的jar包--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet_2.12</artifactId><version>1.12.0</version></dependency><!--?https://mvnrepository.com/artifact/org.apache.avro/avro?--><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.12.0</version></dependency>
?
桶分配策略
桶分配策略定義了將數據結構化后寫入基本輸出目錄中的子目錄,行格式和批量格式都需要使用。
具體來說,StreamingFileSink使用BucketAssigner來確定每條輸入的數據應該被放入哪個Bucket,
默認情況下,DateTimeBucketAssigner 基于系統默認時區每小時創建一個桶:
格式如下:yyyy-MM-dd--HH。日期格式(即桶的大小)和時區都可以手動配置。
我們可以在格式構建器上調用 .withBucketAssigner(assigner)?來自定義 BucketAssigner。
Flink 有兩個內置的 BucketAssigners :
- DateTimeBucketAssigner:默認基于時間的分配器
- BasePathBucketAssigner:將所有部分文件(part file)存儲在基本路徑中的分配器(單個全局桶)
???????
- ???????DateTimeBucketAssigner
Row格式和Bulk格式編碼都使用DateTimeBucketAssigner作為默認BucketAssigner。 默認情況下,DateTimeBucketAssigner 基于系統默認時區每小時以格式yyyy-MM-dd--HH來創建一個Bucket,Bucket路徑為/{basePath}/{dateTimePath}/。
- basePath是指StreamingFileSink.forRowFormat(new Path(outputPath)時的路徑
- dateTimePath中的日期格式和時區都可在初始化DateTimeBucketAssigner時配置
public?class?DateTimeBucketAssigner<IN>?implements?BucketAssigner<IN,?String>?{private?static?final?long?serialVersionUID =?1L;// 默認的時間格式字符串private?static?final?String DEFAULT_FORMAT_STRING =?"yyyy-MM-dd--HH";// 時間格式字符串private?final?String formatString;// 時區private?final?ZoneId zoneId;// DateTimeFormatter被用來通過當前系統時間和DateTimeFormat來生成時間字符串private?transient?DateTimeFormatter dateTimeFormatter;/*** 使用默認的`yyyy-MM-dd--HH`和系統時區構建DateTimeBucketAssigner*/public?DateTimeBucketAssigner()?{this(DEFAULT_FORMAT_STRING);}/*** 通過能被SimpleDateFormat解析的時間字符串和系統時區* 來構建DateTimeBucketAssigner*/public?DateTimeBucketAssigner(String formatString)?{this(formatString,?ZoneId.systemDefault());}/*** 通過默認的`yyyy-MM-dd--HH`和指定的時區* 來構建DateTimeBucketAssigner*/public?DateTimeBucketAssigner(ZoneId zoneId)?{this(DEFAULT_FORMAT_STRING,?zoneId);}/*** 通過能被SimpleDateFormat解析的時間字符串和指定的時區* 來構建DateTimeBucketAssigner*/public?DateTimeBucketAssigner(String formatString,?ZoneId zoneId)?{this.formatString =?Preconditions.checkNotNull(formatString);this.zoneId =?Preconditions.checkNotNull(zoneId);}/*** 使用指定的時間格式和時區來格式化當前ProcessingTime,以獲取BucketId*/@Overridepublic?String getBucketId(IN element,?BucketAssigner.Context context)?{if?(dateTimeFormatter ==?null)?{dateTimeFormatter =?DateTimeFormatter.ofPattern(formatString).withZone(zoneId);}return?dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));}@Overridepublic?SimpleVersionedSerializer<String>?getSerializer()?{return?SimpleVersionedStringSerializer.INSTANCE;}@Overridepublic?String toString()?{return?"DateTimeBucketAssigner{"?+"formatString='"?+?formatString +?'\''?+", zoneId="?+?zoneId +'}';}}
?
- BasePathBucketAssigner
將所有PartFile存儲在BasePath中(此時只有單個全局Bucket)。
先看看BasePathBucketAssigner的源碼,方便繼續學習DateTimeBucketAssigner:
@PublicEvolving
public class BasePathBucketAssigner<T> implements BucketAssigner<T, String> {private static final long serialVersionUID = -6033643155550226022L;/*** BucketId永遠為"",即Bucket全路徑為用戶指定的BasePath*/@Overridepublic String getBucketId(T element, BucketAssigner.Context context) {return "";}/*** 用SimpleVersionedStringSerializer來序列化BucketId*/@Overridepublic SimpleVersionedSerializer<String> getSerializer() {// in the future this could be optimized as it is the empty string.return SimpleVersionedStringSerializer.INSTANCE;}@Overridepublic String toString() {return "BasePathBucketAssigner";}
}
???????滾動策略? ?
滾動策略 RollingPolicy 定義了指定的文件在何時關閉(closed)并將其變為 Pending 狀態,隨后變為 Finished 狀態。處于 Pending 狀態的文件會在下一次 Checkpoint 時變為 Finished 狀態,通過設置 Checkpoint 間隔時間,可以控制部分文件(part file)對下游讀取者可用的速度、大小和數量。
Flink 有兩個內置的滾動策略:
- DefaultRollingPolicy
- OnCheckpointRollingPolicy
需要注意的是,使用Bulk Encoding時,文件滾動就只能使用OnCheckpointRollingPolicy的策略,該策略在每次checkpoint時滾動part-file。
總結
以上是生活随笔為你收集整理的2021年大数据Flink(四十八):扩展阅读 Streaming File Sink的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(四十六):扩
- 下一篇: 2021年大数据ZooKeeper(一)