java 协程框架_GitHub - yaozhang0105/dactor: Dactor是基于Java的轻量级同步异步统一处理框架,基于协程思想构建...
DActor
Introduction
DActor框架基于協(xié)程思想設(shè)計(jì),可同時(shí)支持同步和異步代碼,簡化在線異步代碼的開發(fā),用同步代碼的思維來開發(fā)異步代碼,兼顧異步代碼的高并發(fā)、無阻塞和同步代碼的易讀性,可維護(hù)性。
最大程度的降低阻塞,提高單個(gè)線程的處理能力,并可有效的降低線程數(shù)。
項(xiàng)目地址
文檔
視圖配置說明:視圖配置說明
SpringBoot Yml 配置說明:Yml配置說明
XML 配置說明:XML配置說明
常用類說明:常用類說明
配置和API說明:配置和API說明
更多文檔詳見:WIKI
QQ交流群
783580303
接入系統(tǒng)
Dpress:基于Dactor構(gòu)建的多域名博客系統(tǒng)
持續(xù)更新中。。歡迎大家自薦
Overview
目前開發(fā)過程中的幾個(gè)常見模型
同步編程
所有步驟都在一個(gè)主線程中完成,調(diào)用一個(gè)方法,等待其響應(yīng)返回。一個(gè)請求占用一個(gè)線程,在有數(shù)據(jù)庫操作、TCP和Http通訊時(shí)因?yàn)橛凶枞闆r,會(huì)導(dǎo)致占用線程占用而無法及時(shí)釋放
,因此在同步交易中引入了線程池概念,提高系統(tǒng)的吞吐量
異步編程
所有步驟都可在不同線程中完成,調(diào)用一個(gè)方法,不等待響應(yīng)既返回,典型交易如NodeJs。
目前市面上的異步框架都比較復(fù)雜,市面的通用解決方案是CallBack和Promise/Deferred模式模式。
設(shè)計(jì)思路
為了保留異步的高性能,簡化異步的開發(fā)模式,同時(shí)使得程序更容易被程序員理解,在性能和代碼可閱讀性中間取得平衡,設(shè)計(jì)了此框架。
處理步驟:將請求封裝為消息,丟入消息隊(duì)列,尋找合適步驟處理消息,上述過程不斷循環(huán),直到所有可用步驟都執(zhí)行完畢。
因?yàn)槭菍ο㈥?duì)列進(jìn)行處理,對于同步交易,處理完畢即可丟入消息隊(duì)列。對于異步交易,等待回調(diào)完畢再丟入消息隊(duì)列。
兩種情況對于框架來說是無差別的。同時(shí)因?yàn)橥ㄟ^異步交易避免了阻塞情況的發(fā)生,所以可在不大幅度提高線程數(shù)的情況下,提高吞吐量,
同時(shí)也可在一定程度避免流量突增的情況發(fā)生。
消息隊(duì)列采用Disruptor的的高性能隊(duì)列RingBuffer。
以Actor協(xié)程并發(fā)模型為基礎(chǔ)設(shè)計(jì)框架。
Features
1、集成Netty
2、集成HttpClient
3、集成HttpServlet
4、支持多層父子結(jié)構(gòu)
5、支持責(zé)任鏈模式
6、J2EE支持json,csv,pdf,xml,html格式輸出
7、J2EE支持?jǐn)?shù)據(jù)流輸出,動(dòng)態(tài)文件下載、動(dòng)態(tài)圖片輸出、跳轉(zhuǎn)和可根據(jù)配置動(dòng)態(tài)輸出
8、同時(shí)支持SpringBoot和Spring
環(huán)境要求
JDK 1.8
Spring FrameWork 5.2.4.RELEASE + 或者 Spring Boot 2.2.7.RELEASE +
Servlet 3.0+(因?yàn)樾枰褂肧ervlet的異步功能)
注意事項(xiàng)
請求的完整邏輯是分散在不同的線程中執(zhí)行的,所以盡量避免使用ThreadLocal
Getting Started
example是J2EE程序,下載后,可直接運(yùn)行,其中集成了若干例子
默認(rèn)使用.do提交相關(guān)交易,但如果是.json將會(huì)返回json數(shù)據(jù)
啟動(dòng)后,在瀏覽器中輸入http://localhost:8080/example/randomTxt2.json
輸出的是json格式的字符串
randomTxt2:只有一級(jí)父子關(guān)系
randomTxt1:有二級(jí)父子關(guān)系
chaintest1:只使用責(zé)任鏈
chaintest2:同時(shí)使用責(zé)任鏈和一級(jí)父子關(guān)系
exceptionTest:子交易拋出錯(cuò)誤,框架對錯(cuò)誤的處理
randomTxt3為beginBeanId為Actor標(biāo)簽的BeanId例子
httptest演示的是通過httpclient異步方式訪問百度網(wǎng)站
訪問URL:http://localhost:8080/example/ httptest.do
http://localhost:8080/example/np.randomTxt2.json為使用命名空間的例子,相關(guān)配置在conf/namespace.xml中。
啟動(dòng)后,可在控制臺(tái)看到內(nèi)部調(diào)用結(jié)果
Maven dependency
cn.ymotel
dactor
1.1.2
Gradle dependency
compile group: 'cn.ymotel', name: 'dactor', version:'1.1.2'
代碼簡單講解
執(zhí)行過程為chain->grandfather->parent->Selft。
依次調(diào)用執(zhí)行責(zé)任鏈中邏輯,grandfather中的邏輯,parent的邏輯和自身邏輯。
chain,grandfather,parent都可為空,不設(shè)置
在grandfather和parent中的Steps中至少有一個(gè)為placeholderActor交易,以調(diào)用子邏輯
整個(gè)過程中,需要先設(shè)置全局占位符
交易中如果未填寫beginBeanId或者endBeanId時(shí),系統(tǒng)默認(rèn)使用全局中配置的beginBeanId或者endBeanId
condtion可為空,空字符串,或者是ognl表達(dá)式
placeholderActor的作用是在暫存當(dāng)前環(huán)境,并調(diào)用子交易,待子交易執(zhí)行完畢后,再恢復(fù)當(dāng)前環(huán)境繼續(xù)執(zhí)行
如果在Step中未找到toBeanIdActor,會(huì)直接調(diào)用endBeanId方法,認(rèn)為自身交易已執(zhí)行結(jié)束。
交易的請求和流轉(zhuǎn)信息都保存在Message中
如果指定handleException=false或者使用默認(rèn)設(shè)置,直接返回父中執(zhí)行,如果父中也未捕獲,則繼續(xù)返回上一級(jí)執(zhí)行,
一般來說至少有要有一個(gè)actor中指定handleException=true
啟動(dòng)框架接收和執(zhí)行請求
同步異步寫法
同步寫法
public class BeginActor implements Actor {
/* (non-Javadoc)
* @see com.ymotel.util.actor.Actor#HandleMessage(com.ymotel.util.actor.Message)
*/
@Override
public Object HandleMessage(Message message) throws Exception {
return message;
}
}
繼承Actor接口,在HandlerMessage返回message方法,框架判斷返回不為空的情況下,直接將結(jié)果丟給框架進(jìn)行下一步出來
異步寫法
以HttpClientActor為例,在HandlerMessage中最后返回null,框架收到請求后等待下一步處理,釋放線程,在HttpClient中的CallBack中調(diào)用
message.getControlMessage().getMessageDispatcher().sendMessage(message);框架收到后進(jìn)行下一步處理。
public Object HandleMessage(final Message message) throws Exception {
Map context = message.getContext();
//
HttpUriRequest request = getHttpBuild(message.getContext()).build();
if (referer != null) {
request.addHeader("Referer", referer);
}
/**
* 通過此處可共用會(huì)話,進(jìn)行類似登錄后交易
*/
HttpClientContext tmplocalContext = null;
if (context.containsKey(HTTPCLIENT_CONTEXT)) {
tmplocalContext = (HttpClientContext) context.get(HTTPCLIENT_CONTEXT);
} else {
tmplocalContext = HttpClientContext.create();
CookieStore cookieStore = new BasicCookieStore();
tmplocalContext.setCookieStore(cookieStore);
context.put(HTTPCLIENT_CONTEXT, tmplocalContext);
}
final HttpClientContext localContext = tmplocalContext;
// CookieStore cookieStore = new BasicCookieStore();
// localContext.setCookieStore(cookieStore);
if (logger.isInfoEnabled()) {
logger.info("HandleMessage(Message) - httpclient----" + request); //$NON-NLS-1$
}
// final HttpGet httpget = new HttpGet(uri);
final String tmpcontent = content;
final String tmpcharset = charset != null ? charset : (String) context.get(CHARSET);
httpClientHelper.getHttpclient().execute(request, localContext, new FutureCallback() {
public void completed(final HttpResponse response) {
try {
/**
* 完成后及時(shí)清除
*/
message.getContext().remove(tmpcontent);
actorHttpClientResponse.handleResponse(response, localContext, tmpcharset, message);
//String responseString=HandleResponse((String)message.getContext().get(CHARSET),response);
//message.getContext().put(RESPONSE, responseString);
} catch (Exception e) {
// TODO Auto-generated catch block
if (logger.isErrorEnabled()) {
logger.error("$FutureCallback.completed(HttpResponse)", e); //$NON-NLS-1$
}
message.setException(e);
message.getControlMessage().getMessageDispatcher().sendMessage(message);
}
// System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
}
public void failed(final Exception ex) {
if (logger.isErrorEnabled()) {
logger.error("$FutureCallback.failed(Exception)", ex); //$NON-NLS-1$
}
message.setException(ex);
message.getControlMessage().getMessageDispatcher().sendMessage(message);
// System.out.println(httpget.getRequestLine() + "->" + ex);
}
public void cancelled() {
Exception exception = new Exception("已取消");
message.setException(exception);
message.getControlMessage().getMessageDispatcher().sendMessage(message);
// System.out.println(httpget.getRequestLine() + " cancelled");
}
});
return null;
}
配置和API說明
配置說明
通過在xml中的Step實(shí)現(xiàn)內(nèi)部Actor之間的流程跳轉(zhuǎn)
在配置文件中包含
Actor、chain、和global配置 。
程序整個(gè)執(zhí)行順序?yàn)楦鶕?jù)交易碼找到對應(yīng)的Actor,然后執(zhí)行按照chain->parent->selft的順序進(jìn)行執(zhí)行。
chain執(zhí)行到placeholder處,調(diào)用parent交易繼續(xù)執(zhí)行,在parent交易中執(zhí)行到placeholder交易后,調(diào)用selft自身交易繼續(xù)執(zhí)行。
自身交易執(zhí)行完畢,彈出parent的placeholder處交易繼續(xù)執(zhí)行.parent執(zhí)行完畢,彈出chain中代碼繼續(xù)執(zhí)行。
global配置如下
beginBeanId為默認(rèn)的開始Actor,value中的值是在Spring中對應(yīng)的beanName,程序初始化時(shí)將會(huì)取得此值,對未指定beginBeanId或者endBeanId的Actor初始化全局配置。
beginActor和endActor都需要繼承Actor接口。
actor配置如下
htmlstream:
屬性handleException如果不設(shè)置的話,遇到異常,程序?qū)?huì)認(rèn)為子類中已經(jīng)執(zhí)行完畢,跳到parent中PlaceHolder處執(zhí)行。設(shè)置為true,將不會(huì)直接跳轉(zhuǎn)到parent中,由子類進(jìn)行自我處理。
parent和chain為調(diào)用具體交易前需要調(diào)用的公共交易,由于大部分交易都有通用的前置交易和統(tǒng)一的后置交易。通過設(shè)置parent或者chain,可提高代碼復(fù)用度。
fromBeanId和toBeanId配置的是Actor或者實(shí)現(xiàn)Actor接口的beanId。
parent和chain中的ref都需要是Actor.
results中可定義返回的state和需要處理的viewActor
async標(biāo)記是否是旁路交易,默認(rèn)值為false,為true值時(shí),會(huì)將上下文內(nèi)容設(shè)置復(fù)制一份,重新生成一份Message,進(jìn)行執(zhí)行,不影響主流程。
chain配置
chain可直觀展現(xiàn)Actor調(diào)用順序.
在chain中可順序并列多個(gè)parent類。每個(gè)parent中的Step都需要有placeHolderActor,以調(diào)用子類。
依次執(zhí)行l(wèi)ist中的交易,再執(zhí)行自身交易。自身交易執(zhí)行完畢,再依次回溯責(zé)任鏈中的每個(gè)交易,直到無可用交易。
命名空間
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:actor="http://www.ymotel.cn/schema/dactor"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.ymotel.cn/schema/dactor http://www.ymotel.cn/schema/dactor.xsd" namespace="np">
重要類方法說明
cn.ymotel.dactor.core.MessageDispatcher是交易流轉(zhuǎn)的核心接口類
public void startMessage(Message message, ActorTransactionCfg actorcfg, boolean blocked) throws Exception
方法,用于開始整個(gè)流程,其中message需要在執(zhí)行前進(jìn)行構(gòu)造,actorcfg可通過spring的getBean方法得到為Actor對象,如下
通過getBean('randomTxt1')即可得到ActorTransactionCfg對象。
blocked為是否阻塞,一般在交易初次放入隊(duì)列是為false,表示如果隊(duì)列滿,則直接扔給客戶端進(jìn)行處理。為true則一般為內(nèi)部交易,必須提交給隊(duì)列進(jìn)行處理。
sendMessage方法內(nèi)部調(diào)用,用于將處理完畢的Message重新放入隊(duì)列,繼續(xù)下一步流程。
cn.ymotel.dactor.core.disruptor.MessageRingBufferDispatcher是MessageDispatcher的接口實(shí)現(xiàn)類。,在啟動(dòng)Spring是需要在配置中加上
MessageRingBufferDispatcher的strategy、bufferSize、threadNumber為三個(gè)可設(shè)置屬性.正常情況下使用默認(rèn)設(shè)置即可。
strategy默認(rèn)使用ringBuffer的BlockingWaitStrategy策略進(jìn)行調(diào)度,如果交易量比較大,可調(diào)整此策略。
bufferSize默認(rèn)使用1024。
threadNumber默認(rèn)使用CPU個(gè)數(shù)的線程數(shù)。
其他默認(rèn)Actor說明
cn.ymotel.dactor.message.Message.Actor,所有需要在執(zhí)行的交易都必須繼承此接口。
public Object HandleMessage(Message message) throws Exception;程序通過調(diào)用HandleMessage對象,如果返回的不是message對象或者為NULL,則認(rèn)為此交易是異步執(zhí)行,不再自行調(diào)度。由異步交易在收到請求后,自己調(diào)用將Message再此放入隊(duì)列中。
cn.ymotel.dactor.action.PlaceholderActor 交易為特殊交易,用來將當(dāng)前隊(duì)列暫存,并調(diào)用子交易。
cn.ymotel.dactor.action.BeginActor 為Actor中step的默認(rèn)開始交易。
cn.ymotel.dactor.action.EndActor 為Actor中step的默認(rèn)結(jié)束交易。
cn.ymotel.dactor.action.JsonViewResolverActor為需要返回Json的J2EE view
cn.ymotel.dactor.action.ViewResolveActor為需要返回J2EE view的統(tǒng)一處理Actor
cn.ymotel.dactor.action.httpclient.HttpClientActor 提供的異步調(diào)用httpClient的Actor
cn.ymotel.dactor.action.netty.aysnsocket.TcpClientActor 提供的異步調(diào)用netty的Actor
交易流程舉例說明
以上交易的交易流程圖如下
以上的完整例子都可在example中得到
總結(jié)
以上是生活随笔為你收集整理的java 协程框架_GitHub - yaozhang0105/dactor: Dactor是基于Java的轻量级同步异步统一处理框架,基于协程思想构建...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 重大疾病保险该不该买
- 下一篇: java+queue+se_「013期」