宜人贷蜂巢API网关技术解密之Netty使用实践
2019獨角獸企業重金招聘Python工程師標準>>>
一、背景
宜人貸蜂巢團隊,由Michael創立于2013年,通過使用互聯網科技手段助力金融生態和諧健康發展。自成立起一直致力于多維度數據閉環平臺建設。目前團隊規模超過百人,涵蓋征信、電商、金融、社交、五險一金和保險等用戶授信數據的抓取解析業務,輔以先進的數據分析、挖掘和機器學習等技術對用戶信用級別、欺詐風險進行預測評定,全面對外輸出金融反欺詐、社交圖譜、自動化模型定制等服務或產品。
目前宜人貸蜂巢基于用戶授權數據實時抓取解析技術,并結合頂尖大數據技術,快速迭代和自主的創新,已形成了強大而領先的聚合和輸出能力。
為了適應完成宜人貸蜂巢強大的服務輸出能力,蜂巢設計開發了自己的API網關系統,集中實現了鑒權、加解密、路由、限流等功能,使各業務抓取團隊關注其核心抓取和分析工作,而API網關系統更專注于安全、流量、路由等問題,從而更好的保障蜂巢服務系統的質量。今天帶著大家解密API網關的Netty線程池技術實踐細節。
API網關作為宜人貸蜂巢數據開放平臺的統一入口,所有的客戶端及消費端通過統一的API來使用各類抓取服務。從面向對象設計的角度看,它與外觀模式類似,包裝各類不同的實現細節,對外表現出統一的調用形式。
本文首先簡要地介紹API網關的項目框架,其次對比BIO和NIO的特點,再引入Netty作為項目的基礎框架,然后介紹Netty線程池的原理,最后深入Netty線程池的初始化、ServerBootstrap的初始化與啟動及channel與線程池的綁定過程,讓讀者了解Netty在承載高并發訪問的設計路思。
二、項目框架
圖1 - API網關項目框架
圖中描繪了API網關系統的處理流程,以及與服務注冊發現、日志分析、報警系統、各類爬蟲的關系。其中API網關系統接收請求,對請求進行編解碼、鑒權、限流、加解密,再基于Eureka服務注冊發現模塊,將請求發送到有效的服務節點上;網關及抓取系統的日志,會被收集到elk平臺中,做業務分析及報警處理。
三、BIO vs NIO
API網關承載數倍于爬蟲的流量,提升服務器的并發處理能力、縮短系統的響應時間,通信模型的選擇是至關重要的,是選擇BIO,還是NIO?
Streamvs Buffer & 阻塞 vs 非阻塞
BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,如果數據沒有讀寫完成,線程將一直等待于此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統的epoll模式,在io數據準備好了,才由線程來處理,見圖3。
圖2 – BIO 從流中讀取數據
圖3 – NIO 從Buffer中讀取數據
Selectors
NIO的selector使一個線程可以監控多個channel的讀寫,多個channel注冊到一個selector上,這個selector可以監測到各個channel的數據準備情況,從而使用有限的線程資源處理更多的連接,見圖4。所以可以這樣說,NIO極大的提升了服務器接受并發請求的能力,而服務器性能還是要取決于業務處理時間和業務線程池模型。
圖4 – NIO 單一線程管理多個連接
而BIO采用的是request-per-thread模式,用一個線程負責接收TCP連接請求,并建立鏈路,然后將請求dispatch給負責業務邏輯處理的線程,見圖5。一旦訪問量過多,就會造成機器的線程資源緊張,造成請求延遲,甚至服務宕機。
圖5 – BIO 一連接一線程
對比JDK NIO與諸多NIO框架后,鑒于Netty優雅的設計、易用的API、優越的性能、安全性支持、API網關使用Netty作為通信模型,實現了基礎框架的搭建。
四、Netty線程池
考慮到API網關的高并發訪問需求,線程池設計,見圖6。
圖6 – API網關線程池設計
Netty的線程池理念有點像ForkJoinPool,不是一個線程大池子并發等待一條任務隊列,而是每條線程都有一個任務隊列。而且Netty的線程,并不只是簡單的阻塞地拉取任務,而是在每個循環中做三件事情:
-
先SelectKeys()處理NIO的事件
-
然后獲取本線程的定時任務,放到本線程的任務隊列里
-
最后執行其他線程提交給本線程的任務
每個循環里處理NIO事件與其他任務的時間消耗比例,還能通過ioRatio變量來控制,默認是各占50%。可見,Netty的線程根本沒有阻塞等待任務的清閑日子,所以也不使用有鎖的BlockingQueue來做任務隊列了,而是使用無鎖的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的縮寫)
五、NioEventLoopGroup初始化
下面分析下Netty線程池NioEventLoopGroup的設計與實現細節,NioEventLoopGroup的類層次關系見圖7
圖7 –NioEvenrLoopGroup類層次關系
其創建過程——方法調用,見下圖
圖8 –NioEvenrLoopGroup創建調用關系
NioEvenrLoopGroup的創建,具體執行過程是執行類MultithreadEventExecutorGroup的構造方法
/*** Create a new instance.** @param nThreads the number of threads that will be used by this instance.* @param executor the Executor to use, or {@code null} if the default should be used.* @param chooserFactory the {@link EventExecutorChooserFactory} to use.* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call*/protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;} catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser = chooserFactory.newChooser(children);final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}其中,創建細節如下:
-
線程池中的線程數nThreads必須大于0;
-
如果executor為null,創建默認executor,executor用于創建線程(newChild方法使用executor對象);
-
依次創建線程池中的每一個線程即NioEventLoop,如果其中有一個創建失敗,將關閉之前創建的所有線程;
-
chooser為線程池選擇器,用來選擇下一個EventExecutor,可以理解為,用來選擇一個線程來執行task;
chooser的創建細節,如下:
DefaultEventExecutorChooserFactory根據線程數創建具體的EventExecutorChooser,線程數如果等于2^n,可使用按位與替代取模運算,節省cpu的計算資源,見源碼:
@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}} private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTowEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}}newChild(executor, args)的創建細節,如下
MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調用NioEventLoop的構造函數。
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}在這里先看下NioEventLoop的類層次關系
NioEventLoop的繼承關系比較復雜,在AbstractScheduledEventExecutor 中, Netty 實現了 NioEventLoop 的 schedule 功能, 即我們可以通過調用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務. 而在 SingleThreadEventLoop 中, 又實現了任務隊列的功能, 通過它, 我們可以調用一個NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 并由 NioEventLoop 進行調度執行.
通常來說, NioEventLoop 肩負著兩種任務, 第一個是作為 IO 線程, 執行與 Channel 相關的 IO 操作, 包括調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等; 而第二個任務是作為任務隊列, 執行 taskQueue 中的任務, 例如用戶調用 eventLoop.schedule 提交的定時任務也是這個線程執行的.
具體的構造過程,如下
創建任務隊列tailTasks(內部為有界的LinkedBlockingQueue)
創建線程的任務隊列taskQueue(內部為有界的LinkedBlockingQueue),以及任務過多防止系統宕機的拒絕策略rejectedHandler
其中tailTasks和taskQueue均是任務隊列,而優先級不同,taskQueue的優先級高于tailTasks,定時任務的優先級高于taskQueue。
六、ServerBootstrap初始化及啟動
了解了Netty線程池NioEvenrLoopGroup的創建過程后,下面看下API網關服務ServerBootstrap的是如何使用線程池引入服務中,為高并發訪問服務的。
API網關ServerBootstrap初始化及啟動代碼,如下:
serverBootstrap = new ServerBootstrap();bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()).option(ChannelOption.SO_BACKLOG, config.getBacklogSize()).option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())// Memory pooled.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childHandler(channelInitializer);ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();log.info("API-gateway started on port: {}", config.getPort());future.channel().closeFuture().sync();API網關系統使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執行系統task和定時任務,executorGroup用于處理網關業務加解密、限流、路由,及將請求轉發給后端的抓取服務等業務操作。
七、Channel與線程池的綁定
ServerBootstrap初始化后,通過調用bind(port)方法啟動Server,bind的調用鏈如下:
AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負責客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?
調用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;child.pipeline().addLast(childHandler);for (Entry<ChannelOption<?>, Object> e: childOptions) {try {if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {logger.warn("Unknown channel option: " + e);}} catch (Throwable t) {logger.warn("Failed to set a channel option: " + child, t);}}for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發了ServerBootstrapAcceptor的channelRead方法?
其實當一個 client 連接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件, 接著就會調用到 NioServerSocketChannel.doReadMessages方法
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = javaChannel().accept();try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {…}return 0;}javaChannel().accept() 會獲取到客戶端新連接的SocketChannel,實例化為一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 我們創建的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .
接下來就經由 Netty 的 ChannelPipeline 機制, 將讀取事件逐級發送到各個 handler 中, 于是就會觸發前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。
至此,分析了Netty線程池的初始化、ServerBootstrap的啟動及channel與線程池的綁定過程,能夠看出Netty中線程池的優雅設計,使用不同的線程池負責連接的建立、IO讀寫等,為API網關項目的高并發訪問提供了技術基礎。
八、總結
至此,對API網關技術的Netty實踐分享就到這里,各位如果對中間的各個環節有什么疑問和建議,歡迎大家指正,我們一起討論,共同學習提高。
參考
http://tutorials.jenkov.com/java-nio/nio-vs-io.html
http://netty.io/wiki/user-guide-for-4.x.html
http://netty.io/
http://www.tuicool.com/articles/mUFnqeM
https://segmentfault.com/a/1190000007403873
https://segmentfault.com/a/1190000007283053
作者:蜂巢團隊
來源:宜信技術學院
轉載于:https://my.oschina.net/u/4007037/blog/3058640
總結
以上是生活随笔為你收集整理的宜人贷蜂巢API网关技术解密之Netty使用实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: weh shell高大上?一文教你实现
- 下一篇: 清北NOIP训练营集训笔记——图论(提高