当前位置: 首页 > 工具软件 > node-nanomsg > 使用案例 >

Nanomsg使用说明 Pipeline (A One-Way Pipe)

孔逸春
2023-12-01

最近在看一个项目,里面有很多第三方库,里面有一个就是Nanomsg。不知道为什么我对这个库的名字和他的主页特别有感觉 嘿嘿!介于这个项目是我今年主力要看完并且改造的项目,所以我决定把他的第三方库都学习一下,之后还会有rxcpp这个特别刺激。。。。。。
这个系列主要是讲使用,具体原理,不考究。原理牵涉到很多很多很多别的东西。

回归主题 Nanomsg

这是一个可以跨进程(当然也可以单进程)的通信库,而且看下来用起来还很方便,这篇先说最简单的

Pipeline (A One-Way Pipe)

我用的是官网的例子,然后改成多线程,注意这个并不是服务器和客户端,应该说是通过pipe建立的同等级客户端,只不过一个只能发 一个只能收
我先把流程梳理一下
这是一个单向发送的PIPE(管道通信,本来就是半双工)
看个函数

int nn_socket(int domain, int protocol)
这是创建socket的函数,通过domain来确定nn_socket函数会创建的socket类型,然后通过protocol来确定管道方向

[domain]
AF_SP — Nanomsg的封装过的socket

Standard full-blown SP socket.

AF_SP_RAW — 原生socket

Raw SP socket. Raw sockets omit the end-to-end functionality found in AF_SP sockets and thus can be used to implement intermediary devices in SP topologies.

protocol:

NN_PUSH — 只发不收

This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.

NN_PULL — 只收不发

This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.

这个应该没什么问题,下面是第二个要注意的函数

int nn_recv (int s, void *buf, size_t len, int flags);

这是接收消息的函数,玩过socket的朋友都知道,recv有两种,阻塞和非阻塞,对于tcp socket而言,特别是CS架构,IO复用的会后会有不阻塞的情况,但是这里我们要阻塞,因为他是本地的socket,不存在监听连接这个事情,而且不是跨进程,就是跨线程,不阻塞直接就结束了。。。。(这里我是简单提了,具体的大家可以自己查一下)

flag就是控制是否阻塞,一般用两个值,0代表阻塞,NN_DONTWAIT代表不阻塞

s是之前生成的接受方向的socket

好玩的东西,buf,他可以是两种模式,一种是传入一个已经创建好的数组

char buf [100];
nbytes = nn_recv (s, buf, sizeof (buf), 0);

这种就是自维护的模式
还有一种就是动态创建模式,使用nanomsg提供的接口来完成
传入一个void* 空指针的地址,如果nn_recv的返回值小于0,说明有报错
NN_MSG 是一个Nanomsg提供的宏一个比较大的值((size_t)- 1),这里就用这个就可以
然后通过nn_freemsg ( )函数来释放

void *buf = NULL;
nbytes = nn_recv (s, &buf, NN_MSG, 0);

if (nbytes < 0) {
    /* handle error */
    ...
}
else {
    /* process message */
    ...
    nn_freemsg (buf);
}

发送函数和接受函数类似,直接看例子

int nn_send (int s, const void *buf, size_t len, int flags);
nbytes = nn_send (s, "ABC", 3, 0);
//or
void *msg = nn_allocmsg(3, 0); //他会再nn_send后自动释放,不要手动去释放,会有未知错误
strncpy(msg, "ABC", 3);
nbytes = nn_send (s, &msg, NN_MSG, 0);

还有一个,删除端连接函数,就是把socket同PIPE管道的连接断开,
int nn_shutdown (int s, int how)同时还有个函数int nn_close (int s)这个就是关闭socket的函数,这两个看上去能混用,其实还是有区别的,nn_shutdown会把当前流里的数据都接受完,再关闭,nn_close就是直接关,然后丢弃流里的数据
how参数填nn_bind or nn_connect的返回值,下面就讲这两个函数
最后我把两个函数一起讲

int nn_bind (int s, const char *addr);
int nn_connect (int s, const char *addr);

s还是那个socket,关键是addr,对于PIPE来说,就是一个本地文件,

"ipc:///tmp/pipeline.ipc"

ipc://是协议头,这是一个必须的条件,后面的内容其实就是可创建的文件路径,注意权限问题
bind就是创建一个PIPE,connect就是加入这个PIPE,自然应该先创建,再加入,一般是接受方创建,然后发送方加入,同一个socket是可以多次bind和connect的,这样就可以加入不同的群组,当然我这里其实不需要的。

好了,接下来看代码

#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <iostream>
#include <thread>

#define NODE0 "node0"
#define NODE1 "node1"
#define NODE2 "node2"

void fatal(const char *func)
{
    std::cout << func << ":" << nn_strerror(nn_errno()) << std::endl;
    exit(1);
}

int node0(const char *url)
{
    int sock;
    int rv;

    if ((sock = nn_socket(AF_SP, NN_PULL)) < 0)
    {
        fatal("nn_socket");
    }
    if ((rv = nn_bind(sock, url)) < 0)
    {
        fatal("nn_bind");
    }
    for (;;)
    {
        char *buf = NULL;
        int bytes;
        if ((bytes = nn_recv(sock, &buf, NN_MSG, 0)) < 0)
        {
            fatal("nn_recv");
        }
        std::cout << std::this_thread::get_id() << std::endl;
        printf("NODE0: RECEIVED \"%s\"\n", buf);
        nn_freemsg(buf);
    }
}

int node1(const char *url, const char *nodeName, const char *msg)
{
    size_t sz_msg = strlen(msg) + 1; // '\0' too
    int sock;
    int rv;
    int bytes;

    if ((sock = nn_socket(AF_SP, NN_PUSH)) < 0)
    {
        fatal("nn_socket");
    }
    if ((rv = nn_connect(sock, url)) < 0)
    {
        fatal("nn_connect");
    }
    std::cout << std::this_thread::get_id() << std::endl;
    printf("%s : SENDING \"%s\"\n", nodeName, msg);
    if ((bytes = nn_send(sock, msg, sz_msg, 0)) < 0)
    {
        fatal("nn_send");
    }
    sleep(1); // wait for messages to flush before shutting down
    return (nn_shutdown(sock, rv));
}

int main()
{
    const char *url = "ipc:///tmp/pipeline.ipc";
    std::thread recv_thread{node0, url};
    sleep(1);
    std::thread send_thread1{node1, url, "send node1", "hello"};
    std::thread send_thread2{node1, url, "send node2", "bye bye"};
    recv_thread.join();

    return 0;
}

我是用clion+cmake
CMakeList.txt :

cmake_minimum_required(VERSION 3.14)
project(nanomsgLearning)

set(CMAKE_CXX_STANDARD 17)
LINK_DIRECTORIES(/usr/local/lib/)
add_executable(nanomsgLearning main.cpp)
target_link_libraries(${PROJECT_NAME} nanomsg pthread)

注意带上编译库哦
结果
139951375120128
send node1 : SENDING “hello”
139951366727424
send node2 : SENDING “bye bye”
139951467345664
NODE0: RECEIVED “hello”
139951467345664
NODE0: RECEIVED “bye bye”
跨进程,得用terminal,展示起来不太方便,但是也是可以的,同时也能发现,不只是一对一哦,多可以多个发,但是不能多个收哦,他还是通过内部流来传输,读完就结束,并且是要排队的,如果有两个收,就会有一个收客户端收不到。

总结:PipeLine其实是一个组,里面有一张纸,一支铅笔 一块橡皮,写的人(发送方)轮流用铅笔写,看的人(接收方)看一条,擦一条。是不是很简单呀

 类似资料: