当前位置: 首页 > 文档资料 > YoC 编程基础 >

消息队列

优质
小牛编辑
118浏览
2023-12-01

概述

多任务系统中,任务间互相同步等待共享资源,我们一般会使用信号量,如果需要互斥,则使用互斥量。而任务间互相收发消息则可以使用消息队列。消息队列(queue)使用类似信号量的机制进行任务间的同步,并使用环形缓冲池(ring buffer)来进行消息的队列缓冲管理,以达到任务间收发消息的阻塞和通知管理。Queue的实现目的在于任务间互相收发消息。一般如果有信号量机制,用户就可以自己实现一套任务间的阻塞和通知收发功能,其本质在于接收方通过信号量的获取来开始接收消息,发送方通过信号量的释放来通知接收方处理。接收任务在无消息时被阻塞,消息到来时被唤醒处理。Queue就是基于这样一种类信号量机制来进行消息的收发。再加上ring buffer的缓冲机制来缓存任务间的消息队列,就组合成了本章的消息队列(queue)。其既包含消息的缓冲队列,又包含了消息的通知机制。 消息队列模块整体受宏RHINO_CONFIG_BUF_QUEUE开关控制, 对应的AOS API接口实现位于:aos_rhino.c中,由RHINO_CONFIG_BUF_QUEUE宏定义包含实现; 对应的krhino内部实现位于:·k_buf_queue.c

包含头文件

#include <aos/aos.h>      //总头文件
#include <aos/kernel.h>   //直接对应头文件

接口定义

创建一个队列

int aos_queue_new(aos_queue_t *queue, void* buf, unsigned int size, int max_msg)
  • 参数:

    • queue:队列描述结构体指针;需要用户定义一个queue结构体;例:‘aos_queue_t g_queue’; 传入&g_queue
    • buf:此queue队列的缓冲区起点;例:char buf[1000];传入buf
    • size:此queue队列的缓冲区大小
    • max_msg:一次存入缓冲区的最大数据单元
  • 返回值:

    • 0:执行成功;其他:返回失败

删除一个队列

void aos_queue_free(aos_queue_t *queue)

删除一个队列,并释放阻塞在其中的任务

  • 参数:

    • queue:队列描述结构体指针
  • 返回值:

向queue内发送数据

int aos_queue_send(aos_queue_t *queue, void* msg, unsigned int size)
  • 参数:

    • queue:队列描述结构体指针
    • msg:发送数据起始内存
    • size:发送数据大小
  • 返回值

    • 0:执行成功;其他:返回失败

从queue内收取数据

int aos_queue_recv(aos_queue_t *queue, unsigned int ms, void* msg, unsigned int*size)
  • 参数:

    • queue:队列描述结构体指针
    • ms: 传入0表示不超时,立即返回;AOS_WAIT_FOREVER表示永久等待;其他数值表示超时时间,单位ms
    • msg:返回获取到的数据的内存指针
    • size:返回获取到的数据大小
  • 返回值:

    • 0:执行成功;其他:返回失败

判断一个队列queue是否有效

int aos_queue_is_valid(aos_queue_t *queue)
  • 参数:

    • queue: 判断一个队列queue是否有效
  • 返回值

    • 0:queue无效;1:queue有效

获取一个队列queue的缓冲区起点

void *aos_queue_buf_ptr(aos_queue_t* queue)
  • 参数: *queue:队列描述结构体指针
  • 返回值:
    • NULL: 获取失败;其他:返回队列queue的缓冲区起点

示例代码

#define QUEUE_BUF_LEN 100

static aos_queue_t test_queue;
static char queue_buf[QUEUE_BUF_LEN];

static void task1_entry(void *arg)
{
    char *msg_send = "Hello, Queue!";

    aos_msleep(3000);    // 任务休眠3000ms

    printf("task1 send msg\n");

    /*发送消息*/
    aos_queue_send(&test_queue, msg_send, strlen(msg_send));
}

static void task2_entry(void *arg)
{
    char msg_recv[16] = {0};
    unsigned int size_recv = 16;

    printf("task2 wait msg\n");

    memset(msg_recv, 0, size_recv);
    aos_queue_recv(&test_queue, 100000, msg_recv, &size_recv);

    printf("task2 get msg: ", size_recv);
    for (int i = 0; i < size_recv; i++) {
        printf("%c", msg_recv[i]);
    }
    printf("\n");

    /*获取到消息,当前任务继续执行下去*/
    printf("queue test successfully!\n");

    /*删除消息队列*/
    aos_queue_free(&test_queue);
}

void test_queue_start(void)
{
    int ret = -1;

    aos_msleep(1000);    // 任务休眠1000ms

    /*当前任务:创建消息队列,消息队列最大为长度为100,单条消息最大为20*/
    ret = aos_queue_new(&test_queue, queue_buf, QUEUE_BUF_LEN, 20);
    if (ret != 0) {
        printf("queue create failed\n");
        return;
    }

    aos_task_new("task1", task1_entry, NULL, 512);
    aos_task_new("task2", task2_entry, NULL, 512);
}