最近在看一个项目,里面有很多第三方库,里面有一个就是Nanomsg。不知道为什么我对这个库的名字和他的主页特别有感觉 嘿嘿!介于这个项目是我今年主力要看完并且改造的项目,所以我决定把他的第三方库都学习一下,之后还会有rxcpp这个特别刺激。。。。。。
这个系列主要是讲使用,具体原理,不考究。原理牵涉到很多很多很多别的东西。
这是一个可以跨进程(当然也可以单进程)的通信库,而且看下来用起来还很方便,这篇先说最简单的
Pipeline (A One-Way Pipe)
我用的是官网的例子,然后改成多线程,注意这个并不是服务器和客户端,应该说是通过pipe建立的同等级客户端,只不过一个只能发 一个只能收
我先把流程梳理一下
这是一个单向发送的PIPE(管道通信,本来就是半双工)
看个函数
int nn_socket(int domain, int protocol)
这是创建socket的函数,通过domain来确定nn_socket函数会创建的socket类型,然后通过protocol来确定管道方向
[domain]
AF_SP — Nanomsg的封装过的socket
Standard full-blown SP socket.
AF_SP_RAW — 原生socket
Raw SP socket. Raw sockets omit the end-to-end functionality found in AF_SP sockets and thus can be used to implement intermediary devices in SP topologies.
protocol:
NN_PUSH — 只发不收
This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.
NN_PULL — 只收不发
This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.
这个应该没什么问题,下面是第二个要注意的函数
int nn_recv (int s, void *buf, size_t len, int flags);
这是接收消息的函数,玩过socket的朋友都知道,recv有两种,阻塞和非阻塞,对于tcp socket而言,特别是CS架构,IO复用的会后会有不阻塞的情况,但是这里我们要阻塞,因为他是本地的socket,不存在监听连接这个事情,而且不是跨进程,就是跨线程,不阻塞直接就结束了。。。。(这里我是简单提了,具体的大家可以自己查一下)
flag就是控制是否阻塞,一般用两个值,0代表阻塞,NN_DONTWAIT代表不阻塞
s是之前生成的接受方向的socket
好玩的东西,buf,他可以是两种模式,一种是传入一个已经创建好的数组
char buf [100];
nbytes = nn_recv (s, buf, sizeof (buf), 0);
这种就是自维护的模式
还有一种就是动态创建模式,使用nanomsg提供的接口来完成
传入一个void* 空指针的地址,如果nn_recv的返回值小于0,说明有报错
NN_MSG 是一个Nanomsg提供的宏一个比较大的值((size_t)- 1),这里就用这个就可以
然后通过nn_freemsg ( )函数来释放
void *buf = NULL;
nbytes = nn_recv (s, &buf, NN_MSG, 0);
if (nbytes < 0) {
/* handle error */
...
}
else {
/* process message */
...
nn_freemsg (buf);
}
发送函数和接受函数类似,直接看例子
int nn_send (int s, const void *buf, size_t len, int flags);
nbytes = nn_send (s, "ABC", 3, 0);
//or
void *msg = nn_allocmsg(3, 0); //他会再nn_send后自动释放,不要手动去释放,会有未知错误
strncpy(msg, "ABC", 3);
nbytes = nn_send (s, &msg, NN_MSG, 0);
还有一个,删除端连接函数,就是把socket同PIPE管道的连接断开,
int nn_shutdown (int s, int how)同时还有个函数int nn_close (int s)这个就是关闭socket的函数,这两个看上去能混用,其实还是有区别的,nn_shutdown会把当前流里的数据都接受完,再关闭,nn_close就是直接关,然后丢弃流里的数据
how参数填nn_bind or nn_connect的返回值,下面就讲这两个函数
最后我把两个函数一起讲
int nn_bind (int s, const char *addr);
int nn_connect (int s, const char *addr);
s还是那个socket,关键是addr,对于PIPE来说,就是一个本地文件,
"ipc:///tmp/pipeline.ipc"
ipc://是协议头,这是一个必须的条件,后面的内容其实就是可创建的文件路径,注意权限问题
bind就是创建一个PIPE,connect就是加入这个PIPE,自然应该先创建,再加入,一般是接受方创建,然后发送方加入,同一个socket是可以多次bind和connect的,这样就可以加入不同的群组,当然我这里其实不需要的。
好了,接下来看代码
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
#include <iostream>
#include <thread>
#define NODE0 "node0"
#define NODE1 "node1"
#define NODE2 "node2"
void fatal(const char *func)
{
std::cout << func << ":" << nn_strerror(nn_errno()) << std::endl;
exit(1);
}
int node0(const char *url)
{
int sock;
int rv;
if ((sock = nn_socket(AF_SP, NN_PULL)) < 0)
{
fatal("nn_socket");
}
if ((rv = nn_bind(sock, url)) < 0)
{
fatal("nn_bind");
}
for (;;)
{
char *buf = NULL;
int bytes;
if ((bytes = nn_recv(sock, &buf, NN_MSG, 0)) < 0)
{
fatal("nn_recv");
}
std::cout << std::this_thread::get_id() << std::endl;
printf("NODE0: RECEIVED \"%s\"\n", buf);
nn_freemsg(buf);
}
}
int node1(const char *url, const char *nodeName, const char *msg)
{
size_t sz_msg = strlen(msg) + 1; // '\0' too
int sock;
int rv;
int bytes;
if ((sock = nn_socket(AF_SP, NN_PUSH)) < 0)
{
fatal("nn_socket");
}
if ((rv = nn_connect(sock, url)) < 0)
{
fatal("nn_connect");
}
std::cout << std::this_thread::get_id() << std::endl;
printf("%s : SENDING \"%s\"\n", nodeName, msg);
if ((bytes = nn_send(sock, msg, sz_msg, 0)) < 0)
{
fatal("nn_send");
}
sleep(1); // wait for messages to flush before shutting down
return (nn_shutdown(sock, rv));
}
int main()
{
const char *url = "ipc:///tmp/pipeline.ipc";
std::thread recv_thread{node0, url};
sleep(1);
std::thread send_thread1{node1, url, "send node1", "hello"};
std::thread send_thread2{node1, url, "send node2", "bye bye"};
recv_thread.join();
return 0;
}
我是用clion+cmake
CMakeList.txt :
cmake_minimum_required(VERSION 3.14)
project(nanomsgLearning)
set(CMAKE_CXX_STANDARD 17)
LINK_DIRECTORIES(/usr/local/lib/)
add_executable(nanomsgLearning main.cpp)
target_link_libraries(${PROJECT_NAME} nanomsg pthread)
注意带上编译库哦
结果
139951375120128
send node1 : SENDING “hello”
139951366727424
send node2 : SENDING “bye bye”
139951467345664
NODE0: RECEIVED “hello”
139951467345664
NODE0: RECEIVED “bye bye”
跨进程,得用terminal,展示起来不太方便,但是也是可以的,同时也能发现,不只是一对一哦,多可以多个发,但是不能多个收哦,他还是通过内部流来传输,读完就结束,并且是要排队的,如果有两个收,就会有一个收客户端收不到。
总结:PipeLine其实是一个组,里面有一张纸,一支铅笔 一块橡皮,写的人(发送方)轮流用铅笔写,看的人(接收方)看一条,擦一条。是不是很简单呀