一、为什么需要流式编程
首先解释一下什么是流式编程,所谓流式编程是指编程接口可以接收部分输入数据,边接收边处理,不必等待完整数据。这好比 TCP 数据流,因为网络传输的因素,每次读操作并不能保证会获得完整的数据块,每次仅读取一部分数据,多次读取才可能会读到一块完整的数据,基于 TCP 的应用服务也比较多,象 HTTP/SMTP/POP 等服务都是基于 TCP 传输协议进行数据传输的,因为 TCP 流的流式特点,所以这些应用都定义了数据完整性的规则。正因为有了这些数据完整性规则,才使得编程变得简单,因此很多实现都是根据这些完整性原则在读取了完整数据后才进行处理。
流式编程因为允许每次输入的数据仅是一部分,甚至可能只有一个字节,所以就需要维护一个数据流的处理状态(内部可能会有很多标志位和缓冲区),一般会采用有限状态机的方法进行编程(象知名的 telnet 服务就是采用了有限状态机的编程思想)。
既然可以把基于 TCP 流的应用协议根据具体应用的数据完整性规则转换为非流式编程,那为什么还需要采用复杂的流式编程呢?原因就是为了适应数据流的多样性以及应用的复杂性。采用流式编程方式,既可以应用于非阻塞 IO,又可以应用于阻塞 IO,而如果采用非流式编程,则一般仅能采用阻塞 IO 编程了;同时,采用流式编程,还可以非常容易使数据流以管道的方式从一个流式编程接口输给下一个流式编程接口,实现流水线式的数据处理过程。
二、acl_cpp 中流式编程接口的设计
acl_cpp 中为了处理流式数据,设计了两种类:流式数据处理器(简称流式处理器)和流式处理器管理器(流式管理器)。流式处理器类中,定义了一个虚接口,规定了子类为了实现某一具体应用的流式处理功能而必须遵循的接口;流式管理器,负责管理流式处理器,将这些流式处理器组成一个数据流管道,接收用户的数据输入,将数据流从一个管道(即流式处理器)传向另一个管道。
1、流式处理器类
pipe_stream:流式处理器基类,是一个纯虚类。继承该类的子类必须实现三个虚方法中的两个纯虚接口:push_pop,pop_end,子类可根据需要实现另一个虚方法:clear。
1.1、push_pop 纯虚接口定义:
/**
* 数据输入输出接口
* @param in {const char*} 输入数据的地址
* @param len {size_t} 输入数据长度
* @param out {string*} 存储输出结果缓冲区,不能为空
* @param max {size_t} 希望接收到输出结果的长度限制,如果为0则
* 表示没有限制,输出结果都存储在 out 缓冲区中
* @return {int} 输出数据的长度,如果 < 0 则表示出错
*/
virtual int push_pop(const char* in, size_t len,
string* out, size_t max = 0) = 0;
该接口接收外部数据流,同时将中间处理结果输出。
1.2、pop_end 纯虚接口定义:
/**
* 最后处理的输出数据接口
* @param out {string*} 存储输出结果缓冲区,不能为空
* @param max {size_t} 希望接收到输出结果的长度限制,如果为0则
* 表示没有限制,输出结果都存储在 out 缓冲区中
* @return {int} 输出数据的长度,如果 < 0 则表示出错
*/
virtual int pop_end(string* out, size_t max = 0) = 0;
当数据流结束时(即已经读到了完整的数据时),子类必须实现该接口,将自身缓冲区里的数据处理后输出给调用者。
2、流式管理器类
pipe_manager:流式处理器类的管理器类。该类主要定义并实现了四个方法:push_back,push_front,update,update_end。
2.1 push_back/push_front 方法:
/**
* 以尾部添加的方式注册新的管道流处理器
* @param stream {pipe_stream*} 管道流处理器对象
* @return {bool} 如果该管道流处理器对象已经存在则返回 false
*/
bool push_back(pipe_stream* stream);
/**
* 以头部添加的方式注册新的管道流处理器
* @param stream {pipe_stream*} 管道流处理器对象
* @return {bool} 如果该管道流处理器对象已经存在则返回 false
*/
bool push_front(pipe_stream* stream);
这两个方法分别以尾部或头部添加的方式,将流式处理器加入到流式管道中,形成数据流处理的管道。需要注意的是:添加的流式处理器对象必须在 pipe_manager 的类实例作用域内依然有效,pipe_manager 流式管理器实例并不负责流式处理器对象的销毁,如果这些流式处理器是动态创建的,用户应该负责对象销毁。
2.2、update/update_end 方法:
/**
* 应用向管道流管理器添加新数据,由该管理器依次传递给所有已注册管道流
* 处理器,同时从已注册管道流处理器接收处理结果,依次传递给下一个
* @param src {const char*} 待处理的数据地址
* @param len {size_t} src 数据长度
* @param out {pipe_stream*} 如果非空,则该管道处理器将是最后一个只接收
* 输入而不进行输出的管道处理器
* @return {bool} 是否有错误发生
*/
bool update(const char* src, size_t len, pipe_stream* out = NULL);
/**
* 最后必须调用一次该函数,以使有些管道的缓冲区里的数据可以一次性地
* 刷新至最后的管道中
* @param out {pipe_stream*} 如果非空,则该管道处理器将是最后一个只接收
* 输入而不进行输出的管道处理器
* @return {bool} 是否有错误发生
*/
bool update_end(pipe_stream* out = NULL);
这两个方法提供了数据输入及数据输出的方法,允许用户每次仅输入部分数据,流式管理器内部会自动将数据流在各个流式处理器之间进行传递;当用户确定数据完整时,应该调用 update_end 将管道流中可能存在的最后结果数据取出。
三、示例
以 HTTP 应用为例,客户端在接收服务器响应数据时,假设数据是采用 utf-8 字符集的 xml 数据格式,同时对数据进行了压缩处理。则客户端接收到数据后,如果将接收到数据转换为GBK字符集后再提取数据字段,处理顺序为:解压缩->字符集转换->xml解析,然后才提取出需要的数据字段。如果这三个处理过程都提供了流式接口,则要方便得多,我们只需将数据输入一个流式处理器,然后提取中间处理结果,再将中间处理结果输入到另一个流式处理器即可。
下面列出了实现上述功能的示例代码:
#include "lib_acl.hpp"
bool http_get(acl::istream& in)
{
// 初始化解压库
acl::zlib_stream zlib;
if (zlib.pipe_unzip_begin() = false)
{
printf("初始化解压库失败\r\n");
return false;
}
// 初始化字符集转码库
acl::charset_conv utf8ToGbk;
if (utf8ToGbk.update_begin("utf-8", "gbk") == false)
{
printf("初始化字符集转码库失败\r\n");
return false;
}
acl::xml xml; // xml 数据解析器
acl::pipe_manager manager; // 流式管理器
// 以尾部添加方式分别添加:解压流式处理器、字符集转码处理器以及 xml 解析处理器,
// 从而使处理管理流的处理方向为:解压处理->字符集转码处理->xml解析处理
manager.push_back(&zlib);
manager.push_back(&utf8ToGbk);
manager.push_back(&xml);
// 循环读取数据流,进行处理
char buf[4096];
int ret;
while (true)
{
ret = in.read(buf, sizeof(buf) - 1, false); // 读取部分数据
if (ret == -1)
break;
buf[ret] = 0;
if (manager.update(buf, ret) == false) // 输入数据至流式管理器
{
printf("流式处理器内部出错\r\n");
return false;
}
}
if (manager.update_end() == false) // 处理最后一部分数据
{
printf("流式处理器内部出错\r\n");
return false;
}
// 假设完整数据为:
// <users><user name="zsx1" age="1" /><user name="zsx2" age="2" /></users>
// 想要提取名字为 zsx2 的 age 字段,则可如下处理:
// 提取符合 users/user 方式的 xml 结点对象集合
const std::vector<acl::xml_node*>& users = xml.getElementsByTags("users/user");
if (users.empty())
{
printf("zsx2 未发现\r\n");
return false;
}
// 遍历查询结果集,找颞骨 zsx2 的 xml 结点
std::vector<acl::xml_node*>::const_iterator cit = users.begin();
for (; cit != users.end(); ++cit)
{
// 提取 name 属性值
const char* user = (*cit)->attr_value("name");
if (strcasecmp(user, "zsx2") != 0)
continue;
// 提取 age 属性值
const char* age = (*cit)->attr_value("age");
if (age == NULL)
continue;
printf("zsx2's age: %s\r\n", age);
return true;
}
printf("zsx2's age not found\r\n");
return false;
}
以上是一个简单的流式编程的示例,所有符合流式编程规则的处理器类不仅可以组合起来,由 pipe_manager 管理器统一管理,而且也可以单独使用。
四、acl_cpp 库中支持流式编程的流式处理器类
在 acl_cpp 库中能够支持流式处理功能的类有:
4.1、可以和 pipe_manager 流式管理器配合的流式处理器
pipe_string:字符串处理双向管理流;
xml:xml 数据格式流式解析处理器;
json:json 数据格式流式解析处理器;
mime_code/mime_base64/mime_uucode/mime_xxcode/mime_quoted_printable:邮件数据格式编码/解码的流式处理器;
charset_conv:字符集转码处理器;
zlib_stream:流式压缩/解压处理器;
ostream:IO 输出流处理器。
4.2、暂时不能与 pipe_manager 流式管理器配合的流式处理器
mime:邮件 mime 数据的流式解析处理器;
rfc2047:邮件 mime rfc2047 编码的流式解析处理器。
本文链接地址:http://zsxxsz.iteye.com/blog/1566188
github: https://github.com/acl-dev/aclQQ 群:242722074