当前位置: 首页 > 知识库问答 >
问题:

我如何使用ZeroMQ为协议缓冲区编写自己的RPC实现

羊舌自强
2023-03-14

他们说,根据谷歌协议缓冲区文档中“定义服务”下的内容,

也可以将协议缓冲区与您自己的RPC实现一起使用。

据我了解,协议缓冲区并没有原生实现RPC。相反,它们提供了一系列必须由用户实现的抽象接口(就是我!)。所以我想利用ZeroMQ实现这些抽象接口进行网络通信

我正在尝试使用ZeroMQ创建一个RPC实现,因为我正在进行的项目已经实现了基本消息传递的ZeroMQ(因此为什么我不使用gRPC,正如留档建议的那样)。

在彻底阅读了proto文档之后,我发现我必须为自己的实现实现抽象接口RpcChannel和RpcController。

我已经用我的RPC实现构建了一个我目前所处位置的最小化示例

. proto文件:为简洁起见省略了SearchRequest和SearchACK模式

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

SearchServiceImpl。h:

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
                    const SearchRequest *request,
                    SearchResponse *response,
                    google::protobuf::Closure *done) override {
    // Static function that processes the request and gets the result
    SearchResponse res = GetSearchResult(request);

    // Call the callback function
    if (done != NULL) {
    done->Run();
    }
    }
  }
};

MyRPCController。h:

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};

MyRPCController。cpp-基于此

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::ChiRpcController() : RpcController() { Reset(); }

MyRpcChannel。h:

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};

到目前为止,我对我的例子有以下问题:

  • 我在哪里将ZeroMQ放入这个?
    • 它似乎应该进入RPCChannel,因为在我看到的示例中(请参阅此处的第3个代码块),它们传递一个具有要绑定到的端口的字符串(即MyRpcChannel通道("rpc: host name: 1234/myservice");

    以下是我遇到的其他一些Stack Overflow问题,其中包含有关该主题的一些有用信息:

    • 原型网:实现服务器、rpc控制器和rpc通道-这是我找到RPCController实现示例的地方。
    • 使用协议缓冲区在ZeroMQ中实现RPC-这个答案很有趣,因为在最上面的答案中,他们似乎建议不要为. proto文件使用内置RPC格式的原型。
      • 我在这个文件中也注意到了同样的概念,在一个名为libpbrpc的存储库中,它似乎是一个很好的示例代码源代码

      谢谢你的帮助。我希望我提供了足够的信息,并且清楚地知道我在寻找什么。如果有什么不清楚或缺少信息,请告诉我。我很乐意相应地编辑这个问题。

共有1个答案

胡璞瑜
2023-03-14
  • ZeroMQ基于可以包含任何数据的消息为网络通信提供了一个低级API

ZeroMQ和gRPC都提供对网络通信的支持,但方式不同。您必须选择ZeroMQ或gRPC进行网络通信。如果您选择ZeroMQ,则可以使用交换二进制结构化数据的原型缓冲区对消息进行编码。

主要的一点是原型缓冲区库允许对变体记录(类似于C/C联合)进行编码和解码,从而可以完全模拟具有交换原型缓冲区消息功能的RPC服务提供的功能。

因此,选项包括:

  1. 将ZeroMQ与发送和接收原语和原型缓冲区编码的变体消息一起使用,这些消息可以包含各种子消息,例如
union Request
{
  byte msgType;
  MessageType1 msg1;
  MessageType2 msg2;
  MessageType3 msg3;
}

union Response
{
  byte msgType;
  MessageType3 msg1;
  MessageType4 msg2;
  MessageType5 msg3;
}

send(Request request);
receive(Response response);
service MyService 
{
  rpc function1(MessageType1) returns (Response);
  rpc function2(MessageType2) returns (Response);
  rpc function3(MessageType3) returns (Response);

  rpc functionN(MessageType3) returns (MessageType5);
}

(这里可以使用许多许多组合)

service MyService 
{
    rpc function(Request) returns (Response);
}

选项可能取决于

  • 客户端首选目标:基于ZeroMQ或gRPC的客户端
  • 比较ZeroMQ与基于gRPC的服务的性能原因
  • 特定功能,例如如何在ZeroMQ与基于gRPC的服务和客户端中使用/处理订阅(请参阅如何在grpc中正确设计发布-订阅模式?)

对于第一个选项,与第二个选项相比,您需要做很多事情。您必须将发送的消息类型与预期接收的消息类型相匹配。

如果其他人将开发客户端,则第二个选项将允许更容易/更快地理解所提供服务的功能。

为了在ZeroMQ上开发RPC服务,我会定义这样的. proto文件,指定函数、参数(所有可能的输入和输出参数)和错误,如下所示:

enum Function 
{
    F1 = 0;
    F2 = 1;
    F3 = 2;
}

enum Error 
{
    E1 = 0;
    E2 = 1;
    E3 = 2;
}

message Request
{ 
    required Function function = 1;
    repeated Input data = 2;
}

message Response
{ 
    required Function function = 1;
    required Error error = 2;
    repeated Output data = 3;
}

message Input
{ 
    optional Input1 data1 = 1;
    optional Input2 data2 = 2;
    ...
    optional InputN dataN = n;
}

message Output
{ 
    optional Output1 data1 = 1;
    optional Output2 data2 = 2;
    ...
    optional OutputN dataN = n;
}

message Message
{
   repeated Request requests;
   repeated Response responses;
}

根据函数id,在运行时必须检查参数的数量和类型。

 类似资料:
  • 我有一个简单的客户端和服务器设置。客户端希望在服务器中使用ZeroMQ执行通信方法。我将使用REQ和REP套接字,因为它们适合这个用例。然而,我对protobuf的定义有疑问。我认为这两个选项可用于实现目标: 其中“control”包含要远程执行的方法的名称。另一种选择可以是: 最好的方法是什么?或者至少使用一种方法而不是另一种方法的权衡是什么?

  • 我有一个我想要定义的方法,叫做FindAll,它不需要参数。普罗托克在抱怨。 应为类型名。 这是针对行: rpc findAll()返回(BenchmarksList);

  • 据我所知,协议缓冲区主要用于控制服务器和客户端代码的项目。我的一般问题是——协议缓冲区能否用于将二进制消息序列化/反序列化到使用现有协议的服务器?所以,我的问题: > 如果协议缓冲区不支持本机微调现有协议的序列化/反序列化方式,那么可以通过扩展添加该功能吗?是否可以以某种方式添加序列化/反序列化方法可以识别的关键字?也许这可以通过扩展或修改protobuf csharp port或protobuf

  • 默认情况下,Dart-RPC在服务器和客户端之间传输对象(类实例)时使用JSON序列化。 如何使用Protobuf(协议缓冲区)序列化 是否可以使用“接受”请求标头指定序列化方法(如内容类型)? 这是我尝试的, 我使用了以下定义文件,表示实体: 生成了人。pb。dart对于我来说,使用protoc gen dart插件,通过运行以下命令: 还有一些样板dart rpc代码: 打开功能请求:http

  • 试图使用Ionic 4中的协议缓冲区进行编码 我已经下载了协议并用它来生成一堆_pb.js文件,每个. proto文件一个。很好。 首先关注原型示例。这是示例代码: 我做了一些更改以匹配我的文件。更改proto文件的名称。但是我的proto文件中没有包名称。所以我只是使用了消息名称。首先这是我的. proto文件的开头: 下面是我修改后的代码: 这似乎不起作用。我的控制台显示: 我相信我已经成功地

  • 问题内容: 我正在使用gSoap将旧式C 系统重构为SOA。我们遇到了一些性能问题(非常大的XML),因此我的领导要我看一下协议缓冲区。我做到了,它看起来非常酷(我们需要C 和Java支持)。但是协议缓冲区是仅用于序列化的解决方案,现在我需要将其发送到Java前端。从C ++和Java角度来看,我应该使用什么来通过HTTP(只是内部网络)发送那些序列化的内容? PS。另一个人试图加速我们的gSoa