加载配置

加载配置是通过 ConsulPropertySourceLocator 来实现的,该类是 PropertySourceLocator接口的实现类

Bean 初始化

		@Bean
		public ConsulPropertySourceLocator consulPropertySourceLocator(ConsulConfigProperties consulConfigProperties) {
			return new ConsulPropertySourceLocator(this.consul, consulConfigProperties);
		}

获取配置

获取配置是通过 PropertySourceLocator#locate 方法实现的,最终将获取到属性添加到环境中

  • ConsulPropertySourceLocator#locate

获取配置时,根据应用名称,路径,环境及配置类型拼接相应的路径,然后调用 Consul 获取 KV 值的接口,获取相应的配置,根据类型解析后放入环境中

	@Override
	@Retryable(interceptor = "consulRetryInterceptor")
	public PropertySource<?> locate(Environment environment) {
		if (environment instanceof ConfigurableEnvironment) {
			ConfigurableEnvironment env = (ConfigurableEnvironment) environment;

			String appName = this.properties.getName();

			if (appName == null) {
				appName = env.getProperty("spring.application.name");
			}

			List<String> profiles = Arrays.asList(env.getActiveProfiles());

			String prefix = this.properties.getPrefix();

			List<String> suffixes = new ArrayList<>();
			// 不是文件类型的时候,后缀为 /,否则就是配置文件的后缀
			if (this.properties.getFormat() != FILES) {
				suffixes.add("/");
			} else {
				suffixes.add(".yml");
				suffixes.add(".yaml");
				suffixes.add(".properties");
			}

			// 路径
			String defaultContext = getContext(prefix, this.properties.getDefaultContext());

			for (String suffix : suffixes) {
				this.contexts.add(defaultContext + suffix);
			}
			// 追加环境及文件类型
			for (String suffix : suffixes) {
				addProfiles(this.contexts, defaultContext, profiles, suffix);
			}

			String baseContext = getContext(prefix, appName);

			// 应用名称前缀
			for (String suffix : suffixes) {
				this.contexts.add(baseContext + suffix);
			}
			for (String suffix : suffixes) {
				addProfiles(this.contexts, baseContext, profiles, suffix);
			}

			Collections.reverse(this.contexts);

			CompositePropertySource composite = new CompositePropertySource("consul");

			for (String propertySourceContext : this.contexts) {
				try {
					ConsulPropertySource propertySource = null;
					if (this.properties.getFormat() == FILES) {
						// 获取值
						Response<GetValue> response = this.consul.getKVValue(propertySourceContext, this.properties.getAclToken());
						// 添加当前索引
						addIndex(propertySourceContext, response.getConsulIndex());
						// 如果值不为空,则更新值并初始化
						if (response.getValue() != null) {
							ConsulFilesPropertySource filesPropertySource = new ConsulFilesPropertySource(propertySourceContext, this.consul, this.properties);
							// 解析配置内容
							filesPropertySource.init(response.getValue());
							propertySource = filesPropertySource;
						}
					} else {
						propertySource = create(propertySourceContext, this.contextIndex);
					}
					if (propertySource != null) {
						composite.addPropertySource(propertySource);
					}
				} catch (Exception e) {
					if (this.properties.isFailFast()) {
						log.error("Fail fast is set and there was an error reading configuration from consul.");
						ReflectionUtils.rethrowRuntimeException(e);
					} else {
						log.warn("Unable to load consul config from " + propertySourceContext, e);
					}
				}
			}

			return composite;
		}
		return null;
	}

监听配置

Consul 监听配置是通过定时任务实现的,

Bean 初始化

Bean 的初始化是在 org.springframework.cloud.consul.config.ConsulConfigAutoConfiguration 中实现的

		@Bean
		@ConditionalOnProperty(name = "spring.cloud.consul.config.watch.enabled", matchIfMissing = true)
		public ConfigWatch configWatch(ConsulConfigProperties properties,
		                               ConsulPropertySourceLocator locator,
		                               ConsulClient consul,
		                               @Qualifier(CONFIG_WATCH_TASK_SCHEDULER_NAME) TaskScheduler taskScheduler) {
			return new ConfigWatch(properties, consul, locator.getContextIndexes(), taskScheduler);
		}

监听实现

ConfigWatch 类实现了 ApplicationEventPublisherAwareSmartLifecycle 接口,

  • 启动

当应用启动后,会调用 SmartLifecycle 的 start 方法,然后初始化配置监听,通过向线程池添加一个定时任务,实现配置的定时拉取,定时任务默认周期是 1s

	@Override
	public void start() {
		if (this.running.compareAndSet(false, true)) {
			this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
				this::watchConfigKeyValues, this.properties.getWatch().getDelay());
		}
	}
  • 监听

监听时会遍历所有的key,根据 key 从 Consul 获取相应的数据,判断 Index 是否发生变化,如果发生变化,则发送 RefreshEvent 事件,需要手动实现事件监听以响应配置bai

	// Timed 是 Prometheus 的监控
	@Timed("consul.watch-config-keys")
	public void watchConfigKeyValues() {
		if (this.running.get()) {
			// 遍历所有的配置的 key
			for (String context : this.consulIndexes.keySet()) {

				// turn the context into a Consul folder path (unless our config format
				// are FILES)
				if (this.properties.getFormat() != FILES && !context.endsWith("/")) {
					context = context + "/";
				}

				// 根据配置返回的 index 判断是否发生变化
				try {
					Long currentIndex = this.consulIndexes.get(context);
					if (currentIndex == null) {
						currentIndex = -1L;
					}

					log.trace("watching consul for context '" + context + "' with index " + currentIndex);

					// use the consul ACL token if found
					String aclToken = this.properties.getAclToken();
					if (StringUtils.isEmpty(aclToken)) {
						aclToken = null;
					}

					// 获取指定的 key
					Response<List<GetValue>> response = this.consul.getKVValues(context, aclToken, new QueryParams(this.properties.getWatch().getWaitTime(), currentIndex));

					// if response.value == null, response was a 404, otherwise it was a
					// 200
					// reducing churn if there wasn't anything
					if (response.getValue() != null && !response.getValue().isEmpty()) {
						Long newIndex = response.getConsulIndex();

						// 判断 key 的 index 是否相等,如果发生变化,则发出 RefreshEvent 事件
						if (newIndex != null && !newIndex.equals(currentIndex)) {
							// don't publish the same index again, don't publish the first
							// time (-1) so index can be primed
							// 没有发布过这个 index 的事件,且不是第一次发布
							if (!this.consulIndexes.containsValue(newIndex) && !currentIndex.equals(-1L)) {
								log.trace("Context " + context + " has new index " + newIndex);
								// 发送事件
								RefreshEventData data = new RefreshEventData(context, currentIndex, newIndex);
								this.publisher.publishEvent(new RefreshEvent(this, data, data.toString()));
							} else if (log.isTraceEnabled()) {
								log.trace("Event for index already published for context " + context);
							}
							this.consulIndexes.put(context, newIndex);
						} else if (log.isTraceEnabled()) {
							log.trace("Same index for context " + context);
						}
					} else if (log.isTraceEnabled()) {
						log.trace("No value for context " + context);
					}

				} catch (Exception e) {
					// only fail fast on the initial query, otherwise just log the error
					if (this.firstTime && this.properties.isFailFast()) {
						log.error("Fail fast is set and there was an error reading configuration from consul.");
						ReflectionUtils.rethrowRuntimeException(e);
					} else if (log.isTraceEnabled()) {
						log.trace("Error querying consul Key/Values for context '" + context + "'", e);
					} else if (log.isWarnEnabled()) {
						// simplified one line log message in the event of an agent
						// failure
						log.warn("Error querying consul Key/Values for context '" + context + "'. Message: " + e.getMessage());
					}
				}
			}
		}
		this.firstTime = false;
	}