Azkaban的Web Server源码探究系列20:resolvebuildFlow
2019獨角獸企業重金招聘Python工程師標準>>>
上節,我們加載了job的描述屬性 + 正確性驗證 !
==============================================================
stop in?azkaban.project.DirectoryFlowLoader.resolveDependencies
==============================================================
這個也很簡單,比如有個Hello.job,World.job
指定了world依賴Hello
則最后在
private?HashMap<String, Map<String, Edge>> nodeDependencies;
這種結構里存的是什么呢?
HashMap<
? ? ? ? String, ---World
? ? ? ? Map<
? ? ? ? ? ? ? String, ---Hello
? ? ? ? ? ? ? ?Edge ---<Hello,World>
? ? ? ? ? ?>
? ? ? ?>
是不是很簡單,沒啥可說的!
接下來是
// Create the flows.
buildFlowsFromDependencies();
到目前為止,我們裝載了點+校驗正確性+邊。有了點和邊,其實就有了圖,有了圖,我們就可以構建Flow了
這也正是我們下面要做的事情!
======================================================================================
細節略
======================================================================================
最后就是生成報告,只要寫的沒問題,都通過!
然后就是數據庫的操作,這是我們需要關心的!
---
synchronized?(project) {
int?newVersion?= projectLoader.getLatestProjectVersion(project) + 1;
Map<String, Flow> flows?= loader.getFlowMap();
for?(Flow flow?: flows.values()) {
flow.setProjectId(project.getId());
flow.setVersion(newVersion);
}
?
logger.info("Uploading file to db "?+ archive.getName());
projectLoader.uploadProjectFile(project, newVersion, fileType, archive.getName(), archive,
uploader.getUserId());
logger.info("Uploading flow to db "?+ archive.getName());
projectLoader.uploadFlows(project, newVersion, flows.values());
logger.info("Changing project versions "?+ archive.getName());
projectLoader.changeProjectVersion(project, newVersion, uploader.getUserId());
project.setFlows(flows);
logger.info("Uploading Job properties");
projectLoader.uploadProjectProperties(project, new?ArrayList<Props>(jobProps.values()));
logger.info("Uploading Props properties");
projectLoader.uploadProjectProperties(project, propProps);
}
?
logger.info("Uploaded project files. Cleaning up temp files.");
projectLoader.postEvent(project, EventType.UPLOADED, uploader.getUserId(),
"Uploaded project files zip "?+ archive.getName());
try?{
FileUtils.deleteDirectory(file);
} catch?(IOException e) {
file.deleteOnExit();
e.printStackTrace();
}
?
logger.info("Cleaning up old install files older than "?+ (project.getVersion() - projectVersionRetention));
projectLoader.cleanOlderProjectVersion(project.getId(), project.getVersion() - projectVersionRetention);
?
return?reports;
}
首先是查詢出最大版本號,+1最為最新值。
---
1)上傳文件,分段,10M為1個單位,相關表---Project-files
2)?project_versions
3)project_flows :記錄根據file分析出來的拓撲圖信息
比如一個可能的拓撲圖如下:
?json = "{"metadata":{},"project.id":9,"nodes":[{"layout":{"level":0},"propSource":null,"jobSource":"Hello.job","expectedRuntime":1,"id":"Hello","jobType":"command"},{"layout":{"level":1},"propSource":null,"jobSource":"World.job","expectedRuntime":1,"id":"World","jobType":"command"}],"edges":[{"source":"Hello","target":"World"}],"failure.email":[],"success.email":[],"id":"World","type":"flow","version":11,"mailCreator":"default","props":[],"layedout":false}"
轉載于:https://my.oschina.net/qiangzigege/blog/655836
總結
以上是生活随笔為你收集整理的Azkaban的Web Server源码探究系列20:resolvebuildFlow的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql语句:索引,游标,存储过程,视
- 下一篇: JSONP - 跨域AJAX