松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

微服務架構之服務框架Dubbo-服務暴露

博客首頁文章列表 松花皮蛋me 2019-05-13 22:19

上篇文章說到ServiceBean監聽了ContextRefreshedEvent然后export服務,我們接著談這個話題

private static final ScheduledExecutorService delayExportExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));

 public synchronized void export() {    
          checkAndUpdateSubConfigs();
        if (shouldDelay()) {
            delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

Dubbo提供了延遲暴露服務的特性,這個功能對初始化服務非常友好,建議開啟預熱。先看下ServiceConfig#checkAndUpdateSubConfigs,第一眼看到是check開頭以為只是校驗功能的

public void checkAndUpdateSubConfigs() {
    // Use default configs defined explicitly on global configs
    completeCompoundConfigs();
    // Config Center should always being started first.
    startConfigCenter();
    checkDefault();
    checkApplication();
    checkRegistry();
    checkProtocol();
    this.refresh();
    checkMetadataReport();
}

AbstractInterfaceConfig#startConfigCenter

   void startConfigCenter() {
        if (configCenter == null) {
            ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc);
        }

        if (this.configCenter != null) {
            // TODO there may have duplicate refresh
            this.configCenter.refresh();
            prepareEnvironment();
        }
        //刷新所有的配置,通過set方法注入相關屬性
        //getApplication().ifPresent(ApplicationConfig::refresh);
        //java8的Optional#ifPresent()如果實例非空則調用Comsumer Lambda 表達式
        ConfigManager.getInstance().refreshAll();
    }

AbstractInterfaceConfig#prepareEnvironment

private void prepareEnvironment() {
        if (configCenter.isValid()) {
            if (!configCenter.checkOrUpdateInited()) {
                return;
            }
            DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl());
            String configContent = dynamicConfiguration.getConfig(configCenter.getConfigFile(), 
            try {
                Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority());
                Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent));
                Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent));
            } catch (IOException e) {
                throw new IllegalStateException("Failed to parse configurations from Config Center.", e);
            }
        }
    }

AbstractInterfaceConfig#getDynamicConfiguration,Dubbo默認可以支持apollo、consul、etcd、zookeeper作為配置中心

  private DynamicConfiguration getDynamicConfiguration(URL url) {
        DynamicConfigurationFactory factories = ExtensionLoader
                .getExtensionLoader(DynamicConfigurationFactory.class)
                .getExtension(url.getProtocol());
        DynamicConfiguration configuration = factories.getDynamicConfiguration(url);
        Environment.getInstance().setDynamicConfiguration(configuration);
        return configuration;
    }    

ZookeeperDynamicConfigurationFactory#createDynamicConfiguration

 @Override
    protected DynamicConfiguration createDynamicConfiguration(URL url) {
        return new ZookeeperDynamicConfiguration(url, zookeeperTransporter);
    }

ZookeeperDynamicConfiguration#ZookeeperDynamicConfiguration,CountDownLatch是一個同步工具類,它允許線程一直等待,直到其他線程的操作執行完后再執行。另外一個常常被相提并論的CyclicBarrier,它讓一組線程到達一個屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續干活

 ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
         //zookeeper://127.0.0.1:2181/ConfigCenterConfig?address=zookeeper://127.0.0.1:2181&check=true&configFile=.properties&group=&highestPriority=false&namespace=&prefix=.config-center&timeout=3000&valid=true
        this.url = url;
        rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
        initializedLatch = new CountDownLatch(1);
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        this.executor = Executors.newFixedThreadPool(1, new NamedThreadFactory(this.getClass().getSimpleName(), true));
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
            // Wait for connection
            this.initializedLatch.await();
        } catch (InterruptedException e) {
            logger.warn("Failed to build local cache for config center (zookeeper)." + url);
        }
    }

Dubbo的Zookeeper客戶端使用的是Curator,Curator是Netflix公司開源的一套Zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊Watcher和NodeExistsException異常等等

CuratorZookeeperClient#CuratorZookeeperClient

 public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))//重試策略
                    .connectionTimeoutMs(timeout);
            //授權認證
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            //連接狀態變更通知
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

回到服務暴露入口

 private void doExportUrls() {
        //加載注冊中心列表
        List<URL> registryURLs = loadRegistries(true);
        //逐一向注冊中心分組暴露服務
        for (ProtocolConfig protocolConfig : protocols) {
            //org.apache.dubbo.demo.DemoService
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

ServiceConfig#doExportUrlsFor1Protocol ,如果不指定SCOPE會進行本地和遠程暴露服務

  private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

         //本地scope
          if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {

                exportLocal(url);
            }
           //遠程scope
          if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
          }
    }

本地暴露,使用proxyFactory#getInvoker創建服務代理生成切入點也就是invoker對象,然后protocol#export暴露服務

  private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                //injvm://127.0.0.1/....
            URL local = URLBuilder.from(url)
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST_VALUE)
                    .setPort(0)
                    .build();
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
        }
    }

ProxyFactory通過ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()獲得

如果擴展加載器getExtension找不到就創建(注:Dubbo使用了大量的雙重檢測機制),這里的injectExtension是通過set方法注入擴展加載器的依賴。注:原始JDK SPI不支持IOC注入,Dubbo SPI支持Set方法和構造方法注入,wrapperClass.getConstructor(type).newInstance(instance)

  @SuppressWarnings("unchecked")
    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }

Adaptive注解如果是配置在類級別上,表示自定義適配器,配置在方法級別上,表示動態生成適配器。SPI注解可以指定默認值。注:原始JDK SPI要用循環判斷對象,如果有一個實現加載異常會導致意想不到的結果,Dubbo自己設計的可以通過getAdaptiveExtension靈活獲取對象

private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
//并不一定需要動態生成
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

在getExtensionClasses方法中會判斷SPI的默認實現,加載指定路徑下的文件(resources/META-INF/包名+接口名/),方法里可能會設置cachedAdaptiveClass

 private T createAdaptiveExtension() {
        try {
           //獲取擴展點然后注入相應的依賴
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

getAdaptiveExtensionClass方法是生成code然后編譯

private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        //取得該Class對象的類裝載器
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

code長下面這個樣子,重點關注下url.getParameter和getExtension。ProtocolFactory$Adaptive的code就不顯示了

package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements  org.apache.dubbo.rpc.ProxyFactory {

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

默認類裝載器是org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory,另外一種是org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

客戶端調用(消費時)會用到getProxy方法獲取代理對象,其中InvokerInvocationHandler#invoker中會過濾方法,toString、hashCode、equals方法不走rpc調用,rpc調用走Invoker.invoke(createInvocation(method, args)).recreate()

由于Invoker對象實際和Service實現對象是無法直接調用的,需要先將ref實現類包裝成了一個Wrapper,利用字節碼增強技術Javassist為Wrapper創建一個實現類,然后Invoker被調用的時候會觸發doInvoke()方法,然后調用Wrapper的invokeMethod()方法

跳回到protocol#export的分析,它必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別,Invoker傳入后,根據Invoker.url自動獲得對應Protocol拓展實現。export()方法的調用順序是:Protocol$Adaptive => QosProtocolWrapper => ProtocolListenerWrapper => ProtocolFilterWrapper => InjvmProtocol

ProtocolFilterWrapper#export

 @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //注冊中心export
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        //構建執行鏈后export
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

ProtocolFilterWrapper#buildInvokerChain,默認的filters包括EchoFilter、ClassFilter、GenericFilter、ContextFilter、TraceFilter、TimeOutFilter、MonitorFilter、ExceptionFilter

 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result result = filter.invoke(next, invocation);
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

我們選取限流Filter看下,其他如最小活躍連接數、緩存和異常Filter推薦自行閱讀

    @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    "Failed to invoke service " +
                            invoker.getInterface().getName() +
                            "." +
                            invocation.getMethodName() +
                            " because exceed max service tps.");
        }

        return invoker.invoke(invocation);
    }

}

StatItem#isAllowable,Dubbo使用的是AtomicInteger。首先根據時間偏差重置窗口,然后每次來一個請求就減少一個許可。業內常見的限流方式有計數器、滑動容器、漏斗、令牌

public boolean isAllowable() {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }

InjvmProtocol#export

  @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}

當scope為空時,會同時進行本地暴露和遠程暴露,遠程export流程中我們先看下ProtocolListenerWrapper#startQosServer。在Dubbo中,QoS這個概念被用于動態的對服務進行查詢和控制,就是后臺控制器角色,例如對獲取當前提供和消費的所有服務,以及對服務進行動態的上下線,即從注冊中心上進行注冊和反注冊操作

   private void startQosServer(URL url) {
        try {           

            //又看到了AtomicBoolean。在進行完比較和賦值操作之前,不會發生任何其他的操作,CAS鎖
            if (!hasStarted.compareAndSet(false, true)) {
                return;
            }
            int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
            boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
            Server server = Server.getInstance();
            server.setPort(port);
            server.setAcceptForeignIp(acceptForeignIp);
            server.start();

        } catch (Throwable throwable) {
            logger.warn("Fail to start qos server: ", throwable);
        }
    }

Server#start

   public void start() throws Throwable {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        boss = new NioEventLoopGroup(1, new DefaultThreadFactory("qos-boss", true));
        worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
            }
        });
        try {
            serverBootstrap.bind(port).sync();
            logger.info("qos-server bind localhost:" + port);
        } catch (Throwable throwable) {
            logger.error("qos-server can not bind localhost:" + port, throwable);
            throw throwable;
        }
    }

回到遠程暴露export本身,RegistryProtocol#export

 public void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
         ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
            providerInvokerWrapper.setReg(true);
        }       
          // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);

    }

RegistryProtocol#doLocalExport

     private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
    });
}

DubboProtocol#export

 @Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    openServer(url);
    optimizeSerialization(url);

    return exporter;
}

private ExchangeServer createServer(URL url) {

      ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
}

上述Exchangers.bind(url, requestHandler)流程是
HeaderExchanger => Transporters$Adaptive =>
NettyTransporter => NettyServer。注:Exchanger、Transporter也是通過SPI擴展加載的

NettyServer#doOpen

 @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

接下來看下服務注冊,ZookeeperRegistry#doRegister

public class ZookeeperRegistry extends FailbackRegistry {

        public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }
         @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

ZookeeperTransporter

@SPI("curator")
public interface ZookeeperTransporter {
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);
}

服務提供者啟動后,向”/根/服務名/類型”目錄寫入自己的URL,比如/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo://172.23.205.2:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-annotation-provider&bean.name=providers:dubbo:org.apache.dubbo.demo.DemoService&default.deprecated=false&default.dynamic=false&default.register=true&deprecated=false&dubbo=2.0.2&dynamic=false&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5316&register=true&release=&side=provider&timestamp=1557727085669,生成一個EPHEMERAL臨時節點,這樣即使服務提供者異常關閉,等待Zookeeper會話超時后,該臨時節點也會自動刪除,當然Dubbo通過封裝JDK提供的ShutdownHook實現優雅停止,大概流程是標記不再新請求,已有請求處理完成后釋放資源

注冊完并不代表服務暴露的流程結束,還有一個很重要的過程需要處理,也就是監聽配置變更,在ZookeeperRegistry#doSubscribe中創建持久化節點“dubbo/org.apache.dubbo.demo.DemoService/configurators”,然后設置監聽回調地址,即回調給FailbackRegistry中的notify。如果有變更則更新本地Zookeeper信息緩存文件,然后判斷是否需要重新export

黑龙江6+1开奖结果查询