gRPC Server 端关闭流程

December 5, 2020 · 382 words · 2 min

gRPC Server 端关闭流程

关闭 Server

关闭 Server 可以使用 shutdown 或者 shutdownNow 方法

shutdown

server.shutdown().awaitTermination(10, TimeUnit.SECONDS);
  • io.grpc.internal.ServerImpl#shutdown

开始顺序的关闭 Server,已经存在的请求会继续执行,新的请求会被拒绝

public ServerImpl shutdown() {
    boolean shutdownTransportServers;
    synchronized (lock) {
        if (shutdown) {
            return this;
        }
        shutdown = true;
        shutdownTransportServers = started;
        if (!shutdownTransportServers) {
            transportServersTerminated = true;
            // 检查是否终止
            checkForTermination();
        }
    }
    if (shutdownTransportServers) {
        // 遍历所有的 Server 并关闭
        for (InternalServer ts : transportServers) {
            ts.shutdown();
        }
    }
    return this;
}

关闭时,首先会检查 Server 是否已经关闭了,如果已经关闭了,则抛出异常;如果没有关闭,则会修改关闭状态,返huan连接池,通知其他的锁; 然后会遍历所有的 Server,调用其 shutdown 方法进行关闭

  • io.grpc.netty.NettyServer#shutdown

关闭 NettySerer,添加关闭事件监听器,并等待关闭;在监听器中会释放资源,关闭协议协调器,关闭 Transport 等

public void shutdown() {
    // 如果 channel 已经关闭了,则返回
    if (channel == null || !channel.isOpen()) {
        // Already closed.
        return;
    }
    // 添加监听器,用于在关闭时释放资源,关闭协议,关闭 Transport 等
    channel.close().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                log.log(Level.WARNING, "Error shutting down server", future.cause());
            }
            InternalInstrumented<SocketStats> stats = listenSocketStats;
            listenSocketStats = null;
            if (stats != null) {
                channelz.removeListenSocket(stats);
            }
            sharedResourceReferenceCounter.release();
            protocolNegotiator.close();
            synchronized (NettyServer.this) {
                // 关闭 Transport
                listener.serverShutdown();
            }
        }
    });
    try {
        // 关闭 channel
        channel.closeFuture().await();
    } catch (InterruptedException e) {
        log.log(Level.FINE, "Interrupted while shutting down", e);
        Thread.currentThread().interrupt();
    }
}
  • io.grpc.internal.ServerImpl.ServerListenerImpl#serverShutdown

监听 Server 关闭事件,根据关闭的状态,选择调用 Transportshutdown 或者 shutdownNow 关闭 ServerTransport

public void serverShutdown() {
    ArrayList<ServerTransport> copiedTransports;
    Status shutdownNowStatusCopy;
    // 复制 Transport 和状态
    synchronized (lock) {
        activeTransportServers--;
        if (activeTransportServers != 0) {
            return;
        }

        // transports collection can be modified during shutdown(), even if we hold the lock, due
        // to reentrancy.
        copiedTransports = new ArrayList<>(transports);
        shutdownNowStatusCopy = shutdownNowStatus;
        serverShutdownCallbackInvoked = true;
    }

    // 遍历 Transport,如果没有关闭状态,则调用shutdown 关闭,如果有状态,则调用 shutdownNow 立即关闭
    for (ServerTransport transport : copiedTransports) {
        if (shutdownNowStatusCopy == null) {
            transport.shutdown();
        } else {
            transport.shutdownNow(shutdownNowStatusCopy);
        }
    }

    synchronized (lock) {
        transportServersTerminated = true;
        // 是否终止的通知
        checkForTermination();
    }
}
  • io.grpc.netty.NettyServerTransport#shutdown

最终在 Transport 中调用了 Netty Channel 的关闭方法,进行关闭

@Override
public void shutdown() {
    if (channel.isOpen()) {
        channel.close();
    }
}

shutdownNow

立即关闭 Server,已经存在的请求和新的请求都会被拒绝;尽管是强制的,但是 Server 并不会瞬间关闭

server.shutdownNow();
  • io.grpc.internal.ServerImpl#shutdownNow

立即关闭时,会先调用 shutdown 方法执行正常的关闭流程,然后修改关闭状态;遍历所有的 ServerTransport,调用其 shutdownNow 方法进行关闭

public ServerImpl shutdownNow() {
    // 调用 shutdown 关闭 Transport
    shutdown();
    Collection<ServerTransport> transportsCopy;
    Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
    boolean savedServerShutdownCallbackInvoked;
    synchronized (lock) {
        if (shutdownNowStatus != null) {
            return this;
        }
        shutdownNowStatus = nowStatus;
        transportsCopy = new ArrayList<>(transports);
        savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
    }
    // 遍历 Transport 调用 shutdownNow
    if (savedServerShutdownCallbackInvoked) {
        //
        for (ServerTransport transport : transportsCopy) {
            transport.shutdownNow(nowStatus);
        }
    }
    return this;
}
  • io.grpc.netty.NettyServerTransport#shutdownNow

ServerTransportshutdownNow 会提交一个强制关闭的指令,并清空 channel,执行关闭

public void shutdownNow(Status reason) {
    if (channel.isOpen()) {
        channel.writeAndFlush(new ForcefulCloseCommand(reason));
    }
}