bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲
IoT是什么
The Internet of things的簡稱IoT,即是物聯網的意思
IoT推送系統的設計
比如說,像一些智能設備,需要通過APP或者微信中的小程序等,給設備發送一條指令,讓這個設備下載或者播放音樂,那么需要做什么才可以完成上面的任務呢?
首先需要推送服務器,這個服務器主要負責消息的分發,不處理業務消息;設備會連接到推送服務器,APP通過把指令發送到推送服務器,然后推送服務器再把指令分發給相應的設備。
可是,當買設備的人越來越多,推送服務器所能承受的壓力就越大,這個時候就需要對推送服務器做集群,一臺不行,就搞十臺,那么還有一個問題,就是推送服務器增加了,設備如何找到相應的服務器,然后和服務器建立連接呢,注冊中心可以解決這個問題,每一臺服務器都注冊到注冊中心上,設備會請求注冊中心,得到推送服務器的地址,然后再和服務器建立連接。
而且還會有相應的redis集群,用來記錄設備訂閱的主題以及設備的信息;APP發送指令到設備,其實就是發送了一串數據,相應的會提供推送API,提供一些接口,通過接口把數據發送過去;而推送API不是直接去連接推送服務器的,中間還會有MQ集群,主要用來消息的存儲,推送API推送消息到MQ,推送服務器從MQ中訂閱消息,以上就是簡單的IoT推送系統的設計。
下面看下結構圖:
注意:設備連接到注冊中心的是短連接,設備和推送服務器建立的連接是長連接
心跳檢測機制
簡述心跳檢測
心跳檢測,就是判斷對方是否還存活,一般采用定時的發送一些簡單的包,如果在指定的時間段內沒有收到對方的回應,則判斷對方已經掛掉
Netty提供了IdleStateHandler類來實現心跳,簡單的使用如下:
pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS));下面是IdleStateHandler的構造函數:
public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);}四個參數說明:
心跳檢測機制代碼示例
簡單示例: 服務端:
static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) { new PingServer().start(BEGIN_PORT, N_PORT); } public void start(int beginPort, int nPort) { System.out.println("啟動服務...."); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addFirst(new IdleStateHandler(0, 0, 1, TimeUnit.SECONDS)); pipeline.addLast(new PingHandler()); //每個連接都有個ConnectionCountHandler對連接記數進行增加 pipeline.addLast(new ConnectionCountHandler()); } }); bootstrap.bind(beginPort).addListener((ChannelFutureListener) future -> { System.out.println("端口綁定成功: " + beginPort); }); System.out.println("服務已啟動!");}public class PingHandler extends SimpleUserEventChannelHandler { private static final ByteBuf PING_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("ping".getBytes())); private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String str = new String(data); if ("pong".equals(str)) { System.out.println(ctx + " ---- " + str); count--; } ctx.fireChannelRead(msg); } @Override protected void eventReceived(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt.state() == ALL_IDLE) { if (count >= 3) { System.out.println("檢測到客戶端連接無響應,斷開連接:" + ctx.channel()); ctx.close(); return; } count++; System.out.println(ctx.channel() + " ---- ping"); ctx.writeAndFlush(PING_BUF.duplicate()); } ctx.fireUserEventTriggered(evt); }}客戶端:
//服務端的IP private static final String SERVER_HOST = "localhost"; static final int BEGIN_PORT = 8088; static final int N_PORT = 100; public static void main(String[] args) { new PoneClient().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客戶端啟動...."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new PongHandler()); } }); int index = 0; int port; String serverHost = System.getProperty("server.host", SERVER_HOST); ChannelFuture channelFuture = bootstrap.connect(serverHost, beginPort); channelFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { System.out.println("連接失敗,退出!"); System.exit(0); } }); try { channelFuture.get(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }public class PongHandler extends SimpleChannelInboundHandler { private static final ByteBuf PONG_BUF = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer("pong".getBytes())); @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] data = new byte[buf.readableBytes()]; buf.readBytes(data); String str = new String(data); if ("ping".equals(str)) { ctx.writeAndFlush(PONG_BUF.duplicate()); } }}服務端輸出結果:
百萬長連接優化
連接優化代碼示例
服務端:
static final int BEGIN_PORT = 11000; static final int N_PORT = 100; public static void main(String[] args) { new Server().start(BEGIN_PORT, N_PORT); } public void start(int beginPort, int nPort) { System.out.println("啟動服務...."); EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //每個連接都有個ConnectionCountHandler對連接記數進行增加 pipeline.addLast(new ConnectionCountHandler()); } }); //這里開啟 10000到100099這100個端口 for (int i = 0; i < nPort; i++) { int port = beginPort + i; bootstrap.bind(port).addListener((ChannelFutureListener) future -> { System.out.println("端口綁定成功: " + port); }); } System.out.println("服務已啟動!"); }客戶端:
//服務端的IP private static final String SERVER_HOST = "192.168.231.129"; static final int BEGIN_PORT = 11000; static final int N_PORT = 100; public static void main(String[] args) { new Client().start(BEGIN_PORT, N_PORT); } public void start(final int beginPort, int nPort) { System.out.println("客戶端啟動...."); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); final Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_REUSEADDR, true); int index = 0; int port; String serverHost = System.getProperty("server.host", SERVER_HOST); //從10000的端口開始,按端口遞增的方式進行連接 while (!Thread.interrupted()) { port = beginPort + index; try { ChannelFuture channelFuture = bootstrap.connect(serverHost, port); channelFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { System.out.println("連接失敗,退出!"); System.exit(0); } }); channelFuture.get(); } catch (Exception e) { } if (++index == nPort) { index = 0; } } }ConnectionCountHandler類:
public class ConnectionCountHandler extends ChannelInboundHandlerAdapter { //這里用來對連接數進行記數,每兩秒輸出到控制臺 private static final AtomicInteger nConnection = new AtomicInteger(); static { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { System.out.println("連接數: " + nConnection.get()); }, 0, 2, TimeUnit.SECONDS); } @Override public void channelActive(ChannelHandlerContext ctx) { nConnection.incrementAndGet(); } @Override public void channelInactive(ChannelHandlerContext ctx) { nConnection.decrementAndGet(); }}上述的代碼會打包成jar放到linux上運行,對于上述的優化來說,程序方面的就暫時不做,下面會從操作系統層面進行優化,讓其支撐起百萬連接。
TCP連接四元組
在優化之前先來看下網絡里的一個小知識,TCP連接四元組: 服務器的IP+服務器的POST+客戶端的IP+客戶端的POST
端口的范圍一般是1到65535:
配置優化
現在在虛擬機上安裝兩個linux系統,配置分別是:
地址CPU內存JDK作用192.168.15.130VM-4核8G1.8客戶端192.168.15.128VM-4核8G1.8服務端
啟動服務端: java -Xmx4g -Xms4g -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Server > out.log 2>&1 & 啟動客戶端: java -Xmx4g -Xms4g -Dserver.host=192.168.15.128 -cp network-study-1.0-SNAPSHOT-jar-with-dependencies.jar com.dongnaoedu.network.netty.million.Client
啟動服務端后可以使用tail -f命令查看out.log中的日志:
客戶端啟動后,如果報了以下錯誤,需要修改系統的文件最大句柄和進程的文件最大句柄:
Caused by: java.io.IOException: Too many open files at sun.nio.ch.FileDispatcherImpl.init(Native Method) at sun.nio.ch.FileDispatcherImpl.(FileDispatcherImpl.java:35) ... 8 more優化系統最大句柄: 查看操作系統最大文件句柄數,執行命令cat /proc/sys/fs/file-max,查看最大句柄數是否滿足需要,如果不滿足,通過vim /etc/sysctl.conf命令插入如下配置:
fs.file-max = 1000000當并發接入的Tcp連接數超過上限時,就會提示“Too many open files”,所有的新客戶端接入將會失敗。通過vim /etc/security/limits.conf 修改配置參數:
* soft nofile 1000000* hard nofile 1000000修改配置參數后注銷生效。
- 如果程序被中斷,或報了異常
- 此時可以查看操作系統的日志more /var/log/messages,或在程序啟動時執行tail -f /var/log/messages 監控日志。如果日志中出現以下內容,說明需要優化TCP/IP參數
==優化TCP/IP相關參數:==
- 查看客戶端端口范圍限制
- 通過vim /etc/sysctl.conf 修改網絡參數
- 客戶端修改端口范圍的限制
- 優化TCP參數
==參數說明:==
net.ipv4.tcp_mem: 分配給tcp連接的內存,單位是page(1個Page通常是4KB,可以通過getconf PAGESIZE命令查看),三個值分別是最小、默認、和最大。比如以上配置中的最大是3145728,那分配給tcp的最大內存=31457284 / 1024 / 1024 = 12GB。一個TCP連接大約占7.5KB,粗略可以算出百萬連接≈7.51000000/4=1875000 3145728足以滿足測試所需。
net.ipv4.tcp_wmem: 為每個TCP連接分配的寫緩沖區內存大小,單位是字節。三個值分別是最小、默認、和最大。
net.ipv4.tcp_rmem: 為每個TCP連接分配的讀緩沖區內存大小,單位是字節。三個值分別是最小、默認、和最大。
net.ipv4.tcp_keepalive_time: 最近一次數據包發送與第一次keep alive探測消息發送的事件間隔,用于確認TCP連接是否有效。
net.ipv4.tcp_keepalive_intvl: 在未獲得探測消息響應時,發送探測消息的時間間隔。
net.ipv4.tcp_keepalive_probes: 判斷TCP連接失效連續發送的探測消息個數,達到之后判定連接失效。
net.ipv4.tcp_tw_reuse: 是否允許將TIME_WAIT Socket 重新用于新的TCP連接,默認為0,表示關閉。
net.ipv4.tcp_tw_recycle: 是否開啟TIME_WAIT Socket 的快速回收功能,默認為0,表示關閉。
net.ipv4.tcp_fin_timeout: 套接字自身關閉時保持在FIN_WAIT_2 狀態的時間。默認為60。
轉載于:https://juejin.im/post/6861560765200105486
作者:狐言不胡言
總結
以上是生活随笔為你收集整理的bootstrap外不引用连接_网络编程Netty IoT百万长连接优化,万字长文精讲的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 有没有必要买python课_请问自学 P
- 下一篇: 网站攻击软件_佳能遭严重勒索软件攻击,1