gRPC Client 启动流程

November 17, 2020 · 391 words · 2 min

gRPC Client 启动流程

gRPC 启动初始化的流程,使用 Netty 作为底层的实现

初始化 Channel

Channel 的初始化通过 ChannelBuilder 构建 这里通过 forTarget 设置了要解析的服务名称,会通过 NameResolver 解析,转换为具体的地址

ManagedChannel channel = ManagedChannelBuilder.forTarget("grpc-server")
                                              .usePlaintext()
                                              .build();
  • 构建 ManagedChannel 实例

io.grpc.internal.AbstractManagedChannelImplBuilder#build

调用 build 时,会根据 builder 中的属性,创建 ManagedChannelImpl 的实例

public ManagedChannel build() {
    return new ManagedChannelOrphanWrapper(new ManagedChannelImpl(
            this,
            // 构建 Transport 工厂
            buildTransportFactory(),
            new ExponentialBackoffPolicy.Provider(),
            // 线程池
            SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR),
            // 计时器
            GrpcUtil.STOPWATCH_SUPPLIER,
            // 统计和追踪拦截器
            getEffectiveInterceptors(),
            // 时间提供器
            TimeProvider.SYSTEM_TIME_PROVIDER));
}

初始化 Channel 属性

  • io.grpc.internal.ManagedChannelImpl#ManagedChannelImpl

ManagedChannel 设置属性,初始化服务发现,负载均衡,拦截器等并创建真正的 Channel

ManagedChannelImpl(AbstractManagedChannelImplBuilder<?> builder,
        ClientTransportFactory clientTransportFactory,
        BackoffPolicy.Provider backoffPolicyProvider,
        ObjectPool<? extends Executor> balancerRpcExecutorPool,
        Supplier<Stopwatch> stopwatchSupplier,
        List<ClientInterceptor> interceptors,
        final TimeProvider timeProvider)
设置服务名称
this.target = checkNotNull(builder.target, "target");
设置 TransportFactory

创建了支持鉴权的代理的 TransportFactory,用于支持向服务端发起请求进行鉴权

this.transportFactory = new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
构建服务发现工厂
this.nameResolverFactory = builder.getNameResolverFactory();
  • io.grpc.internal.AbstractManagedChannelImplBuilder#getNameResolverFactory

如果没有覆盖服务名称,则使用这个 nameResolverFactory,否则使用 OverrideAuthorityNameResolverFactory

NameResolver.Factory getNameResolverFactory() {
        if (authorityOverride == null) {
            return nameResolverFactory;
        } else {
            return new OverrideAuthorityNameResolverFactory(nameResolverFactory, authorityOverride);
        }
    }
构建服务发现实例
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
  • io.grpc.internal.ManagedChannelImpl#getNameResolver
static NameResolver getNameResolver(String target,
                                    NameResolver.Factory nameResolverFactory,
                                    NameResolver.Args nameResolverArgs) {
  URI targetUri = null;
  StringBuilder uriSyntaxErrors = new StringBuilder();
  // 解析地址
  targetUri = new URI(target);

  // 创建 NameResolver
  if (targetUri != null) {
    NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
    if (resolver != null) {
      return resolver;
    }
  }

  // 如果不是 URI 格式,则使用默认的 schema
  if (!URI_PATTERN.matcher(target).matches()) {
    targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);

    NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
    if (resolver != null) {
      return resolver;
    }
  }
  throw new IllegalArgumentException(String.format("cannot find a NameResolver for %s%s", target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
}
设置是否开启重试

根据是否主动开启了重试和禁止重试的开关决定是否要重试

this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
设置负载均衡

根据负载均衡策略构建 LoadBalancerFactory,会从注册器中获取并初始化负载均衡实例

this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
配置信息
// 配置解析器
ScParser serviceConfigParser = new ScParser(
        retryEnabled,
        builder.maxRetryAttempts,
        builder.maxHedgedAttempts,
        loadBalancerFactory,
        channelLogger);

// 服务配置拦截器
serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled);

// 如果 builder 有配置,则解析配置
if (builder.defaultServiceConfig != null) {
  // 解析配置
  ConfigOrError parsedDefaultServiceConfig = serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
  this.defaultServiceConfig = (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
  this.lastServiceConfig = this.defaultServiceConfig;
} else {
  this.defaultServiceConfig = null;
}

this.lookUpServiceConfig = builder.lookUpServiceConfig;

// 如果没有开启则使用默认配置
if (!lookUpServiceConfig) {
  if (defaultServiceConfig != null) {
    channelLogger.log(ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
  }
  handleServiceConfigUpdate();
}
创建 RealChannel

RealChannel 用于发起请求

Channel channel = new RealChannel(nameResolver.getServiceAuthority());
  • io.grpc.internal.ManagedChannelImpl.RealChannel#newCall
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
                                                     CallOptions callOptions) {
  return new ClientCallImpl<>(method,
          // 执行的线程池
          getCallExecutor(callOptions),
          // 调用的参数
          callOptions,
          // Transport 提供器
          transportProvider,
          // 如果没有关闭,则获取用于调度的执行器
          terminated ? null : transportFactory.getScheduledExecutorService(),
          // 统计 Channel 调用信息
          channelCallTracer,
          // 是否重试
          retryEnabled)
          .setFullStreamDecompression(fullStreamDecompression)
          .setDecompressorRegistry(decompressorRegistry)
          .setCompressorRegistry(compressorRegistry);
}
拦截器
// 添加方法拦截器
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);

除此之外,还初始化了一些其他的 Channel 的配置和属性,当构建 Channel 完成后,就可以使用 Channel 构建 Stub,用于发起请求