当前位置: 首页 > 工具软件 > Theron > 使用案例 >

C++并发编程框架Theron(5)——File reader(1)

曹普松
2023-12-01

1 前言
  在上一篇博文,我主要通过Hello world!的示例,介绍了actors,frameworks,messages和receivers几个构建Theron框架程序的要点。但是Hello world!实例只是一个再简单不过的单actor的应用程序,我们学习Theron框架自然是希望多个actor相互协作来达到多线程开发的目的。在本篇博文中,我们会学习到一个更复杂,更有实际用处的程序示例。
2 File reader
  File reader示例是创建一个文件服务器,用来为客户端读取来自磁盘文件中的数据。我们要实现的是这样一个消息传递的交互:发送给actor一条消息,告诉它去读取一个文件数据到缓存区,并且随后再返回一条读取完毕的消息。
  我们可以看到,使用Theron创建这样一个文件服务器可以带来下面几点优势:
  ①、因为文件阅读被表达成actors,它可以与系统剩下的程序同时执行,没有额外的花费。因为使用消息传递机制。所以文件读取时异步而非堵塞的;
  ②、将文件阅读器放置到一个framework中,我们可以创建一个线程池(多个线程)阻塞文件阅读,从而不会饿死系统剩下程序。
  ③、支持更多的并行文件阅读只是意味着增加了更多的相同actor的拷贝,程序实现很简单,但是可以多线程完成很多文件的读取。
 2.1 actor
  首先,我们创建一个通用的多线程系统形式“Worker”actor,如下:

#include <Theron/Theron.h> 

template <class WorkMessage> 
class Worker : public Theron::Actor 
{ 
public: 
   // 构造函数 
    Worker(Theron::Framework &framework) : Theron::Actor(framework) 
    { 
        RegisterHandler(this, &Worker::Handler); 
    } 
private:
    // 消息处理函数 
    void Handler(const WorkMessage &message, const Theron::Address from)

    { 
        // 消息参数是const,搜易我们需要拷贝它来改变它的性质 
        WorkMessage result(message); 
        result.Process(); 
        // 返回线程消息给发送者
        Send(result, from); 
    } 
};

  通过它自己,这个actor模板不能做太多事情。当实例化一个具体的WorkMessage类型,它会变成该消息类型的一个处理者(也就是一个线程)。Worker从一个叫作Handler()的消息处理函数中获得处理WorkMessage消息的能力。这个处理函数通过拷贝消息来响应WorkMessage消息,并且在其上执行Process()方法,最后将其返回。重要的一点是,我们将Process()放在一个actor里面被调用,从而使得它能与外面程序一起运行。
  这块模板类的作用是,使得Worker能够处理许多不一样的消息类型,即重复使用。
 2.2 消息类定义
  现在让我们写一个ReadRequest消息类型来表示文件读取的请求,ReadRequest对象可以调用一个Process()方法,所以可以作为WorkMesssage被一个Worker所处理。它既会用来请求文件读取,也会用来返回结果,所以它封装关于从磁盘文件读取数据的任何内容,包括实际中数据怎样被读取:

// 数据读取请求: 读取一个磁盘文件的内容到缓存区.
struct ReadRequest 
{ 
public: 

    explicit ReadRequest( 
        const Theron::Address client = Theron::Address(), 
        const char *const fileName = 0, 
        unsigned char *const buffer = 0, 
        const unsigned int bufferSize = 0) : 
      mClient(client), 
      mFileName(fileName), 
      mProcessed(false), 
      mBuffer(buffer), 
      mBufferSize(bufferSize), 
      mFileSize(0) 
    { 
    } 
    void Process() 
    { 
        mProcessed = true; 
        mFileSize = 0; 
        // 尝试打开文件 
        FILE *const handle = fopen(mFileName, "rb"); 
        if (handle != 0) 
        { 
            // 读取数据文件,设置实际读取长度 
            mFileSize = (uint32_t) fread( 
                mBuffer, 
                sizeof(unsigned char), 
                mBufferSize, 
                handle); 

            fclose(handle); 
        } 
    } 
    Theron::Address mClient;            // 请求客户的地址
    const char *mFileName;              // 请求文件的名称 
    bool mProcessed;                    // 文件是否被读取过
    unsigned char *mBuffer;             // 文件内容的缓存区
    unsigned int mBufferSize;           // 缓存区的大小
    unsigned int mFileSize;             // 文件的字节长度 
}; 

  我们可以使用任何class或者struct作为Theron中的消息,唯一需要注意的是需要严格要求消息类型必须安全地可拷贝。当一个消息被发送,Theron会首先创建一个新的消息拷贝,以此发送端和接收端可以看到不同的拷贝数据,从而避免共用一个消息内存。
  实际上消息可拷贝是有重要的隐含信息的。它意味着我们消息必须是轻量级的,否则性能会大大的降低。消息中通过指针指向文件名和内存缓冲器,而不是缓冲器本身,这样可以避免大量的数据的拷贝(也就是所谓的浅拷贝)。
  无论什么时候我们发送一个消息中的指针,我们会访问actor接收消息来接近依靠指针指向的内存地址。因为发送者也可以接近这块内存,其实也是潜在的共享内存。这看似与我们一直强调的避免共享内存相违背,因为这样做非常危险,但是我们仍然在这里这样做是因为使用消息同步接近共享缓存区,可以确保发送端和接收端并不是同一时刻区接近这块缓存区。
 2.3 主函数
  消息与actor已经创立完毕,下面我们来创建一个简单的main程序来串联起来运行。首先,我们需要创建一个Worker actor,然后发送给它一系列ReadRequest类型的消息(使用命令行读取的),等待处理结果,最后打印输出所有读取文件的细节:

static const int MAX_FILES = 16; 
static const int MAX_FILE_SIZE = 16384; 

int main(int argc, char *argv[]) 
{ 
    if (argc < 2) 
    { 
        printf("Expected up to 16 file name arguments.\n"); 
    } 
    // 创建一个worker去处理工作 
    Theron::Framework framework; 
    Worker<ReadRequest> worker(framework); 
    // 注册一个receiver来捕(catcher)获返回的结果
    Theron::Receiver receiver; 
    Theron::Catcher<ReadRequest> resultCatcher; 
    receiver.RegisterHandler(&resultCatcher, &Theron::Catcher<ReadRequest>::Push);
    // 命令行上每个文件名称作为请求消息
    for (int i = 0; i < MAX_FILES && i + 1 < argc; ++i) 
    { 
        unsigned char *const buffer = new unsigned char[MAX_FILE_SIZE]; 
        const ReadRequest message( 
            receiver.GetAddress(), 
            argv[i + 1], 
            buffer, 
            MAX_FILE_SIZE); 
        framework.Send(message, receiver.GetAddress(), worker.GetAddress());
    } 
    // 等待所有结果 
    for (int i = 1; i < argc; ++i) 
    { 
        receiver.Wait(); 
    } 
    // 处理结果,我们仅打印文件的名称
    ReadRequest result; 
    Theron::Address from; 
    while (!resultCatcher.Empty()) 
    { 
        resultCatcher.Pop(result, from); 
        printf("Read %d bytes from file '%s'\n", result.FileSize(), result.FileName());
        // 释放申请的空间 
        delete [] result.Buffer(); 
    } 
} 

  我们再详细的理一遍整个过程,受我们创建了一个Theron::Framework,这个Framework前面介绍过就是一个管理类来主持actors。
  接着我们楚江了一个Worker actor模板实例,实例化ReadRequest消息类型。我们将该Worker actor和framework进行绑定,从而可以有效的管理framework中的worker。
  再接着我们创建了一个Theron::Receiver。这钱Hello world!中已经提及过Receiver是一个帮助类,拥有自己的地址,可以让非actor的代码能够接收来自actors的消息(main函数自然不是actor模式创建的,所以需要一个具体地址来接收来自actor的消息)。Wait()方法可以用来同步等待来自actors的消息。
  和同步消息一样,Receiver也允许我们处理和检查消息。我们可以像传统类一样注册一个公共的方法作为消息的处理函数。但是在这个案例中我们使用的是Theron::Catcher,它是另一个帮助者,是一个线程安全的队列可以捕获被Receiver收到的所有消息。Catcher的Push()方法可以被注册为一个Receiver的消息处理函数。
3 小结
  这篇博文我们主要完成了一个读取文件功能的应用,我们接触了三个Theron框架的核心概念:管理actors的Frameworks,让非actor的代码可以接收来自actors的消息的Receivers;以及存储接收自Receivers的消息。
  但是这个实例到目前为止有一个缺陷,就是尽管Worker可以在一个独立线程异步处理ReadRequests消息,但是它仍然是串联来处理所有的消息。当Worker收到一系列ReadRequests消息的时候,它的Handler()消息处理函数会严格按照顺序依次被执行。实际上,这些请求在Worker的内部消息队列中排列着。下一篇博文我会扩展这个样例,通过写一个Dispatcher actor(actor调度员)来创建和控制一系列Workers来同时并行处理多个请求。
  以上是个人学习记录,由于能力和时间有限,如果有错误望读者纠正,谢谢!
  转载请注明出处:http://blog.csdn.net/FX677588/article/details/75201088


  参考文献:
  Theron框架官网File reader章节http://www.theron-library.com/index.php?t=page&p=lesson01

 类似资料: