基于Netty的分布式聊天系统
基于Netty的分布式聊天系統
Gitee地址:https://gitee.com/yuyuuyuy/micro-mall
文章目錄
- 基于Netty的分布式聊天系統
- 前言
- 一、IM系統架構的探討
- 二、效果展示
- 1.在線聊天
- 2.離線消息
- 3.消息漫游
- 4.sharding-sphere分庫分表
- 三、代碼實現
- 1.netty服務器
- 2.消息處理微服務
- 3.使用sharding-sphere分庫分表
- 總結
前言
IM 全稱是『Instant Messaging』,中文名是即時通訊。在這個高度信息化的移動互聯網時代,生活中 IM 類產品已經成為必備品,比較有名的如釘釘、微信、QQ 等以 IM 為核心功能的產品。本文探討IM系統的架構以及具體實現。使用到的技術有:gateway,nacos,netty,redis,rabbitmq,sharding-sphere,mybatis,mysql
一、IM系統架構的探討
最簡單的架構,就是客戶端直接連接Netty服務器,然后通過服務器來通訊。
然而,netty服務器是動態變化的,客戶端也不好確定到底連入哪一個服務器。因此,引入網關層和注冊中心,每個Netty服務器在注冊中心注冊,用戶請求Netty服務器時,先通過網關,網關經過負載均衡算法,從注冊中心的Netty服務器中選出一個給用戶使用,并將客戶端與Netty服務器的路由信息保存到redis。
以上架構只能支持用戶在線即時通訊,要是有一方離線,那離線用戶就收不到離線消息了,因此,考慮把離線消息存入reids集群,用戶上線后,先從redis中拉取離線消息。這種架構不支持消息漫游,因此,要把數據持久化,這里,考慮用mysql實現聊天消息的持久化。然而,高并發下,mysql很可能撐不住,從而出現消息的丟失問題。因此,引入消息隊列rabbitmq,確保消息可靠傳輸。另外,一般來說,聊天消息的數據量是巨大的,并發量也是巨大的,為提高數據庫讀寫效率和確保存儲巨大的數據量,這里用sharding-sphere分庫分表。完整的架構圖如下:
二、效果展示
1.在線聊天
在線聊天:
https://www.bilibili.com/video/BV1wR4y1K7KH/
在線單聊
2.離線消息
如果消息接收者不在線,則將離線消息保存到redis中,用戶上線后,從redis拉取離線消息
https://www.bilibili.com/video/BV1AR4y1K7tV/
離線消息
3.消息漫游
如果消息接收者長期不在線,則將保存在redis中的消息刪除,以免占用redis寶貴的資源。離線消息在redis中過期后,會自動生成一個標識,告訴該用戶在mysql中有未讀的消息,用戶上線后,從mysql中拉取離線消息,并可自主選擇拉取歷史消息。
https://www.bilibili.com/video/BV1AR4y1K7tV/
消息漫游
4.sharding-sphere分庫分表
通過分庫分表實現數據庫的橫向拓展,減輕單表單數據庫的讀寫壓力
https://www.bilibili.com/video/BV1DF411M7NX/?spm_id_from=autoNext
分庫分表
三、代碼實現
1.netty服務器
netty抽象出兩組線程池,BossGroup專門負責接收客戶端的連接,WorkerGroup專門負責網絡的讀寫
//設置BossGroup和WorkerGroupNioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();接下來基本上就是模板代碼,設置一下服務器的各種參數,比如channel,pipeline,handler等,handler用來監聽各種事件,netty基于事件驅動的特性就是通過handler來實現的。這里注意一下,我是把netty放到web容器中啟動的,啟動的時候需要手動把netty服務器注冊到nacos注冊中心。
public void run() throws InterruptedException {ServerBootstrap serverBootstrap = new ServerBootstrap();try {serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) {//獲取到pipelinechannel.pipeline().addLast(new HttpServerCodec());channel.pipeline().addLast(new ChunkedWriteHandler());channel.pipeline().addLast(new HttpObjectAggregator(8192));channel.pipeline().addLast(new WebSocketServerProtocolHandler("/chat"));//下面的WebSocketHandler是我自定義的handler//這個handler用來監聽channel上的各種事件//比如用戶上線,離線,發消息等channel.pipeline().addLast(new WebSocketHandler());}});System.out.println("================服務器啟動================");ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("================注冊到nacos================");NamingService namingService = nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());namingService.registerInstance(name, ip, port);channelFuture.channel().closeFuture().sync();} catch (NacosException ignored) {System.out.println("ip獲取失敗");} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}handler在netty中就相當于是springmvc中的controller層,下面就是自定義的WebSocketHandler監聽并處理各種事件的代碼
其中channelMap是一個靜態變量,用來保存用戶id到對應的channel的映射。
通過它,我們可以找到目標用戶在哪個channel,從而通過該channel把消息正確地發給目標用戶。其中ChannelHandlerContext類可以獲取各種上下文信息,比如用戶的ip地址,所在的channel,pipeline等等,還可以通過channel的attr方法設置自定義的屬性,比如給channel設置用戶id屬性,這樣拿到channel,就知道這個channel是屬于哪個用戶的了
下面是的代碼是我自定義的消息工具類MessageUtil,在netty中相當于springmvc中的service層,用來處理具體的業務邏輯。
//上線一個用戶public void online(Channel channel, String userId) throws UnknownHostException {//綁定channel和用戶的idAttributeKey<String> key = AttributeKey.valueOf("user");channel.attr(key).set(userId);String ip4 = String.valueOf(Inet4Address.getLocalHost());String ip = ip4.substring(ip4.lastIndexOf("/") + 1);//redis中設置user到服務器的路由redisTemplate.opsForValue().set("chat:route:" + userId, ip + ":" + port);//TODO 從redis中取 chat:message:expired:"+uid,判斷是否存在,存在說明該用戶有過期離線消息,去MYSQL中查找//TODO 并且,不要直接連MYSQL查找,不然會增加MYSQL的壓力,而且做不到異步boolean hasKey = Boolean.TRUE.equals(redisTemplate.hasKey("chat:message:expired:" + userId));if (hasKey) {//告訴rabbitmq需要從Mysql拉取離線消息的用戶id//message微服務監聽mq中需要拉取離線消息的用戶id,然后從mysql讀取該用戶的離線消息,然后發送回消息隊列,該服務器監聽消息隊列,如收到// 離線消息則把消息發送給該用戶rabbitTemplate.convertAndSend("message-pull-exchange", "message.users.pull.message", userId);}// 從redis中拉取離線消息Map<Object, Object> map = redisTemplate.opsForHash().entries("chat:message:" + userId);for (Object value : map.values()) {System.out.println(value);JSONObject chatMessage = JSONObject.parseObject((String) value);Long id = chatMessage.getLongValue("id");channel.writeAndFlush(new TextWebSocketFrame((String) value));//TODO 發送到消息隊列告訴mysql該消息已簽收System.out.println("發送到消息隊列,簽收消息" + id);rabbitTemplate.convertAndSend("message-signed-exchange", "message.signed.message", id);}//拉取完離線消息后從redis中刪除消息redisTemplate.delete("chat:message:" + userId);}public void sendMessage(ChatMessage chatMessage, Channel toChannel) {Long userId = chatMessage.getReceiver();String msg = chatMessage.getMessage();String msgId = String.valueOf(chatMessage.getId());String chatMessageJson = JSONObject.toJSONString(chatMessage);//判斷一下該用戶是否連接的是本服務器if (toChannel != null) {//連接的是本服務器就直接發送就可以了toChannel.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now() + ": " + chatMessageJson));//消息設置為已接受,然后存入數據庫chatMessage.setSigned(1);rabbitTemplate.convertAndSend("message-save-exchange", "message.save.message", chatMessage);} else {//判斷一下該用戶是否連接的是其他服務器,且在線boolean hasKey = Boolean.TRUE.equals(redisTemplate.hasKey("chat:route:" + userId));if (hasKey) {//目標用戶已上線且在其他服務器中,通過存在redis里的路由信息得知目標用戶所在的服務器,通過mq轉發到該服務器//發送到mq中,然后數據庫mysql消費消息,把所有的消息都持久化rabbitTemplate.convertAndSend("message-save-exchange", "message.save.message", chatMessage);//TODO 每臺服務器都創建并監聽自己的隊列,通過存在redis里的路由信息得知目標用戶所在的服務器,發送到rabbitmq相應的隊列中,讓目標用戶所在的服務器接收并轉發消息} else {//目標用戶已離線// 發送到redis中,該用戶上線后自動拉取離線消息long currentTimeMillis = System.currentTimeMillis();HashMap<String, String> chatMessageHashMap = new HashMap<>();chatMessageHashMap.put(msgId, chatMessageJson);redisTemplate.opsForHash().putAll("chat:message:" + userId, chatMessageHashMap);//給離線消息設置過期時間,10天后自動過期,用戶想再得到離線消息就要從MYSQL中取redisTemplate.expire("chat:message:" + userId, 2, TimeUnit.MINUTES);//發送到mq中,然后數據庫mysql消費消息,把所有的消息都持久化rabbitTemplate.convertAndSend("message-save-exchange", "message.save.message", chatMessage);}}}public void remove(String userId) {//redis中刪除該用戶的路由redisTemplate.delete("chat:route:" + userId);}以上就是netty服務器的具體實現
2.消息處理微服務
該模塊用來對用戶的聊天消息進行各種處理,比如消息簽收,消息拉取,消息持久化等。通過消息隊列收發各種消息,并開啟手動確認,確保消息不丟失,實現了對其他微服務的異步解耦。
把消息保存到mysql
//聊天消息持久化到MYSQL@RabbitHandlerpublic void listener(ChatMessage chatMessage, Channel channel, Message message) {System.out.println("mysql收到消息:" + chatMessage);//TODO 存入mysql數據庫try {messageService.save(chatMessage);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//TODO 把錯誤寫入日志System.out.println("消息:" + chatMessage + "寫入數據庫失敗");System.out.println(e.getMessage());}}簽收消息
//確認消息已簽收@RabbitHandlerpublic void signMessage(Long id, Channel channel, Message message) {System.out.println("mysql中消息:" + id + "已確認簽收");//TODO 存入mysql數據庫try {messageService.signMessage(id);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//TODO 把錯誤寫入日志System.out.println("消息:" + id + "確認失敗");System.out.println(e.getMessage());}}從mysql中拉取離線消息并發送回消息隊列,等待netty服務器簽收消息
//從mysql中拉取離線消息@RabbitHandlerpublic void getUsersPullMessage(String id, Channel channel, Message message) {System.out.println("用戶:" + id + "需要從mysql中拉取離線消息");Long uid = Long.valueOf(id);//TODO 存入mysql數據庫try {//從mysql中拉取離線消息并發送回消息隊列messageService.pullAndSendMessage(uid);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {//TODO 把錯誤寫入日志System.out.println("消息:" + id + "確認失敗");System.out.println(e.getMessage());}}3.使用sharding-sphere分庫分表
使用sharding-sphere分庫分表最核心的就是確定分庫分表策略,并配置好sharding-sphere,接下來就可以像訪問一個數據庫一張表一樣來訪問多個數據庫多個表的數據。這里簡單地通過消息id和用戶id來分庫分表
以下是我的sharding-sphere配置
schemaName: sharding_chat # dataSources:ds_0:url: jdbc:mysql://127.0.0.1:3306/chat?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=falseusername: rootpassword: adminconnectionTimeoutMilliseconds: 30000idleTimeoutMilliseconds: 60000maxLifetimeMilliseconds: 1800000maxPoolSize: 50minPoolSize: 1ds_1:url: jdbc:mysql://192.168.231.136:3306/chat?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=falseusername: rootpassword: adminconnectionTimeoutMilliseconds: 30000idleTimeoutMilliseconds: 60000maxLifetimeMilliseconds: 1800000maxPoolSize: 50minPoolSize: 1 # rules: - !SHARDINGtables:chat_message:actualDataNodes: ds_${0..1}.chat_message_${0..1}tableStrategy:standard:shardingColumn: idshardingAlgorithmName: chat_message_inlinekeyGenerateStrategy:column: idkeyGeneratorName: snowflakedefaultDatabaseStrategy:standard:shardingColumn: sendershardingAlgorithmName: chat_database_inlineshardingAlgorithms:chat_database_inline:type: INLINEprops:algorithm-expression: ds_${sender % 2}chat_message_inline:type: INLINEprops:algorithm-expression: chat_message_${id % 2}總結
以上就是高并發,高性能,高可用,并支持橫向拓展的基于Netty聊天系統的實現方案。核心是要理解netty的底層原理,并通過redis減少對mysql數據庫的訪問,通過注冊中心和網關實現Netty服務器的橫向拓展,使用消息隊列并開啟手動確認確保消息不丟失,并通過sharding-sphere分庫分表,進一步減輕數據庫的讀寫壓力
總結
以上是生活随笔為你收集整理的基于Netty的分布式聊天系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++基础课—郑莉9-
- 下一篇: 1000多个摄影师后期照片编辑调色Lig