松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

微服務架構之Dubbo-服務引用

博客首頁文章列表 松花皮蛋me 2019-06-13 22:58

注:公眾號關于dubbo解讀文章均基于apache--incubating-2.7.1版本,發版于5月26號,此版本注冊中心(多數是zookeeper)在某些特殊場景下會出現重復URL地址數據無法刪除,導致消費方拿到的是失效地址,從而導致調用失敗的問題。如果你也在使用此版本進行源碼學習,在網絡漂移(下班回家再調試源碼)的情況下需要手動刪除zookeeper的dubbo節點路徑

服務引用示例

public class Application {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

這里得到的DemoService是什么?將下來一層一層地掀開迷底

ReferenceConfig#get=>ReferenceConfig#init,在init方法中會執行ref = createProxy(map);

 @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        //TODO 本地引用inJvm
        if (shouldJvmRefer(map)) {
                invoker = refprotocol.refer(interfaceClass, url);
        } else {
            //TODO urls為服務引用的接口信息,URL是dubbo中的統一數據模型
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                }
                if (registryURL != null) { 
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { 
                    //TODO 直連
                    //TODO 適用于測試環境或者注冊中心不可用時需要發布的情況
                   invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }
        //TODO 創建服務代理
        return (T) proxyFactory.getProxy(invoker);
    }

直連中的StaticDirectory的作用是本文第一個重點關注對象,實際上集群目錄服務實現父抽象類AbstractDirectory的doList模板方法,會返回經過路由過濾后的Invoker列表,路由過濾也就是服務路由,常用于設置分組調用、同機房調用優先、灰度分布、流量切換、讀寫分離等。另外還有RegistryDirectory、MockDirectory目錄服務,可以不加思索地猜想RegistryDirectory會動態維護Invoker列表,StaticDirector則是直接返回

Protocol中的refer方法是被@Adaptive注解修飾的,說明它是一個自適應擴展點,自適應擴展點加在方法層面上,表示會動態生成一個自適應的適配器,比如這里的DubboProtocol$Adaptive,并且默認實現是”dubbo”,最終實現可以通過在URL指定

 /**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

refprotocol#refer先后經過filter包裝類ProtocolFilterWrapper、ProtocolListenerWrapper最后執行RegistryProtocol。這些包裝類是在創建擴展器時,通過查找構造方法的參數類型獲取Wrapper類,然后將自身注入,前者ProtocolFilterWrapper負責過濾器,Dubbo允許我們在provider端設置權限校驗、緩存、限流等等一些Filter過濾器,也可以在consumer端設置一些Filter,這是一種責任鏈模式;后者ProtocolListenerWrapper負責監聽器,Dubbo允許consumer端在調用之前、調用之后或出現異常時,觸發oninvoke、onreturn、onthrow三個事件

  private T createExtension(String name) {

        try {  
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }       
    }   

RegistryProtocol#refer=>doRefer

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //TODO 對多個invoker進行封裝
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) &amp;&amp; url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            //TODO 注冊
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //TODO 訂閱,監聽變化
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        //TODO 根據容錯模式和負載均衡算法獲取invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

簡單回顧下之前提到的服務注冊,執行鏈大概是這樣的,Registry根據URL通過registryFactory獲取自適應適配類,最后會執行FailbackRegistry中的模塊方法doRegister,真正的實現類是ZookeeperRegistry,創建路徑節點,將url信息寫入zookeeper中

/***************分隔線***********************/

接下來說下訂閱監聽zookeeper變化

RegistryDirectory#subscribe

public void subscribe(URL url) {
        setConsumerUrl(url);
        consumerConfigurationListener.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

FailbackRegistry#subscribe

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        try {
            doSubscribe(url, listener);
        } catch (Exception e) {
            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
            }
            addFailedSubscribed(url, listener);
        }
    }

    //TODO 模板方法
    public abstract void doSubscribe(URL url, NotifyListener listener);

ZookeeperRegistry#doSubscribe

 @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    //TODO 服務提供者dubbo/org.apache.dubbo.demo.DemoService/providers
                    //TODO 服務配置dubbo/org.apache.dubbo.demo.DemoService/configurators
                    /TODO 服務路由/dubbo/org.apache.dubbo.demo.DemoService/routers
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                    //TODO 只有服務提供者才有children,也就是具體某個實例
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }       

可以看到當有注冊中心有服務列表更新的時候會執行通知,此接口接受三種類別的url,包括服務提供方provider、服務配置configurator、服務路由router。通知方法執行鏈大概是FailbackRegistry#notify=>AbstractRegistry#notify=>RegistryDirectory#notify

AbstractRegistry#notify

for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            //TODO 通知
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            //TODO 更新緩存
            saveProperties(url);
        }  

AbstractRegistry#saveProperties

    File lockfile = new File(file.getAbsolutePath() + ".lock");
    if (!lockfile.exists()) {
        lockfile.createNewFile();
    }
    try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
         FileChannel channel = raf.getChannel()) {
        FileLock lock = channel.tryLock();
        if (lock == null) {
            throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
        }
        // Save
        try {
            if (!file.exists()) {
                file.createNewFile();
            }
            try (FileOutputStream outputFile = new FileOutputStream(file)) {
                properties.store(outputFile, "Dubbo Registry Cache");
            }
        } finally {
            lock.release();
        }
    }

平時我們更習慣使用輸入流和輸出流操作文件,將輸入流的數據寫入到輸出流中,但是利用FileChannel會更加高效,它能直連輸入和輸出流的文件通道

/***************分隔線*************/

回到notify方法中,執行鏈RegistryDirectory#notify=>RegistryDirectory#refreshOverrideAndInvoker=>RegistryDirectory#refreshInvoker

@Override
public URL getUrl() {
    return this.overrideDirectoryUrl;
}

 private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
  }

 private void refreshInvoker(List<URL> invokerUrls) {
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
      List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
  }


    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        String key = url.toFullString(); // The parameter urls are sorted
        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
        newUrlInvokerMap.put(key, invoker);
        return newUrlInvokerMap;
    }  

在refreshOverrideAndInvoker中會更新routerChain,也會更新overrideDirectoryUrl等,invoker#getUrl實際上是取的overrideDirectoryUrl,而擴展點適配器選擇具體實現是根據URL來的。注:URL在Dubbo中是統一的數據模式

protocol#refer又用到了Dubbo SPI機制,執行鏈是Protocol$Adaptive#refer=>ProtocolListenerWrapper#refer=>ProtocolFilterWrapper#refer=>DubboProtocol#refer,其中在 DubboProtocol#refer方法中會構建DubboInvoker對象

DubboProtocol#refer

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

refer方法參數中還有一個很重要的DubboProtocol#getClient方法,應該是和Netty相關的,下一篇再講解

/**************分隔線*************/

回到doRefer中的Cluster.join(directory),Cluster是一個集群容錯接口,同時也是一個@Adaptive自適應擴展點,默認實現類是FailoverCluster.NAME。Dubbo主要內置了如下幾種策略:失敗自動切換Failover、安全失敗Failsafe、快速失敗Failfast、失敗自動恢復Failback、并行調用Forking、廣播Broadcast

@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

前面我們說過ExtensionLoader在實例化對象時,會將自己(這里是FailOverCluster)注入到包裝類中,所以這里實際上是調用MockClusterWrapper#join,所以在ReferenceConfig#createProxy中的invoker = refprotocol.refer(interfaceClass, urls.get(0))中得到的invoker是一個MockClusterWrapper包裝類。使用這種機制可以把一些公共處理放在Wrapper包裝類中

/************************題外話-開始*************************/

這里插一句題外話,Dubbo是如何實現Forking調用?

   //TODO 并行調用多個服務器,只要一個成功即返回。通常用于實時性要求較高的讀操作
            // 但需要浪費更多服務資源。可通過 forks="2" 來設置最大并行數
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            //TODO 使用阻塞隊列
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Result result = invoker.invoke(invocation);
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            //TODO 如果沒有一個成功那么所有都失敗才會返回
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                //TODO 阻塞等待結果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;

可以看到它是通過阻塞隊列完成的,只要有一個成功就返回,隱性的含義是如果沒有成本則所有失敗才返回,返回最后一個異常結果,通常適用于實時性要求高的操作,但是資源成本高,所以一般通過設置forks參數限定最大并行數,我們也可以使用CompletionService達到相同的效果

// 創建線程池
ExecutorService executor =
  Executors.newFixedThreadPool(3);
// 創建 CompletionService
CompletionService<Integer> cs =
  new ExecutorCompletionService<>(executor);
// 用于保存 Future 對象
List<Future<Integer>> futures =
  new ArrayList<>(3);
// 提交異步任務,并保存 future 到 futures 
futures.add(
  cs.submit(()->sayHello1()));
futures.add(
  cs.submit(()->sayHello2()));
futures.add(
  cs.submit(()->sayHeelo2()));
// 獲取最快返回的任務執行結果
Integer r = 0;
try {
  // 只要有一個成功返回,則 break
  for (int i = 0; i < 3; ++i) {
    r = cs.take().get();
    // 簡單地通過判空來檢查是否成功返回
    if (r != null) {
      break;
    }
  }
} finally {
  // 取消所有任務
  for(Future<Integer> f : futures)
    f.cancel(true);
}
return r;

當需要批量提交異步任務的時候使用CompletionService。CompletionService將線程池 Executor和阻塞隊列BlockingQueue的功能融合在了一起,能夠讓批量異步任務的管理更簡單。除此之外,CompletionService能夠讓異步任務的執行結果有序化,先執行完的先進入阻塞隊列,利用這個特性,你可以輕松實現后續處理的有序性,避免無謂的等待。嗯,作者就是這么照顧讀者,看源碼一定要學到東西,學到東西也會分享出來,所有快快關注“松花皮蛋的黑板報”一起漲見識吧!

/***********題外話-結束******************/

說回到消費端入口ReferenceConfig

private T createProxy(Map<String, String> map) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0));  
        return (T) proxyFactory.getProxy(invoker);
    }

前面我們講清楚了invoker是一個引用了FailOverClusterInvoker的MockClusterInvoker,接下來看下getProxy執行鏈是

ProxyFactory$Adaptive#getProxy=>StubProxyFactoryWrapper#getProxy
=>AbstractProxyFactory#getProxy=>JavassistProxyFactory#getProxy

先看下包裝類的方法,StubProxyFactoryWrapper#getProxy

  @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        //TODO 這里的invoker是MockClusterInvoker
        //TODO proxyFactory是JavassistProxyFactory
        T proxy = proxyFactory.getProxy(invoker);
       //TODO 泛化調用入口
        if (GenericService.class != invoker.getInterface()) {
                Class<?> stubClass = ReflectUtils.forName(stub);
                Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                proxy = (T) constructor.newInstance(new Object[]{proxy});
            }
        }
        return proxy;
    }

泛化調用主要用于消費端沒有API接口的情況;不需要引入接口JAR包,而是直接通過GenericService接口來發起服務調用,參數及返回值中的所有POJO均用Map表示。泛化調用對于服務端無需關注,按正常服務進行暴露即可。以下幾種場景可以考慮使用泛化調用:服務測試平臺、API服務網關

再來看下JavassistProxyFactory類

public class JavassistProxyFactory extends AbstractProxyFactory {

    //TODO DUbbo中的URL數據模型中指定有proxyFactory實現,默認是javassist
    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

可以看到getProxy得到的是一個InvokerInvocationHandler,所以在服務端調用RPC接口方法會調用到InvokerInvocationHandler#invoker,其中toString\hashCode\equals方法不走RPC調用。此時開頭提出的問題終于有了答案,reference#get得到的是InvokerInvocationHandler對象

  @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        //TODO 以下方法走本地調用
        if ("toString".equals(methodName) &amp;&amp; parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) &amp;&amp; parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) &amp;&amp; parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        //TODO 調用MockClusterInvoker#invoker
        return invoker.invoke(createInvocation(method, args)).recreate();
    }

MockClusterInvoker#invoker

  @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //TODO URL中的mock值,默認是不開啟
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //TODO 不走mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //TODO 不管是否失敗都直接走mock
            result = doMockInvoke(invocation, null);
        } else {
            //TODO 失敗時調用mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }   

有了mock,再也不用擔心聯調慢和上游服務全部宕機

然后執行鏈會走到AbstractClusterInvoker#invoke

@Override
public Result invoke(final Invocation invocation) throws RpcException {
       //TODO 從directory獲取到invokerList
    List<Invoker<T>> invokers = list(invocation);
    //TODO 通過SPI擴展實例化LoadBalance,默認是隨機選取
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //TODO FailoverClusterInvoker
    return doInvoke(invocation, invokers, loadbalance);
}

而list的執行鏈是RegistryDirectory#doList=>RouterChain#route,之前在分析完成注冊中心后訂閱變更事件,然后會刷新Directory中的RouterChain,這里也驗證了開頭描述的RegistryDirectory是動態維護的Invoker目錄服務

而doInvoke會將loadbalance實例傳到FailoverClusterInvoker#doInvoke

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    //TODO 獲取invoker
    Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);

     //TODO 執行
     Result result = invoker.invoke(invocation);

 }

然后執行鏈是FailoverClusterInvoker#doInvoke=>InvokerWrapper#invoke=>ListenerInvokerWrapper#invoke=>ProtocolFilterWrapper#invoke=>AbstractInvoker#invoke=>DubboInvoker,其中在ProtocolFilterWrapper$1中會執行定義的各種過濾鏈filter

DubboInvoker#doInvoke,異步調用、回調調用,同時這也是與高性能NIO通信框架Netty交互的入口

  @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        ExchangeClient currentClient;//TODO 之前創建的連接
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
         if (isOneway) {//TODO 單向
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // TODO 將RpcInvocation放到Netty處理流程中
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {//TODO 異步
               //TODO 將RpcInvocation放到Netty處理流程中
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {//TODO 回調

                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {

        } catch (RemotingException e) {

        }
    }

大概引用流程就是這樣,當然不同的配置執行鏈流程會有區別,但是大體方向是相似的,希望這篇文章能幫到你,歡迎關注

黑龙江6+1开奖结果查询