Apache Flink 零基础入门(十二)Flink sink
生活随笔
收集整理的這篇文章主要介紹了
Apache Flink 零基础入门(十二)Flink sink
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
將DataSet中的數(shù)據(jù)Sink到哪里去。使用的是對應(yīng)的OutPutFormat,也可以使用自定義的sink,有可能寫到hbase中,hdfs中。
- writeAsText() / TextOutputFormat ,以String的形式寫入
-
writeAsCsv(...) / CsvOutputFormat,以CSV的方式寫進(jìn)去
-
print() / printToErr() / print(String msg) / printToErr(String msg)以標(biāo)準(zhǔn)輸出
?writeAsText
object DataSetSinkApp {def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironmentval data = 1.to(10)val text = environment.fromCollection(data)val filePath = "E:/test"text.writeAsText(filePath)environment.execute("DataSetSinkApp")} }如果E:/test文件或者文件夾存在,將無法執(zhí)行成功。除非增加一個WriteMode.OVERWRITE
text.writeAsText(filePath, WriteMode.OVERWRITE)這樣就在E盤下新建了一個test文件,內(nèi)容是1到10。
那么如何保存到文件夾中?
text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)設(shè)置并行度為2,這樣就存到test文件夾下,兩個文件1和2
默認(rèn)情況下,不設(shè)置并行度,會把結(jié)果寫到一個文件中,如果設(shè)置并行度,那么每一個并行度都對應(yīng)一個輸出。
Java
public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();List<Integer> info = new ArrayList<>();for(int i = 1;i <=10; i++) {info.add(i);}DataSource<Integer> data1 = executionEnvironment.fromCollection(info);String filePath = "E:/test2";data1.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);executionEnvironment.execute("JavaDataSetSinkApp");}?
總結(jié)
以上是生活随笔為你收集整理的Apache Flink 零基础入门(十二)Flink sink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 零基础入门(十
- 下一篇: Apache Flink 零基础入门(十