2019獨角獸企業重金招聘Python工程師標準>>>
今天心情很不好!!! 原因保密。
這篇是基于"netty與websocket通信demo"。
錯誤想法:大量客戶請求,共用一個worker,來實現推送。
正確作法:應該是對Channel對應的ChannelGroup進行操作,來實現推送。
一個Channel可以劃分到多個ChannelGroup中。
PushServerChannelHandler和DynMessage這兩個類最重要,其它類基本沒變。
package?org.sl.demo.chatserver;import?java.util.List;
import?java.util.Map;import?org.jboss.netty.buffer.ChannelBuffers;
import?org.jboss.netty.channel.Channel;
import?org.jboss.netty.channel.ChannelHandlerContext;
import?org.jboss.netty.channel.ChannelStateEvent;
import?org.jboss.netty.channel.ExceptionEvent;
import?org.jboss.netty.channel.MessageEvent;
import?org.jboss.netty.channel.SimpleChannelHandler;
import?org.jboss.netty.channel.group.ChannelGroup;
import?org.jboss.netty.channel.group.DefaultChannelGroup;
import?org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import?org.jboss.netty.handler.codec.http.HttpHeaders;
import?org.jboss.netty.handler.codec.http.HttpMethod;
import?org.jboss.netty.handler.codec.http.HttpRequest;
import?org.jboss.netty.handler.codec.http.HttpResponseStatus;
import?org.jboss.netty.handler.codec.http.HttpVersion;
import?org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import?org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import?org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import?org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import?org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import?org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import?org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;public?class?PushServerChannelHandler?extends?SimpleChannelHandler?{static?boolean?debug?=?true;@Overridepublic?void?channelOpen(ChannelHandlerContext?ctx,?ChannelStateEvent?e){if(debug){System.out.println("channelOpen");}DynMessage.addAudience(e.getChannel());}@Overridepublic?void?messageReceived(ChannelHandlerContext?ctx,?MessageEvent?e)?throws?Exception{Channel?ch?=?e.getChannel();Object?msg?=?e.getMessage();if(debug){System.out.println("---------------");System.out.println("message:?"+msg.getClass());}try{if(msg?instanceof?HttpRequest){processHttpRequest(ch,?(HttpRequest)msg);}else?if(msg?instanceof?WebSocketFrame){processWebsocketRequest(ch,(WebSocketFrame)msg);}else{//未處理的請求類型}}catch(Exception?ex){ch.close().sync();}super.messageReceived(ctx,?e);}@Overridepublic?void?channelClosed(ChannelHandlerContext?ctx,?ChannelStateEvent?e){if(debug){System.out.println("channelClosed");}if(e?instanceof?MessageEvent){MessageEvent?me?=?(MessageEvent)?e; }DynMessage.removeAudience(e.getChannel());e.getChannel().close();}@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?ExceptionEvent?e){if(debug){System.out.println("channelClosed");}DynMessage.removeAudience(e.getChannel());e.getCause().printStackTrace();e.getChannel().close();try?{super.exceptionCaught(ctx,?e);}?catch?(Exception?e1)?{ e1.printStackTrace();}}void?processHttpRequest(Channel?channel,HttpRequest?request){HttpHeaders?headers?=?request.headers();if(debug){List<Map.Entry<String,String>>?ls?=?headers.entries();for(Map.Entry<String,String>?i:?ls){System.out.println("header??"+i.getKey()+":"+i.getValue());}} //non-get?requestif(!HttpMethod.GET.equals(request.getMethod())){DefaultHttpResponse?resp?=?new?DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST);channel.write(resp); 
channel.close();return;}WebSocketServerHandshakerFactory?wsShakerFactory?=?new?WebSocketServerHandshakerFactory("ws://"+request.headers().get(HttpHeaders.Names.HOST),null,false?);WebSocketServerHandshaker?wsShakerHandler?=?wsShakerFactory.newHandshaker(request);if(null==wsShakerHandler){//無法處理的websocket版本wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);}else{//向客戶端發送websocket握手,完成握手//客戶端收到的狀態是101?sitching?protocolwsShakerHandler.handshake(channel,?request);} }void?processWebsocketRequest(Channel?channel,?WebSocketFrame?request)?throws?Exception{ if(request?instanceof?CloseWebSocketFrame){DynMessage.removeAudience(channel);channel.close().sync();}else?if(request?instanceof?PingWebSocketFrame){ channel.write(new?PongWebSocketFrame(request.getBinaryData()));??}else?if(request?instanceof?TextWebSocketFrame){//這個地方?可以根據需求,加上一些業務邏輯TextWebSocketFrame?txtReq?=?(TextWebSocketFrame)?request; if(debug){?System.out.println("txtReq:"+txtReq.getText());}if("disconnect".equalsIgnoreCase(txtReq.getText())){DynMessage.removeAudience(channel);channel.close().sync();return;}//把符合條件的channel添加到DynMessage的channelGroup中DynMessage.addAudience(channel);}else{//WebSocketFrame還有一些}}
}
package?org.sl.demo.chatserver;import?java.util.Random;import?org.jboss.netty.buffer.ChannelBuffers;
import?org.jboss.netty.channel.Channel;
import?org.jboss.netty.channel.group.ChannelGroup;
import?org.jboss.netty.channel.group.DefaultChannelGroup;
import?org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;/**
*動態產生消息,并向Channel組推送。
*/
public?class?DynMessage?implements?Runnable{public?static?ChannelGroup?audiences?=?new?DefaultChannelGroup("msg-group");static?public?void?addAudience(Channel?ch){ audiences.add(ch);}static?public?void?removeAudience(Channel?ch){audiences.remove(ch);}static?String[]?names?=?{"Tom",?"Jerry","Terry",?"Looney","Merrie",?"William","Joseph",?"Hanna","Speike",?"Tyke","Tuffy",?"Lightning",};static?String?message?=?"";public?static?String?getMessage(){StringBuffer?sb?=?new?StringBuffer();sb.append("hello,my?name?is?");sb.append(names[new?Random().nextInt(names.length)]);sb.append("."); return?sb.toString();
// return?message;}@Overridepublic?void?run()?{ System.out.println("DynMessage?start");for(;;){String?msg?=?getMessage(); radiate(msg);try{Thread.sleep(1000);?}catch(Exception?ex){}}}void?radiate(String?msg){audiences.write(new?TextWebSocketFrame(msg));}
}
<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script>
// Stops any running push (WebSocket or Ajax polling).
function doStop() {
    stopMsgPush();
}

// Generates a random 6-character token, shows it in the "send" field, stops
// any previous push and opens a WebSocket push to 127.0.0.1 that sends the
// token once connected. Received data and close/error states are shown in
// the "recv" field.
function doWsStart() {
    var r6 = generateMixed(6);
    $("#txtReq").val(r6);
    var params = $("#txtReq").val();

    doStop();
    wsMsgPush('127.0.0.1', params,
        function (data) { $("#txtResp").val(data); },
        function () { $("#txtResp").val("ws close..."); },
        function () { $("#txtResp").val("ws error..."); }
    );
}
</script>
</head>
<body>
<br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()"/>
<input type="button" value="stop" onclick="doStop()"/>
<br/>recv: <input id="txtResp" type="text" value="" size="50"/>
</body>
</html>
// Currently active WebSocket (null when none) and Ajax polling timer id.
var _mp_ws = null;
var _mp_ajax_it = null;

/**
 * Starts a push channel: WebSocket when available, otherwise Ajax polling
 * every 10 seconds.
 *
 * @param {string} url       host (WebSocket) / request URL (Ajax)
 * @param {*} params         payload sent on open (WebSocket) or as query data (Ajax)
 * @param {function} onmessage called with each received payload
 * @param {function} onclose   called when the channel/request completes
 * @param {function} onerror   called on error
 */
function msgPush(url, params, onmessage, onclose, onerror) {
    wsMsgPush(url, params, onmessage, onclose, onerror);
    if (!_mp_ws) {
        ajaxMsgPush(url, params, 10000, onmessage, onclose, onerror);
    }
}

/** Legacy variant kept for reference: sends '1111' on open and forwards messages. */
function old_wsMsgPush(url, params, onmessage, onclose, onerror) {
    var ws = new WebSocket("ws://" + url);
    ws.onopen = function () { ws.send('1111'); };
    ws.onmessage = function (evt) { onmessage(evt.data); };
}

/**
 * Opens a WebSocket to "ws://" + url, sends params once connected and wires
 * the optional callbacks. Leaves _mp_ws null when WebSocket is unavailable so
 * that msgPush can fall back to polling.
 */
function wsMsgPush(url, params, onmessage, onclose, onerror) {
    if (typeof WebSocket === "undefined") {
        _mp_ws = null;
        return;
    }

    var ws = new WebSocket("ws://" + url);
    _mp_ws = ws;

    // Use the local socket in the callback: _mp_ws may have been replaced or
    // cleared by the time the connection opens.
    ws.onopen = function () { ws.send(params); };
    if (onmessage) ws.onmessage = function (evt) { onmessage(evt.data); };
    if (onerror) ws.onerror = function (evt) { onerror(); };
    if (onclose) ws.onclose = function (evt) { onclose(); };
}

/**
 * Polls url with a GET request every `interval` milliseconds and forwards the
 * text response to onmessage; onerror/onclose are called from the request's
 * error/complete hooks.
 */
function ajaxMsgPush(url, params, interval, onmessage, onclose, onerror) {
    function __getmsg() {
        $.ajax({
            url: url,
            data: params,
            cache: true,
            type: "get",
            dataType: "text",
            success: function (data, textStatus, jqXHR) {
                if (onmessage) onmessage(data);
            },
            error: function (jqXHR, textStatus, errorThrown) {
                if (onerror) onerror();
            },
            complete: function (jqXHR, textStatus) {
                if (onclose) onclose();
            }
        });
    }

    // Pass the function itself: a string would be evaluated in global scope,
    // where the local __getmsg does not exist.
    _mp_ajax_it = setInterval(__getmsg, interval);
}

/** Stops the active WebSocket push and/or the Ajax polling timer. */
function stopMsgPush() {
    if (_mp_ws) {
        // send() throws unless the socket is open; only announce the
        // disconnect when it can actually be delivered.
        if (_mp_ws.readyState === WebSocket.OPEN) {
            _mp_ws.send("disconnect");
        }
        _mp_ws.close();
        _mp_ws = null;
    }

    if (_mp_ajax_it) {
        clearInterval(_mp_ajax_it);
        _mp_ajax_it = null;
    }
}

var chars = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];

/**
 * Returns a random string of n characters drawn uniformly from `chars`
 * (digits and upper-case letters).
 */
function generateMixed(n) {
    var res = "";
    for (var i = 0; i < n; i++) {
        // floor over the full length gives every index 0..35 equal weight.
        var id = Math.floor(Math.random() * chars.length);
        res += chars[id];
    }
    return res;
}
package?org.sl.demo.chatserver;import?org.jboss.netty.channel.ChannelPipeline;
import?org.jboss.netty.channel.ChannelPipelineFactory;
import?org.jboss.netty.channel.Channels;
import?org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import?org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import?org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import?org.jboss.netty.util.HashedWheelTimer;public?class?PushServerChannelPiplelineFactory??implements?ChannelPipelineFactory{@Overridepublic?ChannelPipeline?getPipeline()?throws?Exception?{ChannelPipeline?cp?=?Channels.pipeline();cp.addLast("decoder",?new?HttpRequestDecoder());cp.addLast("encoder",?new?HttpResponseEncoder());cp.addLast("writeTimeout",?new?WriteTimeoutHandler(new?HashedWheelTimer(),10));cp.addLast("handler",?new?PushServerChannelHandler());return?cp;}}
package?org.sl.demo.chatserver;import?java.net.InetSocketAddress;
import?java.util.concurrent.Executors;import?org.jboss.netty.bootstrap.ServerBootstrap;
import?org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;public?class?PushServer?implements?Runnable{int?port?=?80;public?PushServer(int?port){this.port?=?port;}@Overridepublic?void?run()?{System.out.println("ChatServer?"+port);ServerBootstrap?b?=?new?ServerBootstrap(new?NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));b.setOption("child.tcpNoDelay",?true);??b.setOption("child.keepAlive",?true);b.setPipelineFactory(new?PushServerChannelPiplelineFactory());b.bind(new?InetSocketAddress(port));}public?static?void?main(String[]?args){Thread?t?=?new?Thread(new?DynMessage(),"DynMessage");t.start();new?PushServer(80).run();}
}
轉載于:https://my.oschina.net/tangcoffee/blog/340246
總結
以上是生活随笔為你收集整理的netty websocket 简单消息推送demo的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。