当前位置:网站首页>Websocket for data synchronization

Websocket for data synchronization

2021-01-23 23:16:48 zhurd

The goal is

  • soul websocket Mode data synchronization principle and source code analysis

The reason for data synchronization

Why do you want to do data synchronization ?

The gateway is the gateway to the traffic request , It plays a very important role in the microservice architecture . In the process of using the gateway , To meet business demands , Configuration changes are often required , Like flow control rules 、 Routing rules and so on . therefore , Gateway dynamic configuration is an important factor to ensure high availability of gateway .

Soul All of the plug-ins are hot pluggable , And the selectors for all plug-ins 、 Rules are all dynamic configuration , Immediate effect , No need to restart the service . In order to provide higher response speed ,soul All configurations are cached in JVM Of Hashmap in , Every request goes to the local cache , Very fast . In order to modify the configuration can be timely updated to JVM In the local cache of , So data synchronization is needed .

Soul Three data synchronization modes are supported :

  1. zookeeper Sync
  2. websocket Sync
  3. http Long polling

Principle analysis

Soul Official documents Introduction to data synchronization :

Soul When the gateway starts , The configuration data is synchronized from the configuration service , It supports push-pull mode to obtain configuration change information , And update the local cache . And the administrator is in the management background , Change users 、 The rules 、 plug-in unit 、 Traffic configuration , Synchronize the change information to Soul gateway , The concrete is push Pattern , still pull The mode depends on the configuration .

Synchronization flowchart :

image-20210117194425

This article chooses websocket Synchronous mode analysis

websocket Sync

What is? websocket?

WebSocket yes HTML5 New agreement , Its purpose is to establish an unlimited two-way communication channel between browser and server , for instance , The server can send messages to the browser at any time .

Soul gateway websocket Synchronization principle :

Soul Gateway and admin Set up websocket When the connection ,admin Will push a full amount of data , Later, if the configuration data changes , Then the incremental data is passed through websocket Actively push to soul-web.

Soul Gateway on websocket Sync :

  • soul-bootstrap Add the following dependencies :

    <!--soul data sync start use websocket-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
        <version>2.2.1</version>
    </dependency>
  • application.yml Add related configuration

    soul :
        sync:
            websocket :
                 urls: ws://localhost:9095/websocket

soul-admin To configure , Or in the soul-admin Set... In startup parameters --soul.sync.websocket='', Then restart the service

soul:
  sync:
    websocket:
      enabled: true

Source code analysis

soul-admin Data synchronization

soul-admin After the user changes his data , Will pass spring Of ApplicationEventPublisher Issue data change notification , from DataChangedEventDispatcher Process the change notice , And then according to the configuration weboscket Synchronization strategies , Send the configuration to the corresponding event handler .

  • Data change notification source code analysis

If we were soul-admin After the background management has done the creation and update of the configuration , Will trigger publishEvent event

private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditionDTOs) {
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        List<ConditionData> conditionDataList =
                selectorConditionDTOs.stream().map(ConditionTransfer.INSTANCE::mapToSelectorDTO).collect(Collectors.toList());
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList))));
    }

publishEvent Event method , adopt DataChangedEvent Medium groupKey To handle events related to different components .

DataChangedEvent The specific implementation class of is DataChangedEventDispatcher

@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     *  Data change event distribution 
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            //  Listeners that handle different components 
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }

    @Override
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }

}
  • websocket Listener source code analysis

We've opened it in front of us soul.sync.websocket.enabled=true, Then there must be a place in the project to read the configuration . adopt `soul.sync.websocket.enabled Search for configuration classes for data synchronization DataSyncConfiguration, Here is websocket Configuration code for :

    @Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {

        /**
         * Config event listener data changed listener.
         *
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() { return new WebsocketDataChangedListener();}

        /**
         * Websocket collector websocket collector.
         *
         * @return the websocket collector
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {return new WebsocketCollector();}

        /**
         * Server endpoint exporter server endpoint exporter.
         *
         * @return the server endpoint exporter
         */
        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }
    }

WebsocketDataChangedListener Class is DataChangedListener The concrete implementation of the interface , adopt WebsocketCollector Send data change information

public class WebsocketDataChangedListener implements DataChangedListener {
    // Plug in change monitoring 
    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
   // Selector change monitoring 
    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
    // Rule change monitoring 
    @Override
    public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
        WebsocketData<RuleData> configData =
                new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }
    //App Authentication monitoring 
    @Override
    public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
        WebsocketData<AppAuthData> configData =
                new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

    // Metadata change monitoring 
    @Override
    public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
        WebsocketData<MetaData> configData =
                new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
    }

}

thus ,soul-admin The data transmission has been completed .

soul-bootstrap Gateway data synchronization

Turn on websocker Sync , Need to be in soul-bootstrap Introduction in soul-spring-boot-starter-sync-data-websocket, Find the corresponding customization in the project spring-boot-starter, Found out WebsocketSyncDataConfiguration Configuration class .

@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {

    /**
     * Websocket sync data service.
     * Websocket  Data synchronization implementation class 
     * @param websocketConfig   the websocket config
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers   the meta subscribers
     * @param authSubscribers   the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * Config websocket config.
     * yml In the document websocket To configure 
     *
     * @return the websocket config
     */
    @Bean
    @ConfigurationProperties(prefix = "soul.sync.websocket")
    public WebsocketConfig websocketConfig() {
        return new WebsocketConfig();
    }
}

Use websocket When syncing , Pay special attention to disconnection and reconnection , It's also called keeping your heart beating .soul Use java-websocket This is a third-party library websocket Connect .WebsocketSyncDataService Class :

public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
       // from websocketConfig In order to get url Information ( yml Configured in soul.sync.websocket.urls: ws://localhost:9095/websocket)
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
       //  establish SoulWebsocketClient
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                 // Connect 
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                // Use the scheduling thread pool for disconnection reconnection ,30 One second 
                executor.scheduleAtFixedRate(() -> {
                    if (client.isClosed()) {
                        boolean reconnectSuccess = client.reconnectBlocking();
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }
    }

SoulWebsocketClient Class is instantiated in the constructor WebsocketDataHandler, adopt ConfigGroupEnum type , Choose specific DataHandler

    /**
     * Instantiates a new Soul websocket client.
     *
     * @param serverUri             the server uri
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers   the meta data subscribers
     * @param authDataSubscribers   the auth data subscribers
     */
    public SoulWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers
                               , final List<AuthDataSubscriber> authDataSubscribers) {
        super(serverUri);
       
        this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber
                                                             , metaDataSubscribers, authDataSubscribers);
    }
public class WebsocketDataHandler {
    private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
    /**
     * Instantiates a new Websocket data handler.
     *
     * @param pluginDataSubscriber the plugin data subscriber
     * @param metaDataSubscribers  the meta data subscribers
     * @param authDataSubscribers  the auth data subscribers
     */
    public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers,
                                final List<AuthDataSubscriber> authDataSubscribers) {
        ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
        ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
        ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
    }
    /**
     * Executor.
     *
     * @param type      the type
     * @param json      the json
     * @param eventType the event type
     */
    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

Datahandler Using the template method design pattern ,AbstractDataHandler Define the handling methods of different events

public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }

Enter any event handling method , Such as into the SelectorDataHandler Of doUpdate Method , Discover the cache update interface PluginDataSubscriber

@Override
protected void doUpdate(final List<SelectorData> dataList) {
    dataList.forEach(pluginDataSubscriber::onSelectorSubscribe);
}

Keep going , Found out SelectorDataHandler Implementation class of interface CommonPluginDataSubscriber, Here's how to change the data in the cache :

private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
        // plug-in unit 
        if (data instanceof PluginData) {
            PluginData pluginData = (PluginData) data;
            // to update 
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cachePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                // Delete 
                BaseDataCache.getInstance().removePluginData(pluginData);
                Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
            }
        } else if (data instanceof SelectorData) {
            // Selectors 
            SelectorData selectorData = (SelectorData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeSelectData(selectorData);
                Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
            }
        } else if (data instanceof RuleData) {
            // The rules 
            RuleData ruleData = (RuleData) data;
            if (dataType == DataEventTypeEnum.UPDATE) {
                BaseDataCache.getInstance().cacheRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
            } else if (dataType == DataEventTypeEnum.DELETE) {
                BaseDataCache.getInstance().removeRuleData(ruleData);
                Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
            }
        }
    });
}

thus ,websocket Data synchronization source code analysis completed .

版权声明
本文为[zhurd]所创,转载请带上原文链接,感谢
https://chowdera.com/2021/01/20210123231300144x.html

随机推荐