松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

京東技術解密之配置中心DUCC

博客首頁文章列表 松花皮蛋me 2019-04-19 20:52

一、使用方法

簡單說下DUCC的特點

支持多環境(或稱分組),分組可以合并

內置強大的基于插件的數據綁定框架,支持多種類型等轉換;

支持Log4j、Log4j2、Logback的動態修改日記級別功能。

支持Spring原生注解、支持自定義注解,客戶端代碼入侵性低

支持客戶端多配置源,支持自定義配置,如ZK、Consol擴展

支持配置預案切換


接下來說說怎么用,下面我代碼中的ConfiguratorManager就是DUCC的配置管理類

@Configuration
@Log4j2
@Order(value = 0)
public class Config extends PropertyPlaceholderConfigurer {

    //發布系統的配置
    private static Map<String,String> joneProperty =  new HashMap<>();
    //DUCC系統的配置
    private static Map<String,String> duccProperty = new HashMap<>();

    @Bean(initMethod = "start" , destroyMethod = "stop")
    public  ConfiguratorManager configuratorManager()
    {
        ConfiguratorManager configuratorManager = ConfiguratorManager.getInstance() ;
        configuratorManager.setApplication(getJoneProperty("laf.config.manager.application"));
        configuratorManager.addResource(new Resource("ucc",getJoneProperty("laf.config.manager.uri")));
        configuratorManager.addListener(new ConfigurationListener.CustomConfigurationListener("ucc") {
            @Override
            public void onUpdate(com.jd.laf.config.Configuration configuration) {
                List<Property> properties = configuration.getProperties();
                for (Property property:properties) {
                    log.info("duccConfig update key:{}",property.getKey());
                    duccProperty.put(property.getKey(), String.valueOf(property.getValue()));
                }
            }
        });
        return configuratorManager ;
    }

    public String getDuccProperty(String key)
    {
        if(duccProperty.containsKey(key)) {
            return duccProperty.get(key);
        }
        Property property = configuratorManager().getProperty(key);
        if(property==null || String.valueOf(property.getValue()).isEmpty()) {
            log.error("配置降級,key:{}",key);
            return getJoneProperty(key);
        }
        duccProperty.put(key, String.valueOf(property.getValue()));
        return String.valueOf(property.getValue());
    }

    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactory, Properties props)throws BeansException {
        super.processProperties(beanFactory, props);
        for (Object key : props.keySet()) {
            String keyStr = key.toString();
            joneProperty.put(keyStr, String.valueOf(props.getProperty(keyStr)));
        }
    }

    public  String getJoneProperty(String key)
    {
        return joneProperty.get(key);
    }

}

二、DUCC各重要模塊解讀

1、ConfiguratorManager統一配置管理類

主要步驟:

1.1 從服務器獲取配置信息

1.2 將配置保存到本地

1.3 啟動更新事件消費者

1.4 為每個Resource(可以理解為分組)啟動變更監控線程Watcher,監聽配置變更

public class ConfiguratorManager implements ConfigurationSupplier, Watchable {


      //通知
    protected Notifier notifier = new Notifier() {
        @Override
        public <M, T extends Listener> void send(M property, List<T> listeners) {
            inform(property, listeners);
        }
    };

    //事件
protected BlockingQueue<Resource> events = new ArrayBlockingQueue<Resource>(5000);


     @Override
    public Property getProperty(final String key) {
         //最終是通過configuration獲取的,后面會講到
        return configuration.getProperty(name);
    }

     @Override
    public boolean addListener(final PropertyListener listener) {

           //判斷是否重復添加

           groupResources.addListener(listener.getKey(), listener);
       }

         /**
     * 通知監聽器,notify執行send方法時會觸發
     *
     * @param property  變更的配置
     * @param listeners 監聽器
     */
    protected <M, T extends Listener> void inform(final M property, final List<T> listeners) {
        if (listeners == null) {
            return;
        }
        for (final T listener : listeners) {
            if (!isStarted()) {
                return;
            }
            notifierThreads.execute(new Runnable() {
                @Override
                public void run() {
                    if (isStarted()) {
                        listener.onUpdate(property);
                    }
                }
            });
        }
    }


        /**
     * 啟動
     */
    public void start() throws Exception {
         //驗證降級文件路徑可寫等
        validate();
        synchronized (mutex) {
            if (started.compareAndSet(false, true)) {
                ExecutorService executorService = null;
                try {
                //資源URL解析,排序合并
                initializeResource()
                resource.setConfigurator(configurator)
                //遠程加載,初始化上下文context
                startRemote(resources, executorService)         
                //通知,最后還是會執行到notifier.send()方法
                inform(groupResource, groupConfiguration);                   
                //啟動變更監聽
                consumer = new Thread(new EventConsumer());
                consumer.start();
                //所有資源已經初始化過,安全啟動監聽器
                //支持多個分組,推薦重復度高的單獨抽出來
                for (Resource resource : groupResources) {
                    if (resource.isReady()) {
                        resource.watch();
                    }
                }
               } catch (Exception e) {
                stop();
                throw e;
            } finally {
                if (executorService != null) {
                    executorService.shutdownNow();
                }
            }
            }
        }

     }


    protected class EventConsumer implements Runnable {
            @Override
            public void run() {
                while (isStarted() && !Thread.currentThread().isInterrupted()) {
                    try {
                        Resource event = events.poll(1000, TimeUnit.MICROSECONDS);
                        if (event != null) {
                            onUpdateConfig(event);
                        }
                    } catch (InterruptedException e) {
                       //必須重置中斷信號標記
                        Thread.currentThread().interrupt();
                    }
                }
        }

        /**
         * 資源-配置發生變更
         *
         * @param resource
         */
        protected void onUpdateConfig(final Resource resource) {
           //省略很多行
           /獲取舊的配置項
            //獲取新的配置項
            //判斷是否變更或者新增或者有刪除
            //通知配置監聽器,最后還是會執行到notifier.send()
            inform(newProperty, listeners);
        }
 }

2、監聽器容器Observer

Observer包含了各種Listener,同時擁有一個ConfiguratorManager的成員變量,Lister最終會傳遞到該變量中

/**
 * 監聽器容器
 */
public class Observer implements InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {

     protected ConfiguratorManager manager;

     /**
     *
     *   省略其他
     */
     public void addPropertyListener(final PropertyListener listener) {
        propertyListeners.add(listener);
    }

        /**
     * 添加監聽器
     */
    public synchronized void execute() {
        if (!processed) {
            if (manager != null) {
                for (PropertyListener listener : propertyListeners) {
                    manager.addListener(listener);
                }
                for (ConfigurationListener listener : configurationListeners) {
                    manager.addListener(listener);
                }
                for (Map.Entry<String, String> entry : scriptListeners.entrySet()) {
                    manager.addListener(new JavaScriptListener(entry.getKey(), entry.getValue(), context));
                }
                //針對@ConfigurationPropties注解
                for (ConfigPropertiesBean cfgBean : beans) {
                    String prefix = cfgBean.getPrefix();
                    Object bean = cfgBean.getBean();
                    if (bean == null && context.containsBean(cfgBean.getBeanName())) {
                        bean = context.getBean(cfgBean.getBeanName());
                    }
                    if (bean != null) {
                        //添加監聽器
                        process(bean, cfgBean.getBeanName(), prefix);
                    }
                }
            }
            processed = true;
        }
    }

    /**
    * 當BeanFactory設置完Bean屬性后會調用此方法,可以添加初始化方法
    ***/

      @Override
    public void afterPropertiesSet() throws Exception {

        //在BeanDefinition中定義注入的
        if (fieldListeners != null) {
            for (FieldListener fieldListener : fieldListeners) {
                addFieldListener(fieldListener);
            }
        }

        if (methodListeners != null) {
            for (MethodListener methodListener : methodListeners) {
                addMethodListener(methodListener);
            }
        }
    }

    /**
     * 獲取上下文的引用,需要實現ApplicationContextAware接口
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.context = applicationContext;
    }

    @Override
    public void onApplicationEvent(final ContextRefreshedEvent event) {
        if (event.getSource() == context) {
            execute();
        }
    }

}

如果在上下文中部署一個實現了ApplicationListener接口的Bean,那么每當在一個ApplicationEvent發布到ApplicationContext時,這個Bean會得到通知,其實這就是標準的Oberver設計模式

當ApplicationContext實例完成后,會調用onApplicationEvent()方法,執行execute()方法,然后將PropertyListener/ConfigurationLister添加到ConfiguratorManager實例中

3、Bean實例化后置處理器ConfigPostProcessor

/**
 * Bean實例化后置處理器,保存ConfiguratorManager實例,處理Bean的配置
 *
 */
public class ConfigPostProcessor implements BeanPostProcessor, PriorityOrdered, ApplicationContextAware {

    protected ApplicationContext applicationContext;

    protected Observer observer;

    /**
     * PostProcessBeforeInitialization 方法會在Bean構造完成后(構造方法執行完成),初始化方法(init-method)方法調用之前被調用
     * */
    @Override
        public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
    //處理配置對象
    if (!process(bean, beanName, observer)) {
        //處理屬性監聽器
        doWithFields(bean.getClass(), new FieldCallback() {
            @Override
            public void doWith(final Field field) {
                process(field, bean, beanName, observer);
            }
        }, new FieldFilter() {
            @Override
            public boolean matches(final Field field) {
                return !Modifier.isFinal(field.getModifiers()) && !Modifier.isStatic(field.getModifiers());
            }
        });

        //處理方法生成的配置對象
        doWithMethods(bean.getClass(), new MethodCallback() {
            @Override
            public void doWith(Method method) {
                process(method, bean, beanName, observer);
            }
        });
    }

    return bean;
        }

    /**
     * 處理方法
     *
     * @param method
     * @param bean
     * @param beanName
     */
    protected void process(final Method method, final Object bean, final String beanName, final Observer observer) {

    }

    /**
     * 處理字段
     *
     * @param field
     * @param bean
     * @param beanName
     */
    protected void process(final Field field, final Object bean, final String beanName, final Observer observer) {
        //SPI服務發現
        for (FieldProcessor processor : FIELD.extensions()) {
            processor.process(bean, beanName, field, observer);
        }
    }

    /**
     * 處理Bean
     *
     * @param bean
     * @param beanName
     * @return
     */
    protected boolean process(final Object bean, final String beanName, final Observer observer) {

    }

    /**
    *維持Observer的成員變量
    **/
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (bean instanceof ConfiguratorManager && observer.getManager() == null) {
            observer.setManager((ConfiguratorManager) bean);
        }
        if (bean instanceof ConfigurationListener) {
            observer.addConfigurationListener((ConfigurationListener) bean);
        }
        if (bean instanceof PropertyListener) {
            observer.addPropertyListener((PropertyListener) bean);
        }
        return bean;
    }


    @Override
    public int getOrder() {
         //優先級要放最低
        return Ordered.LOWEST_PRECEDENCE;
    }


    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        this.applicationContext = context;
        this.observer = context.getBean(Observer.class);
    }
}

我們可以看到ConfigPostProcessor加載通過SPI服務發現的方法\字段處理類,然后執行process方法,其內部封裝了Observer.addPropertyListener

4、Bean工廠后置處理器PropertySourcesFactorPostProcess

/**
 * 注冊PropertySource
 */
public class PropertySourcesFactoryPostProcessor implements     BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {

    protected ConfigurableEnvironment environment;

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = (ConfigurableEnvironment) environment;
    }

    @Override
    public int getOrder() {
        //最低優先級,先處理Spring內置的PropertySources
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void postProcessBeanFactory(final ConfigurableListableBeanFactory factory) throws BeansException {
        //其它的PropertySource已經加載了,可以安全創建ConfiguratorManager
        ConfiguratorManager manager = factory.getBean(CONFIGURATOR_MANAGER, ConfiguratorManager.class);
        Observer observer = factory.getBean(OBSERVER_BEAN_NAME, Observer.class);
        //將manager設置為observer的成員變量
        if (observer.getManager() == null) {
            observer.setManager(manager);
        }
        // 注冊 Spring 屬性配置
        environment.getPropertySources().addFirst(new ConfigSource(manager));
        //Bean的定義已經注冊,由于ConfiguratorManager延遲加載,存在Bean定義中的占位符沒有被替換的情況。
        //用ConfiguratorManager替換一下剩余的占位符
        resolvePlaceHolder(factory, manager);
    }

    /**
     * 解析Bean的變量
     *
     * @param factory
     * @param manager
     */
    protected void resolvePlaceHolder(final ConfigurableListableBeanFactory factory, final ConfiguratorManager manager) {
        StringValueResolver valueResolver = new ConfigResolver(manager);
        BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);

        String[] beanNames = factory.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            if (!factory.containsSingleton(beanName)) {
                //Bean已經實例化為單例
                BeanDefinition bd = factory.getBeanDefinition(beanName);
                try {
                    visitor.visitBeanDefinition(bd);
                } catch (Exception ex) {
                    throw new BeanDefinitionStoreException(bd.getResourceDescription(), beanName, ex.getMessage(), ex);
                }
            }
        }

        // New in Spring 2.5: resolve placeholders in alias target names and aliases as well.
        factory.resolveAliases(valueResolver);
        // New in Spring 3.0: resolve placeholders in embedded values such as annotation attributes.
        factory.addEmbeddedValueResolver(valueResolver);
    }
}

可以看到在PostProcessBeanFactory方法中,實例化了ConfiguratorManager和Observer,并把Manager設置為Observer的成員變量。另外構造了一個包含Manager的配置屬性源propertrySources(屬性集合,內部封裝個多個k/v),并放到Sping屬性源的第一個

5、資源配置器

public abstract class AbstractConfigurator implements Configurator, Prototype {

    /**
     * 初始化
     *
     * @param context
     * @throws Exception
     */
    Configurator setup(Context context) throws Exception {
    }

    /**
     * 拉取配置
     *
     * @param version     當前版本
     * @param longPolling 長輪詢時間
     * @return
     * @throws Exception
     */
    Configuration pull(long version, int longPolling) throws Exception {
    }

    /**
     * 監聽配置變化
     *
     * @param version 當前版本
     * @return
     */
    boolean watch(long version) {
         //這里只判斷properties是否有更新,不涉及更新的具體類型
        if (!configuration.equals(resource.getConfiguration())) {
            resource.setConfiguration(configuration);
            //最終調用的是events.add(resource),也就是manager中的那個events
            context.fire(resource);
        }
    }

    /**
     * 停止監聽
     */
    void stop() {}

    /**
     * 資源來源
     *
     * @return 資源來源。
     */
    Source source() {}

    /**
     * 從URL中返回資源名稱
     *
     * @param url URL
     * @return 資源名稱
     */
    String name(URL url){}

}

Configuration主要是對properties進行操作,Resource封裝了Configuration。DUCC通過SPI服務發現將FileConfigurator、SystemConfigurator等extends了AbstractConfigurator的類自動加載進來從而達到可插撥擴展其他配置源的效果,也是通過這種機制支持所有數據格式、適配其他操作系統、實現方法字段屬性配置化

6、SPI服務發現

Java SPI 實際上是“基于接口的編程+策略模式+配置文件”組合實現的動態加載機制,常見的JDBC、SLF4門面就是通過這個實現的,下面是DUCC的運用

    /**
 * 插件管理器
 */
public interface Plugin {
    /**
     * 字段處理器擴展點
     */
    ExtensionPoint<FieldProcessor, String> FIELD = new ExtensionPointLazy<FieldProcessor, String>(FieldProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * Bean處理器擴展點
     */
    ExtensionPoint<BeanProcessor, String> BEAN = new ExtensionPointLazy<BeanProcessor, String>(BeanProcessor.class, SpiLoader.INSTANCE, null, null);
    /**
     * 方法處理器擴展點
     */
    ExtensionPoint<MethodProcessor, String> METHOD = new ExtensionPointLazy<MethodProcessor, String>(MethodProcessor.class, SpiLoader.INSTANCE, null, null);
}



public class SpiLoader implements ExtensionLoader {
    public static final ExtensionLoader INSTANCE = new SpiLoader();

    public SpiLoader() {
    }

    public <T> Collection<Plugin<T>> load(Class<T> clazz) {
        if (clazz == null) {
            return null;
        } else {
            List<Plugin<T>> result = new LinkedList();
            ServiceLoader<T> plugins = ServiceLoader.load(clazz);
            Iterator var4 = plugins.iterator();

            while(var4.hasNext()) {
                //這里的plugin不是上面的那個plugin
                T plugin = var4.next();
                result.add(new Plugin(new Name(plugin.getClass()), plugin, this));
            }

            return result;
        }
    }
}

通過SPI機制將接口的實現類全部加載并實例化一遍,前提是實現類名稱放在”META-INF/services/接口名”。當在ConfigPostProcessor中執行METHOD.extensions()時會將實現了MethodProcessor接口的實例取出來

三、SpingBoot注解(@EnableLafConfig)流程總結

DUCC通過實現ImportBeanDefinitionRegistrar接口,將指定的類注冊到Spring Boot容器中,另外必須定義一個Java配置類(帶有注解@Configuration)通過@Import指定ImportBeanDefinitionRegistrar的實現類。在registerBeanDefinitions()完成了以下幾個類的BeanDefinition的注冊:

  1. 1、配置管理器ConfiguratorManager,監聽器容器Observer
  2. 2、Bean實例化后置處理器ConfigPostProcessor
  3. 3、Bean工廠后置處理器PropertySourcesFactoryPostProcessor

四、Spring初始化流程

主要流程在AbstractApplicationContext.refresh()中

流程圖備注

  1. 1、InvokeBeanFactoryPostProcessors()

    在Bean未開始實例化時,執行工廠后置處理器,會查找所有BeanFactoryPostProcessor實現類Bean,并且調用方法PostProcessBeanDefinitionRegistry,修改Definition的定義

    注意:DUCC中PropertySourcesFactoryPostProcessor實現了BeanFactoryPostProcessor把ConfiguratorManager賦值給Observer
  2. 2、RegisterBeanPostProcessors()

    將BeanPostProcessors的實現類添加到工廠的RegisterBeanPostProcessors中

五、Bean實例生命周期

黑龙江6+1开奖结果查询