使用同步的非阻塞的服务端和客户端的请求处理流程

实现

IDL

  • helloworld.thrift
namespace java io.github.helloworlde.thrift

struct HelloMessage {
    1: required string message,
}

struct HelloResponse {
    1: required string message,
}

service HelloService {
    HelloResponse sayHello(1: HelloMessage request);
}

客户端实现

使用 TSocket 作为底层连接,协议使用 TBinaryProtocol

try {
    TTransport transport  = new TSocket("localhost", 9090);
    transport.open();
    TProtocol protocol = new TBinaryProtocol(transport);

    HelloService.Client client = new HelloService.Client(protocol);

    HelloMessage request = new HelloMessage();
    request.setMessage("Thrift");

    HelloResponse response = client.sayHello(request);
    log.info("返回响应: {}", response.getMessage());

} catch (TException e) {
    e.printStackTrace();
}

请求处理流程

1. 建立连接

transport.open();
  • org.apache.thrift.transport.TSocket#open

初始化 Socket,建立连接

public void open() throws TTransportException {
    if (socket_ == null) {
        // 初始化 Socket
        initSocket();
    }

    try {
        // 建立连接
        socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
        // 初始化流
        inputStream_ = new BufferedInputStream(socket_.getInputStream());
        outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
    } catch (IOException iox) {
        close();
        throw new TTransportException(TTransportException.NOT_OPEN, iox);
    }
}

2. 执行请求

使用 TProtocol 构建 TServiceClient,用于发送同步请求

  • io.github.helloworlde.thrift.HelloService.Client#sayHello
public HelloResponse sayHello(HelloMessage request) throws org.apache.thrift.TException {
    send_sayHello(request);
    return recv_sayHello();
}

发送请求

  • io.github.helloworlde.thrift.HelloService.Client#send_sayHello

其中的 sayHello_args 用于读写结构体,将消息内容转换为相应格式的字节

public void send_sayHello(HelloMessage request) throws org.apache.thrift.TException {
    sayHello_args args = new sayHello_args();
    args.setRequest(request);
    sendBase("sayHello", args);
}
  • org.apache.thrift.TServiceClient#sendBase(java.lang.String, org.apache.thrift.TBase)

这里设置了消息类型是调用

protected void sendBase(String methodName, TBase<?, ?> args) throws TException {
    sendBase(methodName, args, TMessageType.CALL);
}
  • org.apache.thrift.TServiceClient#sendBase(java.lang.String, org.apache.thrift.TBase, byte)

在写入请求,写入请求头,写入消息体,然后写入结尾符,将请求发送出去

private void sendBase(String methodName, TBase<?, ?> args, byte type) throws TException {
    // 构建请求,写入头信息
    oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_));
    // 写入协议对象
    args.write(oprot_);
    // 写入结尾信息
    oprot_.writeMessageEnd();
    // 清空缓冲,写入
    oprot_.getTransport().flush();
}
  • org.apache.thrift.protocol.TBinaryProtocol#writeMessageBegin

会将版本、调用类型、方法名、请求 ID 写入到请求头

public void writeMessageBegin(TMessage message) throws TException {
    if (strictWrite_) {
        // 写入版本
        int version = VERSION_1 | message.type;
        writeI32(version);
        // 被调用方法
        writeString(message.name);
        // 请求序号
        writeI32(message.seqid);
    } else {
        writeString(message.name);
        writeByte(message.type);
        writeI32(message.seqid);
    }
}
  • io.github.helloworlde.thrift.HelloService.sayHello_args.sayHello_argsStandardScheme#write

写入请求内容,会将结构体的相关描述信息写入到请求中

public void write(org.apache.thrift.protocol.TProtocol oprot, sayHello_args struct) throws org.apache.thrift.TException {
  struct.validate();

  oprot.writeStructBegin(STRUCT_DESC);
  if (struct.request != null) {
    oprot.writeFieldBegin(REQUEST_FIELD_DESC);
    struct.request.write(oprot);
    oprot.writeFieldEnd();
  }
  oprot.writeFieldStop();
  oprot.writeStructEnd();
}

接收响应

  • io.github.helloworlde.thrift.HelloService.Client#recv_sayHello

在处理请求时先构建了 sayHello_result对象,用于解析响应的描述

public HelloResponse recv_sayHello() throws org.apache.thrift.TException {
  sayHello_result result = new sayHello_result();
  receiveBase(result, "sayHello");
  if (result.isSetSuccess()) {
    return result.success;
  }
  throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
}
  • org.apache.thrift.TServiceClient#receiveBase

读取响应内容,解析为对象

protected void receiveBase(TBase<?, ?> result, String methodName) throws TException {
    // 读取消息
    TMessage msg = iprot_.readMessageBegin();
    // 如果是异常,则读取异常并抛出
    if (msg.type == TMessageType.EXCEPTION) {
        TApplicationException x = new TApplicationException();
        x.read(iprot_);
        iprot_.readMessageEnd();
        throw x;
    }
    // 如果请求序号不匹配,则抛出异常
    if (msg.seqid != seqid_) {
        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID,
                String.format("%s failed: out of sequence response: expected %d but got %d", methodName, seqid_, msg.seqid));
    }
    // 读取响应内容
    result.read(iprot_);
    iprot_.readMessageEnd();
}
  • org.apache.thrift.protocol.TBinaryProtocol#readMessageBegin

读取并校验版本,获取方法名称、消息类型、请求 ID

public TMessage readMessageBegin() throws TException {
    int size = readI32();
    if (size < 0) {
        int version = size & VERSION_MASK;
        if (version != VERSION_1) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
        }
        return new TMessage(readString(), (byte) (size & 0x000000ff), readI32());
    } else {
        if (strictRead_) {
            throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
        }
        return new TMessage(readStringBody(size), readByte(), readI32());
    }
}
  • io.github.helloworlde.thrift.HelloService.sayHello_result.sayHello_resultStandardScheme#read

读取响应内容,解析为相应的对象,然后赋值给 result 对象

public void read(org.apache.thrift.protocol.TProtocol iprot, sayHello_result struct) throws org.apache.thrift.TException {
  org.apache.thrift.protocol.TField schemeField;
  iprot.readStructBegin();
  while (true) {
    schemeField = iprot.readFieldBegin();
    if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
      break;
    }
    switch (schemeField.id) {
      case 0: // SUCCESS
        if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
          struct.success = new HelloResponse();
          struct.success.read(iprot);
          struct.setSuccessIsSet(true);
        } else {
          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
        }
        break;
      default:
        org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
    }
    iprot.readFieldEnd();
  }
  iprot.readStructEnd();

  // check for required fields of primitive type, which can't be checked in the validate method
  struct.validate();
}

参考文档