flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台
背景
zeppelin不提供per job模式
實時平臺開發(fā)周期長
基于zeppelin開發(fā)一個簡易實時平臺
開發(fā)zeppelin Interpreter
提交sql任務(wù)
提交jar任務(wù)
背景
隨著flink的蓬勃發(fā)展,zeppelin社區(qū)也大力推進flink與zeppelin的集成.zeppelin的定位是一種使用sql或者scala等語言的一個交互式的分析查詢分析工具。
Web-based?notebook?that?enables?data-driven,interactive?data?analytics?and?collaborative?documents?with?SQL,?Scala?and?more.
所以zeppelin與flink或者是其他的解釋器集成的時候,就會有這么一個架構(gòu)的特點,我需要啟動一個處理數(shù)據(jù)的服務(wù),相關(guān)的任務(wù)都提交到這個上面,拿flink來說,就是需要啟動一個flink的集群,比如local、remote、session模式的集群。當我們執(zhí)行一些flink sql的時候,都是提交到這個集群來執(zhí)行的。
zeppelin不提供per job模式
但是我們在生產(chǎn)環(huán)境中,對于一些flink的流式任務(wù),我們一般會采用per job的模式提交任務(wù),主要是為了任務(wù)資源的隔離,每個任務(wù)互不影響。目前zeppelin是不支持這種模式的。所以很多公司都會開發(fā)一個自己的實時流式任務(wù)計算平臺,可以實現(xiàn)使用sql或者jar的方式通過平臺來提交任務(wù)到集群,避免了底層一些復(fù)雜的操作,使一些只會sql的人也能開發(fā)flink任務(wù)。
實時平臺開發(fā)周期長
但是開發(fā)一個實時計算平臺其實是相對比較復(fù)雜的,它需要有前端的寫sql的頁面,后端的提交邏輯,以及前后端的交互等等。所以我的想法是既然zeppelin已經(jīng)提供了我們做一個實時平臺的很多的功能,比如寫sql的頁面、前后端交互、提交任務(wù)、獲取任務(wù)的狀態(tài)等等,那么我們是不是可以用zeppelin來開發(fā)一個簡化版的實時計算平臺呢。
基于zeppelin開發(fā)一個簡易實時平臺
今天我們談?wù)勗趺赐ㄟ^zeppelin來實現(xiàn)一個簡易的實時平臺,目的是可以把flink的sql和jar的流式任務(wù)以per job的方式提交到y(tǒng)arn集群。
我們簡單的看下zeppelin中flink 解釋器的源碼,他底層是使用了flink scala shell,具體相關(guān)內(nèi)容可以參考 Flink Scala REPL :https://ci.apache.org/projects/flink/flink-docs-stable/ops/scala_shell.html.
zeppelin在提交flink的任務(wù)的時候,會判斷下集群是否啟動,如果沒有啟動flink集群,會根據(jù)設(shè)置的模式(local、yarn)先啟動一個非隔離模式的flink集群(remote模式需要提前啟動好一個集群),然后客戶端保持著和服務(wù)器的連接,后續(xù)有用戶提交的任務(wù),就把任務(wù)提交到剛起啟動的集群。我研究了一下代碼覺得在這個上面加一個per job模式的話可能會破壞原來的架構(gòu),改動還會比較大,所以后來想自己做一個zepplin的解釋器,功能就是通過sql或者jar的方式專門用來提交flink的流式任務(wù)。
開發(fā)zeppelin Interpreter
具體zeppelin的Interpreter的開發(fā)可以參考這篇文章。
https://zeppelin.apache.org/docs/0.9.0-preview1/development/writing_zeppelin_interpreter.html
核心的代碼就是繼承抽象類Interpreter,實現(xiàn)其中的幾個方法,我們簡單來講講。
public?abstract?class?Interpreter?{????
??/**
??*?初始化的時候調(diào)用,可以在這個里面加一些系統(tǒng)初始化的工作,這個方法只調(diào)用一次。
??*?寫過flink自定義source和sink的同學應(yīng)該不會陌生。
???*/
??@ZeppelinApi
??public?abstract?void?open()?throws?InterpreterException;
??/**
???*?
???*?釋放Interpreter資源,也只會被調(diào)用一次。
???*/
??@ZeppelinApi
??public?abstract?void?close()?throws?InterpreterException;
????
????/**
???*?異步的運行輸入框里面的代碼并返回結(jié)果。.
???*
???*?@param?st?就是頁面那個框里你輸入的東西
???*/
??@ZeppelinApi
??public?abstract?InterpreterResult?interpret(String?st,
??????????????????????????????????????????????InterpreterContext?context)
??????throws?InterpreterException;????
????
}
除了上面列出來的這幾個,還有其他的幾個,我這里就不羅列代碼了,大家有興趣的可以自己看下。
底層我使用的是flink application模式來提交的任務(wù),在open里面做一些提交flink初始化的工作,比如構(gòu)造配置文件,啟動yarnClient等等。在interpret方法解析內(nèi)容,執(zhí)行提交任務(wù)的工作。
最終我們實現(xiàn)了可以通過jar包和sql的方式來提交任務(wù)到y(tǒng)arn集群。
提交sql任務(wù)
我們可以指定一些任務(wù)的參數(shù),比如jobname,并行度、checkpoint間隔等等,頁面大概長這個樣子,提交任務(wù)之后,可以在yarn集群看到相關(guān)的任務(wù)。
在這里插入圖片描述提交jar任務(wù)
首先把相應(yīng)的jar上傳到hdfs相關(guān)路徑,然后提交任務(wù)之前,指定jar的路徑,以及jobname、并行度等等,正文就不需要寫什么了,然后把這個任務(wù)提交到y(tǒng)arn集群。
在這里插入圖片描述目前只是實現(xiàn)了一些核心的功能,還有一些其他的功能需要后續(xù)完善。
更多內(nèi)容,歡迎關(guān)注我的公眾號【大數(shù)據(jù)技術(shù)與應(yīng)用實戰(zhàn)】
image總結(jié)
以上是生活随笔為你收集整理的flink sql udf jar包_flink教程flink 1.11 集成zeppelin实现简易实时计算平台的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker删除镜像命令_第三章 Doc
- 下一篇: 代码编程教学_少儿编程教学环境开发之代码