前言:本文主要介绍Mushroom系统底层各个模块,帮助读者尽快熟悉本系统。
main函数的设计思路是,一个main函数的主线程负责传感器数值的采集以及发送,同时单开一个用于控制器的控制线程。
在控制线程中,接收上层数据中心发送的指令,这个指令包含了控制器号以及即将修改的控制器状态。我们使用controller_set()来设置控制器状态,同时返回执行后的结果,成功则为1,失败则为0。这个返回的结果同样是通过TCP/IP进行传输的。同时我们刚刚修改的控制器进行状态读取,确保设置是成功的。
fx-serial.* 文件,主要涉及到的是优先队列,以及串口通信读写等功能,都预留了相应的API。以后的扩展无需涉及到这里面的代码。
数据采集模块位于fx-serial文件中。
本系统的数据读取是通过PLC来实现的。在实际操作时需要使用串口线连接PLC,并对串口进行读写,从而实现数据的获取。本系统采用RS-422标准。连接PLC时所使用的串口线是SGI 8孔引脚。
由于FT232R USB转串口芯片符合RS-422标准,因此可以转换成使用FT232R USB转串口与SGI插针相互连接。
数据采集需要对PLC的D寄存器进行读写控制。三菱的fx-2n系列的PLC拥有自己的一套可编程口协议。而对其读写的前提是设置好对应的参数:
int open_serial_source(char *device){
struct termios newtio;
int fd;
fd = open(device, O_RDWR | O_NOCTTY | O_NDELAY/* | O_NONBLOCK*/);
if (fd < 0) {
printf("serial open error:%s\n", strerror(errno));
return -1;
}
memset(&newtio, 0, sizeof(newtio)); /* Serial port setting */
tcgetattr(fd, &newtio);
newtio.c_cflag = CLOCAL | CREAD;
newtio.c_cflag |= PARENB;
newtio.c_cflag &= ~PARODD;
newtio.c_cflag &= ~CSTOPB;
newtio.c_cflag &= ~CSIZE;
newtio.c_cflag |= CS7;
newtio.c_iflag |= (INPCK | ISTRIP);
cfsetispeed(&newtio, B9600);
cfsetospeed(&newtio, B9600);
/* Raw output_file */
newtio.c_oflag = 0;
if (tcflush(fd, TCIFLUSH) >= 0 && tcsetattr(fd, TCSANOW, &newtio) >= 0) {
printf("fd ok\n");
return fd;
}
close(fd);
return -1;
}
这些串口参数可以参考这篇文章。
帧结构的介绍文件在svn中可寻,在这里不在介绍。
读命令的请求帧的实现:
static int getReadCommandFrame (char *buf, int *sz, int address, int num)
{
if (buf == NULL || sz == NULL ||
(address < 0 || address > 255) || num < 0)
return -1;
buf[0] = 0x02;
buf[1] = '0';
_getAddressAscii(address, &buf[2]);
num = num *2;
buf[6] = _getAscii(num/10);
buf[7] = _getAscii(num%10);
buf[8] = 0x03;
int i, sum = 0;
for (i = 1; i <= 8; i++)
sum += buf[i];
i = sum&0xFF;
buf[9] = _getAscii(i/16);
buf[10] = _getAscii(i%16);
*sz = 11;
return 0;
}
与之类似,对于写命令而言,需要将写入的DATA部分要明确给出:
static int getWriteCommandFrame(char *buf, int *sz, int address, int num, char *data)
{
if (buf == NULL || sz == NULL ||
(address < 0 || address > 255) || num < 0)
return -1;
buf[0] = 0x02;
buf[1] = '1';
_getAddressAscii(address, &buf[2]);
num = num*2;
buf[6] = _getAscii(num/10);
buf[7] = _getAscii(num%10);
int i;
for (i = 0; i < num*2; i+=4) {
buf[8+i] = data[i+2];
buf[8+i+1] = data[i+3];
buf[8+i+2] = data[i];
buf[8+i+3] = data[i+1];
}
buf[8+num*2] = 0x03;
int sum = 0;
for (i = 1; i <= 8+num*2; i++)
sum += buf[i];
i = sum&0xFF;
buf[8+num*2+1] = _getAscii(i/16);
buf[8+num*2+2] = _getAscii(i%16);
*sz = 8+num*2+3;
return 0;
}
这两个命令帧的差别在于有无数据写入,以及CMD类型。
响应帧的解析。首先需要检验读命令的响应帧的完整性,若完整,则需解析出该响应帧的第二部分。然后将这一部分转换为ascii码值,并将字节序调整,重新计算即可。
网络通信模块位于main文件中,作用是将经过采集、处理后的数据发送给数据中心。
本系统充当的是客户端角色,负责发送数据给数据中心,同时接收上层的命令。因此采取多线程的模式,让系统协调的运转起来。
使用Linux上的网络编程接口:
sockfd = socket(PF_INET,SOCK_STREAM,0);//创建套接字描述符
bzero(&servaddr,sizeof(servaddr));//清空地址结构体
servaddr.sin_family = AF_INET;//指定协议
servaddr.sin_port=htons(port);//绑定端口
inet_pton(AF_INET,ip,&servaddr.sin_addr);//绑定ip地址
//连接服务器
for(nsec = 1;nsec <= MAXSLEEP;nsec <<= 1)
{
if(connect(sockfd,servaddr,addrlen) == 0)
return 0;
if(nsec <= MAXSLEEP/2)//sleep nesc,then connect retry
sleep(nsec);
}
这里用到一个connect的包裹函数,作用是实现超时重连,如果连接失败,则每次等待1、2、4、8……秒后继续尝试重新连接,直到MAXSLEEP为止。
int my_connect(int sockfd,const struct sockaddr *servaddr,socklen_t addrlen)
{
int nsec;
for(nsec = 1;nsec <= MAXSLEEP;nsec <<= 1)
{
if(connect(sockfd,servaddr,addrlen) == 0)
return 0;//connection accepted
if(nsec <= MAXSLEEP/2)//sleep nesc,then connect retry
sleep(nsec);
}
return -1;
}
具体可以参考:
connect的包裹函数
listen的包裹函数
因为在实际使用中,会遇到很多问题,比如:网络接口松开,服务器死机,服务器重启等这一类的常见问题。所以需要心跳机制来控制这些问题。
网络传输使用的是TCP/IP协议,它是通过发送应答通信包来判断网络是否连通。客户端和服务器任一端给另一端发送了FIN分节时,表示断开连接。因此我们可以用发送心跳包的方式来检测网络是否通畅。恰好在Linux网络编程中,提供了SO_KEEPALIVE的参数。这个参数的基本原理是在idle的时间段内如果没有数据来往,则发送一个心跳包,如果没有应答,则将在intv时间间隔后重新发送心跳包。如果还无应答,则继续发送心跳包cnt次。如此往复,如果依然失败,则判定此时网络无法连通,表明网络确实有问题。相关处理由实际情况而定。
下面是保持心跳长连的核心代码:
int my_heartbeat(int fd)
{
int alive,error,idle,cnt,intv;
/*
* open keepalive on fd
*/
Restart:
alive = 1;//set keepalive open
ret=setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&alive,sizeof(alive));
if(ret < 0)
{
DEBUG("set socket option error.\n");
goto Restart;
}
/*
* 60S without data,send heartbeat package
*/
idle = 30;
ret = setsockopt(fd,SOL_TCP,TCP_KEEPIDLE,&idle,sizeof(idle));
if(ret < 0)
{
DEBUG("set keepalive idle error.\n");
return -1;
}
/*
* without any respond,3m later resend package
*/
intv = 180;
ret = setsockopt(fd,SOL_TCP,TCP_KEEPINTVL,&intv,sizeof(intv));
if(ret < 0)
{
DEBUG("set keepalive intv error.\n");
return -2;
}
/*
* send 5 times,without any response,mean connect lose
*/
cnt = 5;
ret = setsockopt(fd,SOL_TCP,TCP_KEEPCNT,&cnt,sizeof(cnt));
if(ret < 0)
{
DEBUG("set keepalive cnt error.\n");
return -3;
}
}
网络通信的另一个关键点在于数据传送的完整性。使用系统提供的read/write函数时,往往会有读写不全的情况。有时一般读写错误发生,在正常的文件I/O情况下认为这是一个错误,但在网路传输中是可以忽略的。
本系统主要需要面对的是写完整的问题,所以只需要使用安全写函数。
因此需要重新构造出几个相对安全且可重入的读写函数:
/*
* 安全读写函数
*/
ssize_t safe_write(int fd, const void *vptr, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while(nleft > 0)
{
if((nwritten = write(fd, ptr, nleft)) <= 0)
{
if(nwritten < 0 && errno == EINTR) //被信号中断,重写
nwritten = 0;
else //error
return -1;
}
nleft -= nwritten;
ptr += nwritten;
}
return(n);
}
ssize_t safe_read(int fd,void *vptr,size_t n)
{
size_t nleft;
ssize_t nread;
char *ptr;
ptr=vptr;
nleft=n;
while(nleft > 0)
{
if((nread = read(fd,ptr,nleft)) < 0)
{
if(errno == EINTR) //被信号中断,重读
nread = 0;
else //出错
return -1;
}
else if(nread == 0) //EOF
break;
nleft -= nread;
ptr += nread;
}
return (n-nleft);
}
在设计数据采集的时候,主要是通过房间号进行的一个采集,每个传感器的号直接注册到了几个数组中,因此通过遍历数组进行采集,同时把采集到的数据保存的同一个数组中room_info[]。由于同一类型的传感比较多,比如房间一的温度传感器有五个。此时我们发送出去的温度只需要一个,因此需要对数据做一个整合。
有一点需要注意的是,我们实际读取到的raw数据是没有经过转换的,需要使用对应的公式进行线性变换。大概有如下的五个公式:
这五个公式是用于进行线性变换的,基本每个公式对应一个函数,其中T有两个,这是因为有一个房间的温度值有所不同,具体的可以参看PLC每个寄存器对应的类别(svn上可寻)。
本系统目前采用的第三方的Protobuf-c(详细见系统部署章节)序列化库。而数据中心使用的是Protobuf-.net库,因此消息序列化格式能够统一。
为了保证数据可以被正确的解析,设计了一个简单的格式帧如下:
DATALENGTH | MUSHROOM | MSG_HEADER | DATA |
---|
每条消息的开始是DATALENGTH,表示整个消息的长度,用于消息的完整接收的首要条件;第二部分是消息标识“MUSHROOM”,如果消息不包含这个标识则忽略;第三部分是MSG_HEADER头部消息,用来表示消息以及请求的类型,第四部分是传送的数据DATA。实际操作中的使用memmove将一条消息按照上述格式进行流的拼接。
由于这部分正在进行重构,所以不再详细介绍。如果对protobuf的部分有兴趣了解,可以参考这篇文章。
我们需要对数据进行打包,利用相关的字符串操作函数对消息进行打包拼接:
len = 8+2+mh_length+si_length;
slen = 8+mh_length+si_length;
buf = malloc(len);
index = buf;
sprintf(buf,"%d%s",slen,"MUSHROOM");
index += strlen(buf); //move to the end
memmove((void *)index,buf_mh,mh_length);
index += mh_length;
memmove((void *)index,buf_si,si_length);
printf("%d\n",len);
free(buf_si);
本系统的日志记录分为三个部分:系统运行时的记录、程序warning记录,以及程序error的记录。
文件记录是目前版本中使用的,大致可以抽象出的日志记录模块如下:
FILE* LOG = NULL;
LOG = fopen(filename,"a+");
if(LOG == NULL)
fprintf(stderr,"Open logfile error\n");
fprintf(LOG,"%s","data");
数据库记录使用的是sqlite,这是一个轻量级的文件数据库,相当小巧。其语法和sql是类似的,且提供C语言编程接口。
由于已不再使用,所以不再赘述。如果你感兴趣,可以参考这篇文章
实际在软件开发的时候可以在代码中设置一些调试宏,用于debug调试。
DEBUG宏:
/*
* 反斜线把这个定义延续到下一行;
*_ _func__和_Function_一样的;
* 使用do{...}while(0)构造后的宏定义不会受到大括号、分号等的影响,而且可以定义空宏而不受警告。
*/
/*
* __LINE__:在源代码中插入当前源代码行号;
* __FILE__:在源文件中插入当前源文件名;
* __DATE__:在源文件中插入当前的编译日期
* __TIME__:在源文件中插入当前编译时间;
* __func__:输出函数名称。
*/
#include<stdio.h>
#define DEBUG_PRINT do{}while(0)
#if defined(DEBUG_PRINT)
#define DEBUG(...)\
do{\
fprintf(stderr,"-----DEBUG-----\n");\
fprintf(stderr,"%s %s\n",__TIME__,__DATE__);\
fprintf(stderr,"%s:%d:%s():",__FILE__,__LINE__,__func__);\
fprintf(stderr,__VA_ARGS__);\
}while(0)
#endif
int main(void)
{
DEBUG("Debug successfully!\n");
return 0;
}
有了这些宏之后,实现调试的时候可省去大量重复代码,并且提高速度。你也可以效仿写一些assert断言宏。
后期可以对整个系统的结构进行调整,可以按照区分两个线程的方式进行模块化,也可以进一步按照上述的系统模块化,因为当前版本的关系比较错乱,所以需要降低各模块之间的耦合度。
另外,以后可以将系统改成手动控制的版本,例如添加开启、暂停、房间号选择、传感器选择、重启等命令,使得程序功能多样化。
在系统稳定后,可以使用iniparser库对配置文件进行抽离,使得非开发人员也可以对本系统的一些配置进行修改。