js reduce实现中间件_MapReduce 模型
MapReduce 模型是 Map 模型的擴展,新增 Reduce接口,需要實現 MapReduceJobProcessor。
注意事項
MapReduce 模型只有一個 Reduce,所有子任務完成后會執行 Reduce 方法,可以在 Reduce 方法中返回該任務示例的執行結果,作為工作流的上下游數據傳遞。如果有子任務失敗,Reduce
不會執行。Reduce 失敗,整個任務示例也失敗。
Scheduler X 不保證子任務一定執行一次,在特殊條件下會 failover,可能會導致子任務重復執行,需要業務方自己實現冪等。
Scheduler X 使用的是 Hessian 序列化框架,目前不支持 LocalDateTime 和 BigDecimal。子任務中如果有如上兩個數據結構,請替換其他的數據結構(特別是 BigDecimal,序列化不會報錯,反序列化會變成 0)。
接口
接口
解釋
是否必選
ProcessResult process(JobContext context)
每個子任務執行業務的入口,需要從 context 里獲取 taskName,自己判斷是哪個子任務,進行相應的邏輯處理。執行完成后,需要返回 ProcessResult。
是
ProcessResult map(List extends Object> taskList, String taskName)
執行 map 方法可以把一批子任務分布式到多臺機器上執行,可以 map 多次。如果 taskList 是空,返回失敗。執行完成后,需要返回 ProcessResult。
是
void kill(JobContext context)
前端 kill 任務會觸發該方法,需要用戶自己實現如何中斷業務。
否
執行方式
并行計算:最多支持 300 任務,有子任務列表。
注意 秒級別任務不要選擇并行計算。
內存網格:基于內存計算,最多支持 50,000 以下子任務,速度快。
網格計算:基于文件計算,最多支持 1,000,000 子任務。
高級配置
發送 500 條消息的 Demo 示例(適用于 MapReduce 模型)
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum=500;
if (isRootTask(context)) {
System.out.println("start root task");
List msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
System.out.println(task);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}
處理單表數據的 Demo 示例(適用于 Map 或 MapReduce 模型)
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
@Service
private XXXService xxxService;
private final int PAGE_SIZE = 500;
static class PageTask {
private long startId;
private long endId;
public PageTask(long startId, long endId) {
this.startId = startId;
this.endId = endId;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) throws Exception {
String tableName = context.getJobParameters(); //多個 Job 后端代碼可以一致,通過控制臺配置 Job 參數表示表名。
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
Pair idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List records = queryRecord(tableName, startId, endId);
//TODO handle records
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private Pair queryMinAndMaxId(String tableName) {
//TODO select min(id),max(id) from [tableName]
return new Pair(1L, 10000L);
}
private List queryRecord(String tableName, long startId, long endId) {
List records = Lists.newArrayList();
//TODO select * from [tableName] where id>=[startId] and id
return records;
}
}
處理分庫分表數據的 Demo 示例(適用于 Map 或 MapReduce 模型)
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {
@Service
private XXXService xxxService;
private final int PAGE_SIZE = 500;
static class PageTask {
private String tableName;
private long startId;
private long endId;
public PageTask(String tableName, long startId, long endId) {
this.tableName = tableName;
this.startId = startId;
this.endId = endId;
}
public String getTableName() {
return tableName;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
//先分庫
List dbList = getDbList();
return map(dbList, "DbTask");
} else if (taskName.equals("DbTask")) {
//根據分庫去分表
String dbName = (String)task;
List tableList = getTableList(dbName);
return map(tableList, "TableTask");
} else if (taskName.equals("TableTask")) {
//如果一個分表也很大,再分頁
String tableName = (String)task;
Pair idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
String tableName = pageTask.getTableName();
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List records = queryRecord(tableName, startId, endId);
//TODO handle records
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private List getDbList() {
List dbList = Lists.newArrayList();
//TODO 返回分庫列表
return dbList;
}
private List getTableList(String dbName) {
List tableList = Lists.newArrayList();
//TODO 返回分表列表
return tableList;
}
private Pair queryMinAndMaxId(String tableName) {
//TODO select min(id),max(id) from [tableName]
return new Pair(1L, 10000L);
}
private List queryRecord(String tableName, long startId, long endId) {
List records = Lists.newArrayList();
//TODO select * from [tableName] where id>=[startId] and id
return records;
}
}
處理 50 條消息并且返回子任務結果由 Reduce 匯總的 Demo 示例(適用于 MapReduce 模型)
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum = 50;
if (context.getJobParameters() != null) {
dispatchNum = Integer.valueOf(context.getJobParameters());
}
if (isRootTask(context)) {
System.out.println("start root task");
List msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
Thread.sleep(2000);
return new ProcessResult(true, task);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
for (Entry result : context.getTaskResults().entrySet()) {
System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
}
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}
總結
以上是生活随笔為你收集整理的js reduce实现中间件_MapReduce 模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 截屏没有了_原来华为手机的截屏方法不止3
- 下一篇: 被踢出sci_心痛啊!全世界历史最长的期