AsyncGenericService 可以忽略特定的服务,接受任意的客户端请求,并做相应处理发送对应的响应。当有特定服务监听请求时,客户端的请求会被送到指定的服务,如果客户端的请求没有特定的服务监听,则均会被送到 AsyncGenericService 。
异步服务的常规用法
class CallData
{
public:
CallData()
: _status(CREATE)
{
}
virtual void Proceed() = 0;
protected:
enum CallStatus { CREATE, PROCESS, READ, WRITE, FINISH };
CallStatus _status;
};
声明处理请求的基类,设置状态阶段码,后续通过状态码来执行不同阶段的操作。
class GreeterCallData : public CallData
{
public:
GreeterCallData(std::shared_ptr<Greeter::AsyncService> service,
std::shared_ptr<ServerCompletionQueue> cq)
: _service(service), _cq(cq), _responder(&_ctx)
{
Proceed();
}
void Proceed() override
{
if (_status == CREATE)
{
_status = PROCESS;
_service->RequestSayHello(&_ctx, &_request, &_responder, _cq.get(), _cq.get(), this);
}
else if (_status == PROCESS)
{
new GreeterCallData(_service, _cq);
std::string prefix("Hello ");
_reply.set_message(prefix + _request.name());
_status = FINISH;
_responder.Finish(_reply, Status::OK, this);
}
else
{
GPR_ASSERT(_status == FINISH);
delete this;
}
}
private:
ServerContext _ctx;
HelloRequest _request;
HelloReply _reply;
ServerAsyncResponseWriter<HelloReply> _responder;
std::shared_ptr<ServerCompletionQueue> _cq;
std::shared_ptr<Greeter::AsyncService> _service;
};
...
void HandleRpcs()
{
new GenericCallData(_genericService, _cq);
new GreeterCallData(_greeterService, _cq);
void* tag;
bool ok;
while (true)
{
GPR_ASSERT(_cq->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
解释:
- GreeterCallData 用于处理客户端请求,整个处理过程是异步的,通过轮询 ServerCompletionQueue 的通知,不断的调用
Proceed()
方法,并使用枚举状态标识 _status 来控制完成不同阶段的操作。 - GreeterCallData 创建时 _status 为
CREATE
状态,进入CREATE
分支。通过调用 Greeter::AsyncService 的RequestSayHello()
方法(这里的SayHello是通过 protobuf 声明的服务名),向 ServerCompletionQueue 注册一个监听事件,然后将 _status 改为PROCESS
状态。这里会将 ServerContext、HelloRequest、ServerAsyncResponseWriter 类型指针作为参数传入,用于存放调用的请求和上下文以及关联一个用于发送相应的 Writer 。随后的两个参数一个为 CompletionQueue 类型指针,用于处理请求,另一个为 ServerCompletionQueue 类型指针,用于通知新的请求,这两个参数可以是同一个指针。最后一个参数为void*
类型的 tag,当 ServerCompletionQueue 的Next
方法接收到新通知时,会被传递出来,以便后续的操作。 - 客户端调用服务时,进入
PROCESS
分支,首先新建一个 GreeterCallData 对象用于处理下一个请求。客户端的上下文和请求会存放到RequestSayHello()
方法传入的地址中,这时就可以取出并做相应操作。处理完需要响应的消息后,调用ServerAsyncResponseWriter 的Finish()
方法,注册结束事件,传入需要发送的响应和状态码,并将 _status 改为FINISH
。 - 当有可写通知时,进入
FINISH
分支。此时,响应和状态码已经发送出去,可以删除对象来释放空间,至此GreeterCallData 整个的生存周期结束。
AsyncGenericService 用法
class GenericCallData : public CallData
{
public:
GenericCallData(std::shared_ptr<grpc::AsyncGenericService> service, std::shared_ptr<ServerCompletionQueue> cq)
: _service(service)
, _cq(cq)
{
_rw = new grpc::GenericServerAsyncReaderWriter(&_ctx);
Proceed();
}
void Proceed() override
{
if (_status == CREATE)
{
_status = READ;
_service->RequestCall(&_ctx, _rw, _cq.get(), _cq.get(), this);
}
else if (_status == READ)
{
std::cout << "method: " << _ctx.method() << std::endl;
new GenericCallData(_service, _cq);
_rw->Read(&_request, this);
_status = WRITE;
}
else if (_status == WRITE)
{
Request req;
Response response;
ParseFromByteBuffer(&_request, &req);
std::cout << req.fill_username() << std::endl;
response.set_username("123456");
std::unique_ptr<grpc::ByteBuffer> reply = SerializeToByteBuffer(&response);
_rw->WriteAndFinish(*reply, grpc::WriteOptions(), grpc::Status::OK, this);
_status = FINISH;
}
else
{
GPR_ASSERT(_status == FINISH);
delete this;
}
}
private:
grpc::GenericServerContext _ctx;
grpc::ByteBuffer _request;
grpc::GenericServerAsyncReaderWriter *_rw;
std::shared_ptr<grpc::AsyncGenericService> _service;
std::shared_ptr<ServerCompletionQueue> _cq;
};
解释:
AsyncGenericService 的用法和特定的 AsyncService 用法逻辑相同,不同之处在于上下文和数据的类型。这里使用了 GenericCallData 类来处理通用请求。
- AsyncGenericService 注册监听事件的方法是
RequestCall()
, 前两个参数分别为 GenericServerContext 指针的上下文,GenericServerAsyncReaderWriter 指针的读写器。这里就不能传入存储请求的 request 参数,而是需要通过后续的方法来读取请求。后面的三个参数和特定的RequestXXX()
方法的类型含义均相同。 - 客户端调用服务时,首先任然新建 GenericCallData 对象用于监听下一个请求。然后需要调用 GenericServerAsyncReaderWriter 的
Read()
方法注册读请求事件,这里才传入了存储请求的 _request 指针,并且它是grpc::ByteBuffer
类型的指针。 - 当有读请求事件发生的通知时,反序列化 ByteBuffer 数据为相应的 Message 请求,就可以读取客户端发送的请求内容了。进行相应的操作后,调用 GenericServerAsyncReaderWriter 的
WriteAndFinish()
方法注册发送写和结束事件,传入需要发送的响应和状态码。 - 接收到写和结束事件发生的通知,生命周期结束,释放自己。
- 通过 GenericServerContext 的
method()
方法,可以获得该请求需要调用的服务名,服务端可以根据这个信息来将 ByteBuffer 反序列化为对应服务的请求。 - 这里使用了多态的特性,当声明多个服务时,通过 tag 传入自身的指针,然后通过父类指针调用 Proceed 函数,进行相对应的处理。
ByteBuffer 和 Message 的相互转换
//从 ByteBuffer 中解析 Message
bool ParseFromByteBuffer(grpc::ByteBuffer* buffer, grpc::protobuf::Message* message)
{
std::vector<grpc::Slice> slices;
(void)buffer->Dump(&slices);
grpc::string buf;
buf.reserve(buffer->Length());
for (auto s = slices.begin(); s != slices.end(); s++)
{
buf.append(reinterpret_cast<const char*>(s->begin()), s->size());
}
return message->ParseFromString(buf);
}
//将 Message 序列化为 ByteBuffer
std::unique_ptr<grpc::ByteBuffer> SerializeToByteBuffer(
grpc::protobuf::Message* message)
{
grpc::string buf;
message->SerializeToString(&buf);
grpc::Slice slice(buf);
return std::unique_ptr<grpc::ByteBuffer>(new grpc::ByteBuffer(&slice, 1));
}
客户端调用
客户端调用的方法和普通用法没有区别,可以同步也可以异步。