本人对于netty框架的一些理解,怎么与网站上的websock建立连接
在Netty的里面有一個Boss,他開了一家公司(開啟一個服務(wù)端口)對外提供業(yè)務(wù)服務(wù),它手下有一群做事情的workers。Boss一直對外宣傳自己公司提供的業(yè)務(wù),并且接受(accept)有需要的客戶(client),當(dāng)一位客戶找到Boss說需要他公司提供的業(yè)務(wù),Boss便會為這位客戶安排一個worker,這個worker全程為這位客戶服務(wù)(read/write)。如果公司業(yè)務(wù)繁忙,一個worker可能會為多個客戶進(jìn)行服務(wù)。這就是Netty里面Boss和worker之間的關(guān)系。下面看看Netty是如何讓Boss和Worker進(jìn)行協(xié)助的。
private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap().group(boss, work).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(nettyPort))//保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new HeartbeatInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) {log.info("啟動 Netty 成功");}上訴代碼初始化了一條netty的服務(wù),那么如何初始話的寫在類HeartbeatInitializer里面
public class HeartbeatInitializer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline()//五秒沒有收到消息 將IdleStateHandler 添加到 ChannelPipeline 攔截器中.addLast(new IdleStateHandler(5, 0, 0))// HttpServerCodec:將請求和應(yīng)答消息解碼為HTTP消息.addLast("http-codec",new HttpServerCodec())// HttpObjectAggregator:將HTTP消息的多個部分合成一條完整的HTTP消息.addLast("aggregator",new HttpObjectAggregator(65536))// ChunkedWriteHandler:向客戶端發(fā)送HTML5文件.addLast("http-chunked",new ChunkedWriteHandler()).addLast(new HeartBeatSimpleHandle());} }上述文件設(shè)置了 攔截器,解碼和解碼合并,還有響應(yīng),最后一個new HeartBeatSimpleHandle()用來處理請求
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<Object> {private WebSocketServerHandshaker handShaker;/*** 取消綁定* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {NettySocketHolder.remove((NioSocketChannel) ctx.channel());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 傳統(tǒng)的HTTP接入if (msg instanceof FullHttpRequest) {FullHttpRequest req = (FullHttpRequest) msg;handleHttpRequest(ctx, req);//獲取url后置參數(shù)String uri=req.uri();QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);Map<String, List<String>> parameters = queryStringDecoder.parameters();Integer userId = Integer.valueOf(parameters.get("userId").get(0));// 存儲當(dāng)前登錄ctxif(NettySocketHolder.get((long)userId) == null){NettySocketHolder.put((long)userId, (NioSocketChannel) ctx.channel());}// WebSocket接入} else if (msg instanceof WebSocketFrame) {if ("live".equals(ctx.channel().attr(AttributeKey.valueOf("type")).get())) {handlerWebSocketFrame(ctx, (WebSocketFrame) msg);}log.info("收到msg={},id={}", msg);//保存客戶端與 Channel 之間的關(guān)系 }}private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {// 如果HTTP解碼失敗,返回HTTP異常if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}//獲取url后置參數(shù)HttpMethod method=req.method();String uri=req.uri();QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);Map<String, List<String>> parameters = queryStringDecoder.parameters();if(method==HttpMethod.GET&&"/websocket".equals(uri)){//...處理ctx.channel().attr(AttributeKey.valueOf("type")).set("live");}// 構(gòu)造握手響應(yīng)返回,本機測試WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://"+req.headers().get(HttpHeaderNames.HOST)+uri, null, false);handShaker = wsFactory.newHandshaker(req);if (handShaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {handShaker.handshake(ctx.channel(), req);}}private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {// 返回應(yīng)答給客戶端if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);res.content().writeBytes(buf);buf.release();}// 如果是非Keep-Alive,關(guān)閉連接ChannelFuture f = ctx.channel().writeAndFlush(res);if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE);}}private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {// 判斷是否關(guān)閉鏈路的指令if (frame instanceof CloseWebSocketFrame) {handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());}}HeartBeatSimpleHandle 用與處理管道(Channel)的讀取工作,將管道儲存在NettySocketHolder里面,用的時候取出。WebSocketServerHandshaker用于響應(yīng)客戶端的響應(yīng)
var websocket;$(function(){$.ajax({type: "POST",url: "/login",data: {userId: 2,account: "135541",userName: "123"},contentType: "application/x-www-form-urlencoded; charset=utf-8",dataType: "json",success: function (result) {websoketCannel(result.data.userId, result.data.account, result.data.userName);}});});function websoketCannel(userId, account, userName){//如果瀏覽器支持WebSocketif(window.WebSocket){websocket = new WebSocket("ws://localhost:1212/websocket?userId="+ userId +"&account="+ account +"&userName="+ userName +""); //獲得WebSocket對象//當(dāng)有消息過來的時候觸發(fā)websocket.onmessage = function(event){var data = JSON.parse(event.data);$("#message").text(data.msg);};//連接關(guān)閉的時候觸發(fā)websocket.onclose = function(event){console.log("斷開連接");};//連接打開的時候觸發(fā)websocket.onopen = function(event){console.log("建立連接");}}else{alert("瀏覽器不支持WebSocket");}}function sendMsg(msg) { //發(fā)送消息if(window.WebSocket){if(websocket.readyState == WebSocket.OPEN) { //如果WebSocket是打開狀態(tài)websocket.send(msg); //send()發(fā)送消息 }}else{return;}}上面代碼是客戶端如何連netty
?
轉(zhuǎn)載于:https://www.cnblogs.com/kangniuniu/p/11107768.html
總結(jié)
以上是生活随笔為你收集整理的本人对于netty框架的一些理解,怎么与网站上的websock建立连接的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux性能监控命令——sar
- 下一篇: 20050708:我还是忍忍吧