关于WebSocket分布式实现的一种方案
生活随笔
收集整理的這篇文章主要介紹了
关于WebSocket分布式实现的一种方案
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
WebSocket常用于做后臺消息推送,也可以做簡易的IM聊天,由于WebSocket中的Session沒有實(shí)現(xiàn)序列化接口的,我們無法將session序列化實(shí)現(xiàn)分布式部署,今天就來記錄一種分布式的實(shí)現(xiàn)方案。
實(shí)現(xiàn)原理
首先我們講的這種方式是利用redis訂閱和發(fā)布模式來實(shí)現(xiàn),大致過程:
- 每個服務(wù)器記錄連接,保存在內(nèi)存當(dāng)中
- 當(dāng)需要推送websocket消息的時候,同時在redis發(fā)布一個消息
- 每個服務(wù)器訂閱redis的消息,當(dāng)監(jiān)聽到有消息時,每臺服務(wù)器遍歷自己內(nèi)存當(dāng)中的連接進(jìn)行發(fā)送
這樣我們就可以實(shí)現(xiàn)websocket的分布式部署,當(dāng)然redis訂閱和發(fā)布也可以用其他消息隊(duì)列工具類實(shí)現(xiàn)。
實(shí)現(xiàn)過程
這里并不打算貼所有代碼,只記錄關(guān)鍵的一些代碼,其余的可以網(wǎng)上查看相關(guān)資料,基于SpringBoot2.1.8實(shí)現(xiàn)。
pom文件引入redis和websocket
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--websocket--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency>寫一個redis發(fā)布器
@Component public class PublishService {@AutowiredStringRedisTemplate redisTemplate;/*** 發(fā)布方法** @param channel 消息發(fā)布訂閱 主題* @param message 消息信息*/public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);} }寫一個redis監(jiān)聽器
public class SubscribeListener implements MessageListener {/*** 訂閱接收發(fā)布者的消息*/@Overridepublic void onMessage(Message message, byte[] pattern) {String msg = new String(message.getBody());System.out.println(new String(pattern) + "接收消息:" + msg);//遍歷本地內(nèi)存當(dāng)中的websocket連接...//拿到對應(yīng)的websocket session就可以進(jìn)行推送消息}}配置下websocket
@Configuration public class WebSocketConfig extends ServerEndpointConfig.Configurator{@Overridepublic void modifyHandshake(ServerEndpointConfig sec,HandshakeRequest request, HandshakeResponse response) {// 主要為了能在websocket打開連接時獲取httpsession和當(dāng)前登陸用戶,此處跟本文內(nèi)容沒有關(guān)系HttpSession httpSession=(HttpSession) request.getHttpSession();//存入httpsessionsec.getUserProperties().put(HttpSession.class.getName(),httpSession);//存入當(dāng)前用戶sec.getUserProperties().put("user", ShiroUtils.getCurrentUser());super.modifyHandshake(sec, request, response);}/*** 自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();} }配置redis消息發(fā)布訂閱
/*** redis 消息監(jiān)聽 用于websocket 分布式處理* @param redisConnectionFactory* @return*/ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);//設(shè)置訂閱topicredisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic"));return redisMessageListenerContainer; }再寫一個簡易的存儲工具類,這個就基于ConcurrentHashMap就能實(shí)現(xiàn),不記錄了。
最后來看websocket ServerEndpoint 方法
@OnOpen public void onOpen(Session session, EndpointConfig config) {log.debug("websocket:打開連接");//將連接存入我們的緩存工具,這個工具就是簡單的存儲,代碼自己寫一個CacheSessionMap.put("可以是sessionId,也可以實(shí)當(dāng)前用戶ID",session);}@OnClose public void onClose(Session session) {log.debug("websocket:關(guān)閉連接");//關(guān)閉的連接,我們將其移除CacheSessionMap.remove("可以是sessionId,也可以實(shí)當(dāng)前用戶ID");}@OnMessage public void onMessage(Session session, String message) throws IOException {log.debug("websocket:消息來了");//用我們之前寫的redis消息發(fā)布器,將這個消息發(fā)布到redispublishService.publish("socket_topic", message); }總結(jié)
以上是生活随笔為你收集整理的关于WebSocket分布式实现的一种方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring boot整合redis实现
- 下一篇: Netty与Spring WebSock