gRPC 自定义健康检查

September 20, 2020 · 385 words · 2 min

gRPC 自定义健康检查

在 gRPC 中自定义健康检查逻辑,用于检查特定的组件(如检查 Redis、MQ 等),或者结合其他的服务组件一起使用(如使用 Spring Boot 的健康检查)

实现

gRPC 的健康检查服务是通过 health.proto定义的

  • health.proto
syntax = "proto3";

package grpc.health.v1;

option csharp_namespace = "Grpc.Health.V1";
option go_package = "google.golang.org/grpc/health/grpc_health_v1";
option java_multiple_files = true;
option java_outer_classname = "HealthProto";
option java_package = "io.grpc.health.v1";

message HealthCheckRequest {
  string service = 1;
}
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

service Health {
  // 单次健康检查
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);

  // 流式健康检查
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

里面定义了两个方法,一个是用于单次检查的 Check方法,一个是用于流式请求的 Watch方法

自定义检查组件

  • CustomHealthCheckImpl.java

自定义健康检查逻辑,通过不同的组件名称返回相应的状态信息

public class CustomHealthCheckImpl extends HealthGrpc.HealthImplBase {
    @Override
    public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
        System.out.println("健康检查:" + request.getService());

        HealthCheckResponse.ServingStatus servingStatus = getServingStatus(request);

        HealthCheckResponse response = HealthCheckResponse.newBuilder()
                                                          .setStatus(servingStatus)
                                                          .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    @Override
    public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
        System.out.println("健康检查 Stream:" + request.getService());

        HealthCheckResponse.ServingStatus servingStatus = getServingStatus(request);

        HealthCheckResponse response = HealthCheckResponse.newBuilder()
                                                          .setStatus(servingStatus)
                                                          .build();
        responseObserver.onNext(response);
    }

    private HealthCheckResponse.ServingStatus getServingStatus(HealthCheckRequest request) {
        HealthCheckResponse.ServingStatus servingStatus;
        String service = request.getService();

        switch (service) {
            case "":
                servingStatus = HealthCheckResponse.ServingStatus.SERVING;
                break;
            case "mysql":
                servingStatus = checkMySQL();
                break;
            case "redis":
                servingStatus = checkRedis();
                break;
            default:
                servingStatus = HealthCheckResponse.ServingStatus.UNKNOWN;
                break;
        }
        return servingStatus;
    }
}

使用

在 Server 端添加自定义的健康检查服务

Server server = ServerBuilder.forPort(1234)
                             .addService(new UserInfoServiceImpl()) 
                             .addService(new HelloServiceImpl())
                             .addService(new CustomHealthCheckImpl())
                             .build();

测试

grpc-health-probe -addr localhost:1234 -service mysql
status: SERVING

grpc-health-probe -addr localhost:1234 -service redis
service unhealthy (responded with "NOT_SERVING")

使用 Spring Boot 健康检查

通过 Spring Boot 的 HealthIndicator 作为 gRPC 的健康检查,需要将相应的组件状态转为 gRPC 的健康检查状态

public class CustomHealthCheckImpl extends HealthGrpc.HealthImplBase {

    private final ObjectProvider<HealthIndicator> healthIndicatorObjectProvider;

    public CustomHealthCheckImpl(ObjectProvider<HealthIndicator> healthIndicatorObjectProvider) {
        this.healthIndicatorObjectProvider = healthIndicatorObjectProvider;
    }

    @Override
    public void check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
        HealthCheckResponse.ServingStatus servingStatus = getServingStatus(request);

        HealthCheckResponse response = HealthCheckResponse.newBuilder()
                                                          .setStatus(servingStatus)
                                                          .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    @Override
    public void watch(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver) {
        HealthCheckResponse.ServingStatus servingStatus = getServingStatus(request);

        HealthCheckResponse response = HealthCheckResponse.newBuilder()
                                                          .setStatus(servingStatus)
                                                          .build();
        responseObserver.onNext(response);
    }

    private HealthCheckResponse.ServingStatus getServingStatus(HealthCheckRequest request) {
        HealthCheckResponse.ServingStatus servingStatus;
        String service = request.getService();

        servingStatus = healthIndicatorObjectProvider.stream()
                                                     .peek(h -> {
                                                         System.out.println(h.getClass().getSimpleName());
                                                     })
                                                     .filter(h -> h.getClass().getSimpleName().toLowerCase().contains(service.toLowerCase()))
                                                     .map(HealthIndicator::health)
                                                     .map(Health::getStatus)
                                                     .map(this::toGrpcHealthStatus)
                                                     .findFirst()
                                                     .orElse(HealthCheckResponse.ServingStatus.UNKNOWN);

        return servingStatus;
    }

    private HealthCheckResponse.ServingStatus toGrpcHealthStatus(Status status) {
        HealthCheckResponse.ServingStatus servingStatus;

        if (Status.UP.equals(status)) {
            servingStatus = HealthCheckResponse.ServingStatus.SERVING;
        } else if (Status.DOWN.equals(status)) {
            servingStatus = HealthCheckResponse.ServingStatus.NOT_SERVING;
        } else if (Status.OUT_OF_SERVICE.equals(status)) {
            servingStatus = HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN;
        } else {
            servingStatus = HealthCheckResponse.ServingStatus.UNKNOWN;
        }
        return servingStatus;
    }

}
  • 测试
grpc-health-probe -addr localhost:9090 -service disk
status: SERVING

grpc-health-probe -addr localhost:9090 -service redis
service unhealthy (responded with "UNKNOWN")