kafka 启动_深入理解Kafka服务端之Acceptor线程是如何启动和工作的
生活随笔
收集整理的這篇文章主要介紹了
kafka 启动_深入理解Kafka服务端之Acceptor线程是如何启动和工作的
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、場景分析上一篇講到了Kafka網絡通信采用了Java NIO的主從Reactor多線程模型,而Acceptor就是Kafka網絡通信中很重要的一個線程對象。它通過selector接收客戶端的連接請求,并交給某個Processor線程處理。那么這個Acceptor線程對象是如何創建并啟動的呢?它又是如何工作的?這篇我們進行詳細分析。
二、圖示說明
三、源碼分析
??1.?Acceptor線程是如何創建和啟動的首先,找到Kafka服務端的入口,即core模塊下的src/main/scala/kafka/Kafka.scala類的main()方法。def main(args: Array[String]): Unit = { try { //解析參數,返回一個Properties對象 val serverProps = getPropsFromArgs(args) //根據給定的Properties對象,構建一個kafkaServerStartable對象??????val?kafkaServerStartable?=?KafkaServerStartable.fromProps(serverProps) ... //TODO 服務端啟動核心代碼 kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() }????...}這里主要看kafkaServerStartable.startup()方法,里面調用了server.startup()方法。這個方法內容比較多,整個Kafka服務端的功能都在里面,我們找到和網絡通信相關的代碼,如下:def startup() { ... //TODO NIO的服務端,在這個里面創建了acceptor線程和processor線程????//?config就是解析參數獲取到的KafkaConfig配置對象 socketServer = new SocketServer(config, metrics, time, credentialProvider) socketServer.startup(startupProcessors = false) ...}socketServer.startup方法的邏輯如下,這里注意傳入的startupProcessors 參數為false,所以方法中 if 條件語句中的代碼不執行:def startup(startupProcessors: Boolean = true) { this.synchronized {????//ConnectionQuotas是更新連接配額統計信息的類 connectionQuotas = new ConnectionQuotas(config, time) //創建并啟動處理控制類請求的Acceptor和Processor createControlPlaneAcceptorAndProcessor(config.controlPlaneListener) //創建并啟動處理數據類請求的Acceptor和Processor // numNetworkThreads:服務端Network線程數,默認為3 createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners) //傳入startupProcessors參數為false,里面代碼不執行 if (startupProcessors) { startControlPlaneProcessor() startDataPlaneProcessors() } }主要看這個方法中的兩行代碼:createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)代碼a. 創建并啟動處理控制類請求的Acceptor和Processor線程createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)代碼b.?創建并啟動處理數據類請求的Acceptor和Processor線程那么這里的控制類請求(Control plane)和數據類請求(Data plane)指什么呢?這里我們先大概了解下:控制類請求指控制器Controller和Broker交互的請求,而數據類請求包括PRODUCER、FETCH等操作數據的請求。區分這兩種請求類型主要為了區分請求的優先級。目前我們只分析數據類請求的處理,所以重點看上面的代碼b,對應的createDataPlaneAcceptorsAndProcessors方法如下:注意:方法中的config.numNetworkThreads參數就是num.network.threads的值,即3private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,//每個服務對應的processor線程數,默認為3 endpoints: Seq[EndPoint]): Unit = synchronized { //在kafka目錄的config/server.properties文件中可以配置多個kafka服務 //比如: // hadoop01:9092 // hadoop01:9093 // hadoop01:9094 //那么這里的endpoints就是這多個服務的集合,而endpoint就對應hadoop01:9092,hadoop01:9093... //但是一般不會設置多個 endpoints.foreach { endpoint => //增加一個監聽器 connectionQuotas.addListener(config, endpoint.listenerName) //創建Acceptor線程 val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix) //在Acceptor對象中添加一組processor線程 addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener) //傳入Acceptor對象,啟動線程 KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start() //阻塞線程 dataPlaneAcceptor.awaitStartup() //將Acceptor對象放入集合,通常情況只有一個Acceptor對象 dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) info(s"Created data-plane acceptor and processors for endpoint : $endpoint") }}參數中的endpoints是一個Endpoint對象的列表,Endpoint是Kafka中監聽器對應的類,我們可以在server.properties配置文件中配置多個服務,這樣就會有多個Endpoint對象。當然一般不會配這個參數,所以默認只有1個,我們這里也按1個Endpoint對象來分析。a. 創建一個Acceptor線程對象val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix我們看一下Acceptor的定義:private[kafka] class Acceptor(val endPoint: EndPoint,//kafka broker的連接信息,包括主機名和端口號等 val sendBufferSize: Int,//設置發送數據的緩沖大小,默認100k val recvBufferSize: Int,//設置接收數據的緩沖大小,默認100k,如果client和broker端的網絡延遲大,建議調大這兩個參數 brokerId: Int, connectionQuotas: ConnectionQuotas, metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { //創建底層的NIO Selector對象 private val nioSelector = NSelector.open() //服務端創建對應的ServerSocketChannel實例,后序會把這個Channel向上一步創建的Selector對象注冊 val serverChannel = openServerSocket(endPoint.host, endPoint.port) //processor線程數組,Acceptor線程初始化時,會創建processor的線程池 private val processors = new ArrayBuffer[Processor]() //標記processors是否啟動 private val processorsStarted = new AtomicBoolean private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value)) ...}從定義看,創建Acceptor線程接收了6個參數,重要的主要是前3個:
- endpoint:broker的連接信息,其中包含主機名,端口號等
- sendBufferSize:設置發送數據的緩沖大小,默認100k。由broker端參數socket.send.buffer.bytes配置
- recvBufferSize:設置接收數據的緩沖大小,默認100k。由broker端參數socket.receive.buffer.bytes配置
- nioSelector:Java NIO中的Selector對象
- processors:Processor線程池。Acceptor在初始化時,會創建對應的Processor線程池,所以Processor線程對象是由Acceptor對象管理的。
這個方法的主要作用就是循環創建一組Processor線程對象,循環的次數由參數newProcessorsPerListener決定,即3。然后將這些對象添加到Acceptor對象的processors線程池對象中,并啟動這些processor線程。
private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() //循環創建processor線程,循環次數為newProcessorsPerListener(3) for (_ 0 until newProcessorsPerListener) { val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool) //放入數組 listenerProcessors += processor //在RequestChannel對象中添加processor線程對象 dataPlaneRequestChannel.addProcessor(processor) nextProcessorId += 1 } //遍歷所有的processor,將其放入集合中 listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p)) //給Acceptor對象添加一組processor,即3個,并啟動processor線程 acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)}acceptor.addProcessors方法對應的代碼如下:
private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized { processors ++= newProcessors//添加一組新的processor線程到processors線程池 if (processorsStarted.get)//如果processors線程池已經啟動 startProcessors(newProcessors, processorThreadPrefix)//啟動新添加的processor線程}通過startProcessors方法就啟動了這些Processor線程對象。
c. 設置Acceptor線程為非后臺線程,并啟動:KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()d. 阻塞Acceptor線程,這里使用了CountDownLatch,是JDK提供的并發流程控制的工具類。初始化時會傳入一個int類型的參數,即需要倒數的起始數,每調用一次countDown,這個數就減1,當減為0時,結束等待。dataPlaneAcceptor.awaitStartup()這里的起始數為1,即只要startupLatch變量的countDown()方法被調用,Acceptor線程就會結束等待:private val startupLatch = new CountDownLatch(1)這里startupComplete方法中調用了startupLatch.countDown():protected def startupComplete(): Unit = { // Replace the open latch with a closed one shutdownLatch = new CountDownLatch(1) startupLatch.countDown()}至此,Acceptor線程和其管理的Processor線程都已經創建完成并啟動。????2.?Acceptor線程是如何工作的
既然Acceptor是一個Runnable接口的實例對象,那么它的工作邏輯一定在run()方法中,代碼如下,該方法較長,截取重要部分:def run() { //在Selector上注冊一個OP_ACCEPT事件 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)????//等待Acceptor線程啟動完成 ????startupComplete() try { //當前processor在線程池中的下標 var currentProcessorIndex = 0 //服務一直不斷地循環 while (isRunning) {??????try?{ //select方法查看是否有事件注冊上來,即獲取準備好的SelectionKey的數量 val ready = nioSelector.select(500) if (ready > 0) { //獲取SelectionKey的集合 val keys = nioSelector.selectedKeys() val iter = keys.iterator() //遍歷所有的SelectionKey while (iter.hasNext && isRunning) { try { val key = iter.next //移除SelectionKey iter.remove() //如果是連接事件 if (key.isAcceptable) { //處理連接請求,返回一個SocketChannel??????????????????accept(key).foreach?{?socketChannel?=> //processors就是處理器的集合,這里先獲取空閑處理器的數量 var retriesLeft = synchronized(processors.length) var processor: Processor = null //給上面的SocketChannel分配一個Processor線程 do { //數量減1 retriesLeft -= 1 //分配一個processor,即指定這個鏈接由哪個processor線程處理??????????????????????processor?=?synchronized?{ //從processors中取出一個processor,實現輪詢的效果, currentProcessorIndex = currentProcessorIndex % processors.length processors(currentProcessorIndex) } currentProcessorIndex += 1 //如果分配到Processor,將對應的SocketChannel對象放入newConnections集合 } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) } } else throw new IllegalStateException("Unrecognized key state for acceptor thread.") } ??????????????...}a. 在Selector上給ServerSocketChannel對象注冊一個OP_ACCEPT事件,用來接收連接請求:serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)b. 等待Acceptor線程啟動完成startupComplete()c. 不斷循環,判斷selector上面是否有注冊的事件發生,如果有,遍歷事件對應的SelectionKeywhile (isRunning) { try { //select方法查看是否有事件注冊上來,即獲取準備好的SelectionKey的數量 val ready = nioSelector.select(500) if (ready > 0) { //獲取SelectionKey的集合 val keys = nioSelector.selectedKeys() val iter = keys.iterator() //遍歷所有的SelectionKey while (iter.hasNext && isRunning) { try { val key = iter.next //移除SelectionKey iter.remove()d. 如果是連接事件,則調用accept方法,該方法內部就是一系列的Java NIO操作,最終返回一個SocketChannel對象。e.?給每一個SocketChannel對象分配一個Processor線程對象,分配策略就是通過輪詢的方式,根據下標從processors線程池中獲取。if (key.isAcceptable) { //處理連接請求,返回一個SocketChannel accept(key).foreach { socketChannel => //processors就是處理器的集合,這里先獲取空閑處理器的數量 var retriesLeft = synchronized(processors.length) var processor: Processor = null //給上面的SocketChannel分配一個Processor線程 do { //數量減1 retriesLeft -= 1 //分配一個processor,即指定這個鏈接由哪個processor線程處理 processor = synchronized { //從processors中取出一個processor,實現輪詢的效果, currentProcessorIndex = currentProcessorIndex % processors.length processors(currentProcessorIndex) } currentProcessorIndex += 1 //如果分配到Processor,將對應的SocketChannel對象放入newConnections集合??????}?while?(!assignNewConnection(socketChannel,?processor,?retriesLeft?==?0))??f. 如果分配到了processor線程對象,就將這個SocketChannel對象放入該processor對象的阻塞隊列newConnections中:assignNewConnection方法內部調用了processor.accept方法:def accept(socketChannel: SocketChannel, mayBlock: Boolean, acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = { val accepted = { //向隊列中放入指定的SocketChannel if (newConnections.offer(socketChannel)) true //判斷是否有可用的Processor線程 else if (mayBlock) { val startNs = time.nanoseconds //將SocketChannel放入隊列 newConnections.put(socketChannel) acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs) true } else false }??if?(accepted) wakeup() accepted}通過newConnections.offer(socketChannel)方法將指定的SocketChannel放入processor的阻塞隊列newConnections中。總結:Acceptor線程是在服務端程序啟動的時候創建和啟動的
每個Acceptor線程默認管理了3個Processor線程對象
Acceptor線程通過selector接收客戶端的連接事件
一旦有連接事件發生,就創建一個SocketChannel對象
通過輪詢的方式,將創建的SocketChannel對象分配給自己管理的Processor線程
每個Processor線程有一個保存SocketChannel的阻塞隊列newConnections,該隊列的容量為固定的20
綜上,Acceptor線程做的事比較簡單:接收客戶端連接請求,創建對應的SocketChannel并輪詢交給Processor線程處理
總結
以上是生活随笔為你收集整理的kafka 启动_深入理解Kafka服务端之Acceptor线程是如何启动和工作的的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: birt脚本for循环_Shell脚本应
- 下一篇: qtcreator摄像头显示时间_屏下摄