mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区
我們上次講到zbus網(wǎng)絡(luò)通訊的核心API:
Dispatcher -- 負(fù)責(zé)-NIO網(wǎng)絡(luò)事件Selector引擎的管理,對(duì)Selector引擎負(fù)載均衡
IoAdaptor -- 網(wǎng)絡(luò)事件的處理,服務(wù)器與客戶端共用,負(fù)責(zé)讀寫,消息分包組包等
Session -- 代表網(wǎng)絡(luò)鏈接,可以讀寫消息
實(shí)際的應(yīng)用,我們幾乎只需要做IoAdaptor的個(gè)性化實(shí)現(xiàn)就能完成高效的網(wǎng)絡(luò)通訊服務(wù),今天我們將舉例說明如何個(gè)性化這個(gè)IoAdaptor。
我們今天要完成的目標(biāo)是:實(shí)現(xiàn)MySQL服務(wù)器的透明代理。效果是,你訪問代理服務(wù)器跟訪問目標(biāo)MySQL無差異。
我們?cè)跍y(cè)試環(huán)境10.17.2.30:3306 這臺(tái)機(jī)器上提供了MySql,在我們本地機(jī)器上跑起來我們今天基于zbus.NET實(shí)現(xiàn)的一個(gè)代理程序,就能達(dá)到下面的效果。
完成大概不到100 行的代碼, Cool?Let’s roll!
首先,我們思考透明TCP代理到底在干啥,透明的TCP代理的業(yè)務(wù)邏輯其實(shí)非常簡(jiǎn)單,可以描述為,將來自代理上游(發(fā)起請(qǐng)求到代理)的數(shù)據(jù)轉(zhuǎn)發(fā)到目標(biāo)TCP服務(wù)器,把目標(biāo)服務(wù)器回來的數(shù)據(jù)原路返回代理上游客戶端。 注意這個(gè)原路,如何做到原路返回成為關(guān)鍵點(diǎn)。這個(gè)示例其實(shí)跟MySQL沒有任何關(guān)系,原則上任何TCP層面的服務(wù)都應(yīng)該適配。
基于zbus.NET怎么來將上面的邏輯在體現(xiàn)出來,也就是如何個(gè)性化IoAdaptor?直觀的講,我們要處理的幾個(gè)事件應(yīng)該包括:1)從上游客戶端發(fā)起的鏈接請(qǐng)求--代理服務(wù)器的Accept事件,2)代理服務(wù)器連接目標(biāo)服務(wù)器的Connect事件,3)上下游的數(shù)據(jù)事件onMessage。
zbus.NET的IoAdaptor提供的個(gè)性化事件如下
基本包括一個(gè)鏈接(客戶端或者服務(wù)端)的生命周期,與消息的編解碼。
我們的代理IoAdaptor就是逐一個(gè)性化處理。
第一步,編解碼: 透明代理對(duì)消息內(nèi)容不做理解,所以不需要編解碼。
// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)
public IoBuffer encode(Object msg) {
if (msg instanceof IoBuffer) {
IoBuffer buff = (IoBuffer) msg;
return buff;
} else {
throw new RuntimeException("Message Not Support");
}
}
// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)
public Object decode(IoBuffer buff) {
if (buff.remaining() > 0) {
byte[] data = new byte[buff.remaining()];
buff.readBytes(data);
return IoBuffer.wrap(data);
} else {
return null;
}
}
第二步,代理服務(wù)接入:
@Override
protected void onSessionAccepted(Session sess) throws IOException {
Session target = null;
Dispatcher dispatcher = sess.getDispatcher();
try {
target = dispatcher.createClientSession(targetAddress, this);
} catch (Exception e) {
sess.asyncClose();
return;
}
sess.chain = target;
target.chain = sess;
dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
}
這里的邏輯思路是,代理服務(wù)器每接受到一個(gè)請(qǐng)求--通過onSessionAccepted表達(dá),我們將同時(shí)創(chuàng)建一個(gè)到目標(biāo)服務(wù)器的鏈接,今天的例子是目標(biāo)MySQL服務(wù)器,注意上面的處理中把創(chuàng)建目標(biāo)服務(wù)器Session過程與真正鏈接到目標(biāo)服務(wù)分開(Dispatcher也提供合并二者的工具方法),是為了能在沒有發(fā)生鏈接之前綁定上好上下游關(guān)系,通過Session的chain變量來表達(dá),也就是當(dāng)前Session的關(guān)聯(lián)Session,關(guān)聯(lián)好之后啟動(dòng)感興趣Connect事件,邏輯處理完畢。
第三步,鏈接成功事件(第二步中需要鏈接到目標(biāo)服務(wù)器)
@Override
public void onSessionConnected(Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
if(sess.isActive() && chain.isActive()){
sess.register(SelectionKey.OP_READ);
chain.register(SelectionKey.OP_READ);
}
}
這里的一個(gè)核心是當(dāng)上下游都處于鏈接正常態(tài),上下游Session都啟動(dòng)感興趣消息讀事件(寫事件是在讀取處理中自動(dòng)觸發(fā)),為什么在這里做的原因是一定要等上下游都正常態(tài)后才啟動(dòng)雙方消息處理,不然會(huì)出現(xiàn)字節(jié)丟失。
第四步,處理上下游數(shù)據(jù)事件
@Override
protected void onMessage(Object msg, Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
chain.write(msg);
}
是不是非常簡(jiǎn)單,類似pipeline,從一端的數(shù)據(jù)寫到另外一端。
原則上面4步結(jié)束,整個(gè)透明代理就完成了,但是為了處理鏈接異常清理,我們?cè)黾恿薙ession清理處理,如下
@Override
public void onSessionToDestroy(Session sess) throws IOException {
try {
sess.close();
} catch (IOException e) { //ignore
}
if (sess.chain == null) return;
try {
sess.chain.close();
sess.chain.chain = null;
sess.chain = null;
} catch (IOException e) {
}
}
工作就是解決上下游鏈接清理鏈接。
至此為止我們的IoAdaptor個(gè)性化就完成了,是不是非常簡(jiǎn)單,現(xiàn)在我們要跑起來測(cè)試了,下面的代碼就是上一次講到重復(fù)的設(shè)置,沒有新意。
public static void main(String[] args) throws Exception {
Dispatcher dispatcher = new Dispatcher();
IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
final Server server = new Server(dispatcher, ioAdaptor, 3306);
server.start();
}
騷年,包括渣渣import和少許注釋加起來折騰了不到100行,該跑一跑了,還是那句話,不是HelloWorld,你可以規(guī)模壓力測(cè)。看看你是否在本地代理出來了你的目標(biāo)服務(wù)MySQL,gl,hf, gogogo.
完整代碼可運(yùn)行代碼如下,也可直接到zbus示例代碼庫中找到
package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;
public class TcpProxyAdaptor extends IoAdaptor {
private String targetAddress;
public TcpProxyAdaptor(String targetAddress) {
this.targetAddress = targetAddress;
}
// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)
public IoBuffer encode(Object msg) {
if (msg instanceof IoBuffer) {
IoBuffer buff = (IoBuffer) msg;
return buff;
} else {
throw new RuntimeException("Message Not Support");
}
}
// 透?jìng)鞑恍枰幗獯a,簡(jiǎn)單返回ByteBuffer數(shù)據(jù)
public Object decode(IoBuffer buff) {
if (buff.remaining() > 0) {
byte[] data = new byte[buff.remaining()];
buff.readBytes(data);
return IoBuffer.wrap(data);
} else {
return null;
}
}
@Override
protected void onSessionAccepted(Session sess) throws IOException {
Session target = null;
Dispatcher dispatcher = sess.getDispatcher();
try {
target = dispatcher.createClientSession(targetAddress, this);
} catch (Exception e) {
sess.asyncClose();
return;
}
sess.chain = target;
target.chain = sess;
dispatcher.registerSession(SelectionKey.OP_CONNECT, target);
}
@Override
public void onSessionConnected(Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
if(sess.isActive() && chain.isActive()){
sess.register(SelectionKey.OP_READ);
chain.register(SelectionKey.OP_READ);
}
}
@Override
protected void onMessage(Object msg, Session sess) throws IOException {
Session chain = sess.chain;
if(chain == null){
sess.asyncClose();
return;
}
chain.write(msg);
}
@Override
public void onSessionToDestroy(Session sess) throws IOException {
try {
sess.close();
} catch (IOException e) { //ignore
}
if (sess.chain == null) return;
try {
sess.chain.close();
sess.chain.chain = null;
sess.chain = null;
} catch (IOException e) {
}
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
Dispatcher dispatcher = new Dispatcher();
IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306");
final Server server = new Server(dispatcher, ioAdaptor, 3306);
server.setServerName("TcpProxyServer");
server.start();
}
}
總結(jié)
以上是生活随笔為你收集整理的mysql 透明代理_透明代理MySQL_基于zbus的MySQL透明代理(100行)-云栖社区的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux解压mysql文件命令行_li
- 下一篇: mysql安装教程8.0.21安装_my