松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

微服務架構之服務冶理Dubbo-Netty流程

博客首頁文章列表 松花皮蛋me 2019-06-27 00:03

一、服務消費者

服務引用時流程會走到DubboProtocol#refer方法,之前篇章中沒有提及Netty環節,本節補上

在getClients時如果有共享鏈接則直接使用,反之,通過信息交換層Exchangers進行connect得到客戶端client實例

可以看到信息交換層Exchanger是個SPI擴展點,默認實現HeaderExchanger

談談為什么心跳探測要在客戶端發起,而不是服務端去探測?如何應對網絡抖動情況下的節點管理?動態注冊中心可以把異常的節點及時去除然后通知到消費端,但是如果是因為網絡抖動誤判就比較麻煩了,可以設置比例但是消費端感知會有時延。那么換個思路解決這個問題,服務消費者并不嚴格以注冊中心的服務節點為準,而是根據實際情況來判斷提供者節點是否可用

接下來看下傳輸層Transporters.connect中的業務邏輯

Transporter是個SPI自適應擴展點,通過URL中的值獲取相應的實現類,默認是NettyClient,系統支持GrizzlyClient、MinaClient、NettyClient、NettyClient4

channelHandler參數是包裝了HeaderExchangeHandler的DecodeHandler先看下靜態變量中的nioEventLoopGroup字段,它設置的線程數是Math.min(Runtime.getRuntime().availableProcessors() + 1, 32),”CPU核數加1″或”32″,大家平時研發線程數設置可以參考這個,另外線程數不是越多越好,業界推薦的計算方法是最佳線程數=CPU核數*[1+(I/O耗時/CPU耗時)]

另外在NettyClient的父類構造方法中進行了handler包裝可以看到包裝成了包括心跳處理器的MuiltMessageHandler

這里的請求調度器Dispatcher是通過自適應擴展器得到的,默認實現是AllDispatcher

回到NettyClient中,在抽象類AbstractClient的構造方法中調用了模板方法doOpen和doConnect,那么我們先看下NettyClient的doOpen方法

主要業務邏輯是進行創建Netty客戶端,客戶端只需要創建一個 EventLoopGroup即可,然后將編解碼、心跳、業務處理器注冊到pipeline事件流中。另外connect方法無非就是調用ChannelFuture future = bootstrap.connect(getConnectAddress());得到future然后阻塞等待結果。另外配置ChannelOption選項時,通過TCP_NODELAY控制禁用了Nagle優化納格算法的工作方式是合并一定數量的輸出數據后一次提交。特別的是,只要有已提交的數據包尚未確認,發送者會持續緩沖數據包,直到累積一定數量的數據才提交,不過該算法與 TCP延遲確認會有不好的相互作用

我們知道服務引用時會調用DubboInvoker#doInvoke方法

HeaderExchangeChannel#request,這里需要注意的是,Request中有一個通過AtomicLong得到的mId用來來唯一表示請求對象,DefaultFuture靠的就是這個mId來關聯請求和應答消息,DefaultFuture中有兩個很重要的屬性:FUTURS和CHANNELS,它們類型都是ConcurrentHashMap,key為mId,在新建DefaultFuture對象時會把mId和相關的Future和Channel塞到這兩個Map中;還有一個ReentrantLock類型的lock屬性,用于阻塞來等待應答

DefaultFuture#received

DefaultFuture#doReceived,當獲取結果時也就是調用DefaultFuture#get時,會阻塞一段時間,直到有結果返回或者超時時喚醒線程


回到請求流程中,send方法實際上是NettyChannel#send方法

這里需要注意兩點:

1、writeAndFlush寫隊列并刷新,實際上netty會把要發送的消息保存在ChannelOutboundBuffer里面,如果網絡對方處理速度比較慢或者消息發送比較快或者消息發送量過大都有可能導致內存溢出;。建議優化方式:通過啟動項channelOption設置發送隊列的長度,或者通過-D啟動參數配置長度

2、nettyChannel#send方法中沒有直接進行回推發送失敗的消息,因為Dubbo提供了容錯補償機制,如果在日常中使用Netty需要自己處理發送失敗的消息,不可生吞異常


二、服務提供者


流程復習
RegistryProtocol#export=>DubboProtocol#export=>DubboProtocol#openServer=>DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>Transporters#bind=>NettyServer#init=>AbstractServer#init=>NettyServer#doOpen

初始化EventLoopGroup-bossGroup,它只負責認證授權、連接然后將socket注冊到IO連接池中的某個channel,線程數為1,不過Netty推薦使用的是線程池。初始化EventLoopGroup-workerGroup,負責IO讀寫。線程數:Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)。將EventLoopGroup注冊到bootstrap并處理連接,可以看到使用的是一主多從線程模型,Netty根據group參數設置不多的reactor線程模型,默認支持單線程、多線程模型、主從多線程模型,配置非常靈活。最后配置channel,配置連接處理器,然后綁定,阻塞等待關閉


當接收到客戶端的消息時,NettyServerHandler#channelRead

AllChannelHandler#received,從WrappedChannelHandler的getExecutorService方法獲取線程池,如果通過SPI自適應擴展器無法獲取實例對象,就使用DubboSharedHandler線程池。然后提交標識為received的線程任務,提交任務的時候可能會拋出RejectedExecutionException,因為IO線程池是無界的(0-Integer.MAX_VALUE),但服務調用線程池是有界的。拋出異常后將進入caught方法來處理,而該方法使用的仍然是業務線程池,所以很有可能這時業務線程池還是滿的。所以生產環境中為了減少在Provider線程池打滿時整個系統雪崩的風險,建議將Dispatcher設置成message

FixedThreadPool線程池,默認處理業務的線程數是200,處理消息的隊列長度不限制

提交ChannelEventRunnable線程任務,根據state不同進行處理,接收信息、連接、斷連、發送、異常

DecodeHandler#received

HeaderExchangeHandler#received=>this#handleRequest。流程復習:DubboProtocol#createServer=>Exchangers#bind=>HeaderExchanger#bind=>HeaderExchangeHandler#init,所以構造方法中的handler參數其實是DubboProtcol中的requestHandler

HeaderExchangerHandler#handleRequest

DubboProtocol.requestHandler#reply

Invoker.invoke調用鏈大概是echoFilter=>genericFilter=>contextFilter=>traceFilter=>timeoutFilter=>exceptionFilter=>RegistryProrocol$invokerdelegate=>JavassistProxyFactory#getInvoker#doinvoke


通過wrapper包裝類執行invokeMethod得到結果,然后通過netty的channel響應消息給服務引用者

黑龙江6+1开奖结果查询