当前位置: 首页 > 工具软件 > zmq-vala > 使用案例 >

zmq 学习总结

申屠裕
2023-12-01

 

   

 

// 非线程安全

 

zeromq api 学习总结:

       1.  返回值为void * , 失败是null(大多数)

       2. 返回值是 int , linux c 一样, 失败为-1,成功为0(大多数)

zmq_socket

void*  zmq_socket(void* context, int type);

   成功返回一个不透明的套接字句柄

 失败返回的就是NULL

 

   API 会根据context , type 创建一个不透明的套接字句柄.  并且是没有初始化连接的

 也没有和任何端口联系在一起,如果是客户端,这时就需要用zmq_connect 去连接到服务器

  如果是服务器,需要去用zmq_bind API去绑定端口.

   与传统套接字不同的是 ,传统套接字不是流协议,就是报文协议

   zmq 的套接字提出了 异步消息队列,并且伴随着准确的队列语义, 前面的实现依赖于

type,也就是不同的使用套接字类型

ZMQ传输的是无保障的消息

 

  context : 可以从 zmq_ctx_new() apI 获取

  type: 表示的是你需要创建的是何种套接字类型

 

 

type:

   请求-应答模型(ZMQ-REQ)

      该模型是被用作 从一个 客户端发送请求到一个或者是多个服务器,然后服务器对每个请求做出请求.

      如果没有服务器存在,客户端会组则, 直到有一个服务器变成可用状态为止,其服务器操作是 zmq_recv, zmq_send.即交替使用

   对于客户端来说,需要先send, recv

 

exqmple:

     #include <stdio.h>

#include <stdlib.h>

extern "C"

{

#include <zmq.h>

}

#pragma comment(lib,"libzmq_d.lib")

#define buffersize 4096

int main(int argc, char* argv[])

{

 

void * ctx = zmq_ctx_new();

void* server = zmq_socket(ctx, ZMQ_REQ);

 

zmq_connect(server, "tcp://localhost:5050");

char* buffer[256] = { 0 };

int ret = 0;

char* str = "我是客户端.....";

while (true)

{

memset(buffer, 0, 256);

zmq_send(server, str, strlen(str), 0);

zmq_recv(server, buffer, 256, 0);

printf("%s\n", buffer);

}

system("pause");

zmq_close(server);

zmq_ctx_destroy(ctx);

return 0;

}

 

ZMQ_REP模式

     该模型用于 服务器 去接受请求, 并对接受的请求做出相应的套接字模式 , 即先 recv, 再 send

 

example :

 #include <stdio.h>

#include <stdlib.h>

extern "C"

{

#include <zmq.h>

}

#pragma comment(lib,"libzmq_d.lib")

#define buffersize 4096

int main(int argc, char* argv[])

{

 

void * ctx = zmq_ctx_new();

void* server = zmq_socket(ctx, ZMQ_REQ);

 

zmq_connect(server, "tcp://localhost:5050");

char* buffer[256] = { 0 };

int ret = 0;

char* str = "我是客户端.....";

while (true)

{

memset(buffer, 0, 256);

zmq_send(server, str, strlen(str), 0);

zmq_recv(server, buffer, 256, 0);

printf("%s\n", buffer);

}

system("pause");

zmq_close(server);

zmq_ctx_destroy(ctx);

return 0;

}

 

ZMQ_DEALER  模型

   该模型是对请求响应套接字的扩展, 每个消息的发送是通过轮询调度算法去实现. 其 send和 recv 是不受限制 的

   

  example:

       server:

     // ZMQ_DECLARE_SERVER.cpp : 定义控制台应用程序的入口点。

//

 

#include "stdafx.h"

#include <stdio.h>

extern "C"{

#include <zmq.h>

}

#pragma comment(lib,"libzmq_d.lib")

int _tmain(int argc, _TCHAR* argv[])

{

void* ctx = zmq_ctx_new();

void* server = zmq_socket(ctx, ZMQ_DEALER);

 zmq_bind(server, "tcp://*:5050");

 char* buffer[1024] = {0};

 char* str = "我是服务器...";

while (true)

{

zmq_recv(server,buffer,1023,0);

printf("server: %s\n", buffer);

zmq_send(server, str, strlen(str), 0);

}

 

return 0;

}

 

     client:

         // ZMQ_DECLARE_SERVER.cpp : 定义控制台应用程序的入口点。

//

 

#include "stdafx.h"

#include <stdio.h>

extern "C"{

#include <zmq.h>

}

#pragma comment(lib,"libzmq_d.lib")

int _tmain(int argc, _TCHAR* argv[])

{

void* ctx = zmq_ctx_new();

void* server = zmq_socket(ctx, ZMQ_DEALER);

zmq_connect(server, "tcp://127.0.0.1:5050");

char* buffer[1024] = { 0 };

char* str = "我是客户端...1";

 

while (true)

{

 

 

zmq_send(server, str, strlen(str), 0);

zmq_recv(server, buffer, 1023, 0);

printf("server: %s\n", buffer);

}

 

return 0;

}

 

 

   

 

ZMQ_STREAM 模型

  example: 模仿web服务器.

 

#include <stdio.h>

#include <stdlib.h>

extern "C"

{

#include <zmq.h>

}

#pragma comment(lib,"libzmq_d.lib")

#define BUFFERSIZE 4096

int main(int argc, char* argv[])

{

void* ctx = zmq_ctx_new();

void* httpServer = zmq_socket(ctx, ZMQ_STREAM);

zmq_bind(httpServer, "tcp://*:5050");

char* buffer[BUFFERSIZE] = { 0 };

char http_response[] =

"HTTP/1.0 200 OK\r\n"

"Content-Type: text/plain\r\n"

"\r\n"

"我都不知道给说些什么!反正是http啦!.";

int ret = 0;

while (true)

{

memset(buffer, 0, BUFFERSIZE);

ret = zmq_recv(httpServer, buffer, BUFFERSIZE,0);

printf("%s\n", buffer);

zmq_send(httpServer, buffer, ret, ZMQ_SNDMORE);

zmq_send(httpServer, http_response, strlen(http_response), ZMQ_SNDMORE);

zmq_send(httpServer, buffer, ret, ZMQ_SNDMORE);

zmq_send(httpServer, 0, 0, ZMQ_SNDMORE);

}

    

system("pause");

zmq_close(httpServer);

zmq_ctx_destroy(ctx);

return 0;

}

 

 

ZMQ_ROUTER

该模型是一个请求应答模型的扩展,  当接受到一个消息的时候, 我们应该考该消息是否包含一个标示,该标示属于发起端,  当使用该模式发出消息时, 我们因该考虑删除这个消息的首部 ,并用它去标识这个消息,,如果这个端点不存在任何更多的, 那么这个消息在默认的情况下将会被抛弃

除非 socket 选项被设置成了1

 

send , recv  的顺序是不受限制的.

 

ZMQ__PUB 模型

   就是发送方, 该类型的套接字不能接受消息, 只能发送消息

 

ZMQ_SUB 模型

    接受方, 与ZMQ_PUB连用, 不能发送消息, 只能接受消息

  

 

ZMQ_PAIR: 该套接字模型 , 类似于双向管道, 也就是socketpair()

  

 

 

   

    

 

zmq_bind:  在套接字基础上, 接受一个正在连接的套接字 (bind, [listen])

 

返回值: 成功返回 0

返回值: 失败返回的是 -1

   zmq_bind:

     int zmq_bind(void* socket,const char* endpoint)

   socket: 可以总 zmp_socket 的返回值获得

   endpoint: example  “tcp://127.0.0.1:5050”;

   

zmq_close: 关闭套接字, 有zmq_socket指定的套接字

     int zmq_close(void* socket);

     socket 可以从 zmq_socket 获得

成功返回0, 失败返回的是-1.

 

zmq_connect: 建立一个连接 (connect)

 

  int zmq_connext(void* socket.const char* endpoint )

 成功返回的是0 , 失败返回的是-1

 

 

 

 zmq_ctx_destroy - 释放zmq 的 环境

 

int zmq_ctx_destroy(void* context);

  context 是 zmq_ctx_new的返回值.

 成功返回的是 0. 失败返回的是-1.

 

  zmq_ctx_get

 int zmq_ctx_get(void* context, int option_name);

  context 来源于zmq_ctx_new的返回值

 

 option_name;

      ZMO_IO_THREADS: 返回值为zmq的线程池的大小

      ZMQ_MAX_SOCKETS:  返回值为context可以容纳的最大套接字

      ZMQ_IPV6:  设置IPV6的选项

失败返回的是 -1

  

 

 zmq_ctx_new : 分配一个新的上下文环境

 

 zmq_ctx_set:

  int zmq_ctx_set(void *context,int option_name,int option_val);

    ZMO_IO_THREADS: 返回值为zmq的线程池的大小, 默认值为1.

    ZMQ_MAX_SOCKETS:  返回值为context可以容纳的最大套接字, 默认值为1024.

 

      ZMQ_IPV6:  设置IPV6的选项, 默认值是0

 

  返回值, 成功返回的是0, 失败返回的是-1

 

zmq_error(void);  

   发生错误的时候, 调用它, 可以返回错误值

 

 

 

zmq_msg_close:  释放一个zmq消息

 

int zmq_msg_close(zmq_msg_t * msg)

成功返回的是0  失败返回的是 -1

 

 

zmq_msg_copy: 拷贝一个消息的内容到另外一个消息

   

 int zmq_msg_copy(zmq_msg_t *dest,zmq_msg_t *src);

成功返回的是0, 失败返回的是 -1

 

zmq_msg_data: 返回该消息包含的数据

   void* zmq_msg_data(zmq_msg_t * msg);

 

zmq_msg_get: 该函数的作用在于返回 msg 的相关属性

int zmq_msg_get(zmq_msg_t* message,int property)

 

   property:

   ZMQ_MORE:

       指示着是否有更多的消息

   失败返回的是 -1

zmq_msg_init: 初始化一个空的消息

   int zmq_msg_init(zmq_msg_t* msg);

   没有失败

zmq_msg_init_data:

    

int zmq_msg_init_data (zmq_msg_t '*msg', void '*data', size_t 'size', zmq_free_fn '*ffn', void '*hint');

 

zmq_free_fn '*ffn' 是一个函数. data, hint 会被传送到这里

example:

 

    void my_free (void *data, void *hint)

{

    free (data);

}

 

    /*  ...  */

 

void *data = malloc (6);

assert (data);

memcpy (data, "ABCDEF", 6);

zmq_msg_t msg;

rc = zmq_msg_init_data (&msg, data, 6, my_free, NULL);

assert (rc == 0);

 

zmq_msg_init_size: 用指定的大小去初始化

 

int zmq_msg_init_size (zmq_msg_t '*msg', size_t 'size');*

  成功返回0, 失败返回的是 -1;

 

zmq_msg_more - 查看是否还有更多的消息

 

   *int zmq_msg_more (zmq_msg_t '*message');*

 0 代表的是最后的消息, 而 1 代表的是还有消息

 

   example:

    zmq_msg_t part;

while (true) {

    //  Create an empty 0MQ message to hold the message part

    int rc = zmq_msg_init (&part);

    assert (rc == 0);

    //  Block until a message is available to be received from socket

    rc = zmq_msg_recv (socket, &part, 0);

    assert (rc != -1);

    if (zmq_msg_more (&part))

        fprintf (stderr, "more\n");

    else {

        fprintf (stderr, "end\n");

        break;

    }

    zmq_msg_close (&part);

}   

  

  

 

zmq_msg_move : 把一个消息的内容移动到另一个消息中.

   int zmq_msg_move (zmq_msg_t '*dest', zmq_msg_t '*src');

   成功返回的是0 , 失败返回的是 -1

 zmq_msg_recv: 从socket中接受消息

int zmq_msg_recv (zmq_msg_t '*msg', void '*socket', int 'flags')

  如果是成功的则返回的是 接受到的字节数量, 如果是失败的, 则返回的是 -1

int zmq_msg_send (zmq_msg_t '*msg', void '*socket', int 'flags')

  如果成功返回的发送的字节数, 如果失败返回的是 -1.

size_t zmq_msg_size (zmq_msg_t '*msg')

   返回的是该消息的大小

 

const char *zmq_strerror (int 'errnum');

  // 根据错误值,返回错误消息

 

  *void zmq_version (int '*major', int '*minor', int '*patch');

返回的zmq的当前版本.

 

 

zmq_socket_monitor 所支持的事件

 

   ZMQ_EVENT_CONNECTED:链接已建立

  当和远程的另一端的连接建立好的时候,ZMQ_EVENT_CONNECTED事件会被触发。同步和异步事件都会发生触发此事件。事件值是新连接的socket的FD。

 

ZMQ_EVENT_CONNECT_DELAYED:同步连接失败,仍在进行重试

  当一个请求立即连接的尝试被延迟并且仍然在尝试的时候,此事件被触发。事件值没有意义。

 

ZMQ_EVENT_CONNECT_RETRIED:尝试异步连接/重连

  当一个连接尝试被重连计时器捕获后此事件被触发。重连间隔根据所有的尝试情况进行计算。事件值是重连间隔。

 

ZMQ_EVENT_LISTENING:socket已经绑定了某个地址,准备好接受连接请求

  当一个socket成功的绑定在一个端口上的时候此事件被触发。事件值是新绑定的socket的FD。

 

ZMQ_EVENT_BIND_FAILED:socket无法绑定在这个地址上

  当一个socket无法绑定在给定的端口上时此事件被触发。事件值是绑定函数修改后的errno值。

 

ZMQ_EVENT_ACCEPTED:连接请求被接受

  一个从远端到来的连接被一个绑定了地址的socket接受并建立了连接是会触发此事件。事件值是被接受socket的FD。

 

ZMQ_EVENT_ACCEPT_FAILED:无法接受客户端的连接请求

  当一个连接请求试图连接另一个socket失败的时候会触发此事件。事件值是accept设置的errno值。

 

ZMQ_EVENT_CLOSED:连接关闭

  当一个连接的底层描述符被关闭是会触发此事件。事件值是被关闭的socket的FD。此时这个FD已经被关闭了。

 

ZMQ_EVENT_CLOSE_FAILED:连接无法被关闭

  当一个描述符无法被释放回OS的时候会触发此事件。注意:只对IPC socket有效。事件值是释放失败时设置的errno值。

 

ZMQ_EVENT_DISCONNECTED:会话被破坏

  当流引擎(尤其是TCP、IPC)出现了崩溃的/被破坏的会话时,此事件被触发。事件值是socket的FD。

Return value

当函数zmq_socket_monitor() 执行成功时,返回0或者更大值。否则返回 -1,并且设置errno为下列指定值

 

 

 

 

 

 

 

 

 

 

 

   

 

 

 

 

   

       

 

    

 

      

 

        

   

 

 

 

 

 

 

      

 

 

   

 

 

     

    

 

 类似资料: