Schedulerx2.0工作流支持数据传输
1. 前言
Schedulerx2.0是阿里中間件自研的基于akka架構的新一代分布式任務調度平臺,提供定時、任務編排、分布式跑批等功能,具有高可靠、海量任務、秒級調度等能力。
Schedulerx2.0提供可視化的工作流進行任務編排,該文章將詳細介紹如何使用schedulerx2.0的工作流進行上下游任務的數據傳輸。
2. 接口介紹
2.1 支持的執行方式和任務類型
當前只有java任務支持數據傳輸,網格計算請使用MapReduce模型進行數據傳輸。
2.2 返回執行結果
/**** @param status* @param result, the size should less than 1000 bytes* @throws Exception*/ public ProcessResult(boolean status, String result) throws Exception;在Processor結尾,通過該結果替代ProcessResult(boolean status),可以返回執行結果。
result的長度不能超過1000個字節(注意,不是String的長度,如果有中文字符,可能會超過1000個字節!),如果超過1000個字節,任務會失敗。
2.3 獲取上游數據
List<JobInstanceData> upstreamDatas = JobContext.getUpstreamData();在Processor里,可以通過該接口從JobContext中拿到上游的數據。上游的數據是一個list(可能有多個父節點),JobInstanceData里有兩個屬性,分別是jobName和data(String類型)。
3. Demo演示
首先我們寫三個jobProcessor
public class TestSimpleJobA extends JavaProcessor {@Overridepublic ProcessResult process(JobContext context) throws Exception {System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));return new ProcessResult(true, String.valueOf(1));} } public class TestSimpleJobB extends JavaProcessor {@Overridepublic ProcessResult process(JobContext context) throws Exception {System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));return new ProcessResult(true, String.valueOf(2));} } public class TestSimpleJobC extends JavaProcessor {@Overridepublic ProcessResult process(JobContext context) throws Exception {List<JobInstanceData> upstreamDatas = context.getUpstreamData();int sum = 0;for (JobInstanceData jobInstanceData : upstreamDatas) {System.out.println("jobName=" + jobInstanceData.getJobName() + ", data=" + jobInstanceData.getData());sum += Integer.valueOf(jobInstanceData.getData());}System.out.println("TestSimpleJobC sum=" + sum);return new ProcessResult(true, String.valueOf(sum));}}通過控制臺配置工作流如下圖所示
觸發一次該工作流,然后進入工作流實例圖,右鍵jobA的實例,進入詳情,可以看到jobA實例結果=1,如下圖
同理,可以看到jobB的實例結果=2, jobC的實例結果=3
控制臺也能看到jobC的機器打印
jobName=jobB, data=2 jobName=jobA, data=1 TestSimpleJobC sum=3
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Schedulerx2.0工作流支持数据传输的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: DLA SQL技巧:行、列转换和JSON
- 下一篇: 避开这2个误区,测试目标 KPI 不再难