当前位置: 首页 > 工具软件 > rt0 > 使用案例 >

RT-Thread之消息队列

岳锦
2023-12-01

消息队列的应用场景

消息队列是常用的线程间通信方式,是一种异步的通信方式。消息队列可以应用于多种场合:线程间的消息交换、在中断服务函数中给线程发送消息(中断服务例程不可能接收消息)、使用串口接收不定长数据等。

消息队列的基本概念

队列又称消息队列,是一种常用于线程间通信的数据结构。队列可以在线程与线程间、中断与线程间传送消息,实现了线程接收来自其它线程或中断的不固定长度的消息,并根据不同的接口选择传递的消息是否存放在自己的空间。线程能够从队列中读取消息,当队列中的消息为空时,挂起读取线程。用户还可以指定线程挂起的超时时间timeout;当队列中有新消息时,挂起的读取线程被唤醒以接收并处理新消息。

通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出(FIFO)原则。同时,RT-Thread中的消息队列支持优先级,所有等待消息的线程中优先级最高的会先获得消息。

用户在处理业务时,消息队列提供了异步处理机制,允许将一个消息放入队列,但并不立即处理它,同时队列还能起到缓冲消息的作用。

RT-Thread中使用队列数据结构实现线程异步通信工作,具有如下特性:

  • 消息支持先进先出排队方式与优先级排队方式,支持异步读写工作方式
  • 读队列支持超时机制
  • 支持发送紧急消息
  • 可以允许不同长度(不超过队列节点最大值)的任意类型消息
  • 一个线程能够从任意一个消息队列接收和发送消息
  • 多个线程能够从同一个消息队列接收和发送消息
  • 当队列使用结束后,需要通过删除队列操作来释放内存

消息队列的工作机制

创建消息队列时先创建一个消息队列对象控制块。然后给消息队列分配一块内存空间,组织成空闲消息链表,这块内存的大小等于[(消息大小+消息头)*消息队列容量](对应后文中struct rt_messagequeue中的msg_size以及max_msgs)。接着再初始化消息队列,此时消息队列为空。

RT-Thread操作系统的消息队列对象由多个元素组成,当消息队列被创建时,它就被分配了消息队列控制块:消息队列名称、内存缓冲区消息大小以及队列长度等。同时,每个消息队列对象中包含多个消息框,每个消息框可以存放一条消息。消息队列中的第一个和最后一个消息框分别被称为消息链表头和消息链表尾,对应于消息队列控制块中的msg_queue_head和msg_queue_tail。有些消息框可能是空的,它们通过msg_queue_free形成一个空闲消息框链表。所有消息队列中的消息框总数就是消息队列的长度,这个长度可在消息队列创建时被指定。

线程或中断服务程序都可以给消息队列发送消息。当发送消息时,

(1)消息队列对象先从空闲消息链表上取出下一个空闲消息快;

(2)把线程或中断服务例程发送的消息内容复制到消息块上;

(3)然后把该消息块挂到消息队列的尾部。

当且仅当空闲消息链表上有可用的空闲消息块,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。

发送紧急消息的过程与发送消息的过程几乎一样,唯一不同的是,当发送紧急消息时,从空闲消息链表上取下来的消息快不是挂到消息队列的队尾,而是挂到消息队列的队首,这样,接受者就能够优先接收到紧急消息,从而及时进行消息处理。

读取消息时,根据消息链表头(msg_queue_head)找到最先进入队列的消息节点进行读取。根据消息队列控制块中的entry判断队列是否有消息读取,对于全部空闲的队列(entry为0)进行读消息操作会引起线程挂起。

当消息队列不再被使用时,应该删除它以释放系统资源。一旦删除操作完成,消息队列将被永久性地删除。

消息队列的阻塞机制

消息队列一般不是属于某个线程的队列,在很多时候,创建的队列是每个线程都可以对其进行读写操作的,但是为了保护被每个线程对其进行读写操作的过程,必须有阻塞机制。在某个线程对消息队列进行读写操作时,必须保证该线程能正常完成读写操作,而不受后来的线程干扰。

那么,如何实现这个机制呢?

很简单,因为RT-Thread已经提供了这种机制,直接使用即可。每个对消息队列读写的函数都有这种机制,称其为阻塞机制。

举个例子:假设有一个线程A,在其对某个队列进行读操作(出队)时发现此队列上没有消息,那么此时线程A有三种选择:第一种选择,既然队列没有消息,那么不再等待,去进行其它操作,这样线程A不会进入阻塞态;第二种选择,线程A继续等待,此时线程A会进入阻塞状态,等待消息的到来。而线程A的等待时间可以自行定义,比如设置1000个tick的超时时间,在这段时间之内,如果没有消息,线程A都是处于阻塞态;若阻塞的这段时间内,线程A收到了队列的消息,则线程A就会由阻塞态变为就绪态。如果此时线程A的优先级比当前运行的线程的优先级高,则线程A就会得到消息并且运行。加入1000个tick过去了,队列还是没有消息,那么线程A就不再等待了,而是从阻塞态中(超时)唤醒,并返回一个没有等到消息的错误代码,然后继续执行线程A的其它代码;第三种选择,线程A一直等待,直到收到消息,这样线程A就会进入阻塞态,直到完成读取队列的消息。

与接收消息机制不同,发送消息操作并不带有阻塞机制,因为发送消息的环境可能是在中断中,不允许出现阻塞的情况。在消息队列已满的情况下,将返回一个错误码(-RT_EFULL)。

消息队列控制块

在RT-Thread中,消息队列控制块是操作系统用于管理消息队列的一个数据结构,由结构体struct rt_messagequeue表示,另一种C表达方式为rt_mq_t,表示的是消息队列的句柄,在C语言中的实现是消息队列控制块的指针。消息队列控制块结构体的详细定义在include\rtdef.h中,代码如下所示:

#ifdef RT_USING_MESSAGEQUEUE
/**
 * message queue structure
 */
struct rt_messagequeue
{
    struct rt_ipc_object parent;                        /**< inherit from ipc_object */

    void                *msg_pool;                      /**< start address of message queue */

    rt_uint16_t          msg_size;                      /**< message size of each message */
    rt_uint16_t          max_msgs;                      /**< max number of messages */

    rt_uint16_t          entry;                         /**< index of messages in the queue */

    void                *msg_queue_head;                /**< list head */
    void                *msg_queue_tail;                /**< list tail */
    void                *msg_queue_free;                /**< pointer indicated the free node of queue */

    rt_list_t            suspend_sender_thread;         /**< sender thread suspended on this message queue */
};
typedef struct rt_messagequeue *rt_mq_t;
#endif

消息队列控制块是一个结构体,其中含有消息队列相关的重要参数,在消息队列的功能实现中起着重要的作用。消息队列控制块包含了每个消息队列的信息,如消息队列名称、存放消息的消息池起始地址、每条消息的长度以及队列中已有的消息个数和能够容纳的最大消息数量等。rt_messagequeue对象从rt_ipc_object中派生,由IPC容器所管理。

消息队列相关接口函数

对一个消息队列的操作包括:创建/初始化消息队列、发送消息、接收消息、删除/脱离消息队列。

使用消息队列模块的典型流程如下:

(1)消息队列创建/初始化函数rt_mq_create()/rt_mq_init()

(2)消息队列发送消息函数rt_mq_send()

(3)消息队列接收函数rt_mq_recv()

(4)消息队列删除;/脱离函数rt_mq_delete()/rt_mq_detach()

  • 创建消息队列

消息队列在使用前,应该被创建出来,或对已有的静态消息队列对象进行初始化,创建消息队列的函数接口在include\rt_thread.h中定义,代码如下:

#ifdef RT_USING_HEAP
rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag);
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

rt_mq_create函数的参数和返回值如下表所示:

参数描述
name消息队列的名称
msg_size消息队列中一条消息的最大长度,单位为字节
max_msgs消息队列的最大个数
flag

消息队列采用的灯带方式,可以取如下数值:

RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO

返回值描述
消息队列对象的句柄成功
RT_NULL失败

创建消息队列时先从对象管理器中分配一个消息队列对象,然后给消息队列对象分配一块内存空间,组织成空闲消息链表,这块内存的大小如前所述,为:[消息大小+消息头(用于链表连接)的大小]*消息队列最大个数。接着再初始化消息队列,此时消息队列为空。具体代码实现在src\ipc.c中,如下所示:

rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag)
{
    struct rt_messagequeue *mq;
    struct rt_mq_message *head;
    register rt_base_t temp;

    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* allocate object */
    mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
    if (mq == RT_NULL)
        return mq;

    /* set parent */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* initialize message queue */

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = max_msgs;

    /* allocate message pool */
    mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
    if (mq->msg_pool == RT_NULL)
    {
        rt_object_delete(&(mq->parent.parent));

        return RT_NULL;
    }

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return mq;
}
RTM_EXPORT(rt_mq_create);

上边为函数概述,函数详细步骤描述如下:

(1)分配消息队列对象。调用rt_object_allocate函数将从对象系统为创建的消息队列分配一个对象,并且设置对象名称。在系统中,对象的名称必须是唯一的。

(2)设置消息队列的阻塞唤醒模式,创建的消息队列针对于指定的flag有不同的意义。使用RT_IPC_FLAG_PRIO创建的IPC对象,在多个线程等待队列资源时,优先级高的线程将优先获得资源;而使用RT_IPC_FLAG_FIFO创建的IPC对象,在多个线程等待队列资源时,将按照先到先得的顺序获得资源。RT_IPC_FLAG_PRIO和RT_IPC_FLAG_FIFO均在rtdef.h中定义,代码如下所示:

#define RT_IPC_FLAG_FIFO                0x00            /**< FIFOed IPC. @ref IPC. */
#define RT_IPC_FLAG_PRIO                0x01            /**< PRIOed IPC. @ref IPC. */

(3)初始化消息队列内核对象。此处会初始化一个链表,用于记录访问此队列而阻塞的线程,通过这个链表,可以找到对应的阻塞线程的控制块,从而能恢复线程。

(4)设置消息队列的节点大小与消息队列的最大容量。节点大小要按照RT_ALIGN_SIZE字节对齐,消息队列的容量由用户自己定义。

(5)给此消息队列分配内存。这块内存的大小为[消息大小+消息头(用于链表连接)的大小]*消息队列容量。每个消息节点中都有一个消息头,用于链表链接,指向下一个消息节点,作为消息的排序。

(6)初始化消息队列头尾链表。

(7)将所有的消息队列的节点连接起来,形成空闲链表。

(8)消息队列的个数置零。

注意:在创建消息队列时,是需要用户自己定义消息队列的句柄的,但定义了消息队列的句柄并不等于创建了队列,创建队列必须通过调用rt_mq_create函数来完成。否则,以后根据队列句柄使用消息队列的其它函数时会发生错误。在创建消息队列时会返回创建的结果,如果创建成功,则返回消息队列句柄;如果返回RT_NULL,则表示创建失败。

关于rt_mq_create函数,必须提到一个容易被忽略的关键点:struct rt_mq_message。前文已经多次提到一个概念:消息头,但是一直没有明确说明消息头具体是指什么,是什么结构,在这里详细讲解一下。上面代码中的struct rt_mq_message即指的是消息头,此结构定义于src\ipc.c中,代码如下:

/**
 * @addtogroup messagequeue
 */

/**@{*/

struct rt_mq_message
{
    struct rt_mq_message *next;
};

由代码可以看出,就是一个最基本的单链表结构。虽然很简单,但明确一下是非常有必要的。

  • 初始化消息队列

初始化消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag);

rt_mq_init函数的参数和返回值如下表所示:

参数描述
mq消息队列对象的句柄
name消息队列的名称
msgpool指向存放消息的缓冲区的指针
msg_size消息队列中一条消息的最大长度,单位为字节
pool_size存放消息的缓冲区大小
flag消息队列采用的等待方式,可以取如下数值:RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO
返回值描述
RT_EOK成功

初始化静态消息队列对象跟创建消息队列对象类似,只是静态消息队列对象的内存是在系统编译时由编译器分配的,一般放于读数据段或未初始化数据段中。在使用这类静态消息队列对象前,需要进行初始化。初始化消息队列时,该接口需要用户已经申请获得的消息队列对象的句柄(即指向消息队列对象控制块的指针)、消息队列名、消息缓冲区指针、消息大小以及消息队列缓冲区大小。消息队列初始化后,所有消息都挂在空闲消息链表上,消息队列为空。具体代码实现在src\ipc.c中,如下所示:


/**
 * @brief    Initialize a static messagequeue object.
 *
 * @note     For the static messagequeue object, its memory space is allocated by the compiler during compiling,
 *           and shall placed on the read-write data segment or on the uninitialized data segment.
 *           By contrast, the rt_mq_create() function will allocate memory space automatically
 *           and initialize the messagequeue.
 *
 * @see      rt_mq_create()
 *
 * @param    mq is a pointer to the messagequeue to initialize. It is assumed that storage for
 *           the messagequeue will be allocated in your application.
 *
 * @param    name is a pointer to the name that given to the messagequeue.
 *
 * @param    msgpool is a pointer to the starting address of the memory space you allocated for
 *           the messagequeue in advance.
 *           In other words, msgpool is a pointer to the messagequeue buffer of the starting address.
 *
 * @param    msg_size is the maximum length of a message in the messagequeue (Unit: Byte).
 *
 * @param    pool_size is the size of the memory space allocated for the messagequeue in advance.
 *
 * @param    flag is the messagequeue flag, which determines the queuing way of how multiple threads wait
 *           when the messagequeue is not available.
 *           The messagequeue flag can be ONE of the following values:
 *
 *               RT_IPC_FLAG_PRIO          The pending threads will queue in order of priority.
 *
 *               RT_IPC_FLAG_FIFO          The pending threads will queue in the first-in-first-out method
 *                                         (also known as first-come-first-served (FCFS) scheduling strategy).
 *
 *               NOTE: RT_IPC_FLAG_FIFO is a non-real-time scheduling mode. It is strongly recommended to
 *               use RT_IPC_FLAG_PRIO to ensure the thread is real-time UNLESS your applications concern about
 *               the first-in-first-out principle, and you clearly understand that all threads involved in
 *               this messagequeue will become non-real-time threads.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it represents the initialization failed.
 *
 * @warning  This function can ONLY be called from threads.
 */
rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag)
{
    struct rt_mq_message *head;
    register rt_base_t temp;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    /* initialize object */
    rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name);

    /* set parent flag */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* set message pool */
    mq->msg_pool = msgpool;

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message));

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_init);

  • 删除消息队列

当消息队列不再被使用时,应该删除它以释放系统资源。一旦操作完成,消息队列将被永久性地删除。删除消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

#ifdef RT_USING_HEAP
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

 rt_mq_delete函数的参数和返回值如下表所示:

参数描述
mq消息队列对象的句柄
返回值描述
RT_EOK成功

删除消息队列时,如果有线程被挂起在该消息队列等待队列上,则内核先唤醒挂起在该消息等待队列上的所有线程(线程返回值是-RT_ERROR),然后再释放消息队列使用的内存,最后删除消息队列对象。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will delete a messagequeue object and release the memory.
 *
 * @note     This function is used to delete a messagequeue object which is created by the rt_mq_create() function.
 *           By contrast, the rt_mq_detach() function will detach a static messagequeue object.
 *           When the messagequeue is successfully deleted, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_detach()
 *
 * @param    mq is a pointer to a messagequeue object to be deleted.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY delete a messagequeue initialized by the rt_mq_create() function.
 *           If the messagequeue is initialized by the rt_mq_init() function, you MUST NOT USE this function to delete it,
 *           ONLY USE the rt_mq_detach() function to complete the detachment.
 *           for example,the rt_mq_create() function, it cannot be called in interrupt context.
 */
rt_err_t rt_mq_delete(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE);

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* resume all suspended thread */
    _ipc_list_resume_all(&(mq->parent.suspend_thread));
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* free message queue pool */
    RT_KERNEL_FREE(mq->msg_pool);

    /* delete message queue object */
    rt_object_delete(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_delete);

消息队列删除函数是根据消息队列句柄直接进行删除的,删除之后这个消息队列的所有信息都会被清空,而且 不能再次使用这个消息队列。需要注意的是,如果某个消息队列没有被创建,那当然也是无法被删除的。删除消息队列时会把所有由于访问此消息队列而进入阻塞态的线程从阻塞链表中删除,mq是rt_mq_delete()传入的参数,是消息队列句柄,表示的是想要删除哪个队列。

(1)检测消息队列是否被创建了,只有已经被创建了,才可以进行删除操作。

(2)调用_ipc_list_resume_all函数将所有由于访问此队列而阻塞的线程从阻塞状态中恢复过来,线程获得队列返回的错误代码。在实际情况中一般不这样使用,在删除时,应先确认所有的线程都无须再次访问此队列,并且此时没有线程被此队列阻塞,才进行删除操作。

(3)删除了消息队列,那肯定要把消息队列的内存释放出来。

(4)删除消息队列对象并且释放消息队列内核对象的内存,释放内核对象内存在rt_object_delete函数中实现。

消息队列删除函数rt_mq_delete的使用也很简单,只需传入要删除的消息队列的句柄即可。调用rt_mq_delete函数时,系统将删除这个消息队列。如果删除该消息队列时有线程正在等待消息,那么删除操作会先唤醒等待在消息队列上的线程(等待线程的返回值是-RT_ERROR)。

  • 脱离消息队列

脱离消息队列将使消息队列对象从内核对象管理器中脱离。脱离消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_detach(rt_mq_t mq);

rt_mq_detach函数的参数和返回值如下表所示:

参数描述
mq消息队列对象的句柄
返回值描述
RT_EOK成功

使用该接口后,内核先唤醒所有挂在该消息等待队列对象上的线程(线程返回值是-RT_ERROR),然后将该消息队列对象从内核对象管理器中脱离。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will detach a static messagequeue object.
 *
 * @note     This function is used to detach a static messagequeue object which is initialized by rt_mq_init() function.
 *           By contrast, the rt_mq_delete() function will delete a messagequeue object.
 *           When the messagequeue is successfully detached, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_delete()
 *
 * @param    mq is a pointer to a messagequeue object to be detached.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY detach a static messagequeue initialized by the rt_mq_init() function.
 *           If the messagequeue is created by the rt_mq_create() function, you MUST NOT USE this function to detach it,
 *           and ONLY USE the rt_mq_delete() function to complete the deletion.
 */
rt_err_t rt_mq_detach(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent));

    /* resume all suspended thread */
    _ipc_list_resume_all(&mq->parent.suspend_thread);
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* detach message queue object */
    rt_object_detach(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_detach);

  • 消息队列发送消息

线程或者中断服务程序都可以给消息队列发送消息。当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。发送消息的函数接口在include\rt_thread.h中定义,代码如下:

rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout);

rt_mq_send函数的参数和返回值如下表所示:

参数描述
mq消息队列对象的句柄
buffer消息内容
size消息大小
返回描述
RT_EOK成功
-RT_EFULL消息队列已满
-RT_ERROR失败,表示发送的消息长度大于消息队列中消息的最大长度

发送消息时,发送者需指定发送的消息队列的对象句柄(即指向消息队列控制块的指针),并且指定发送的消息内容以及消息大小。在发送一条普通消息之后,空闲消息链表上的队首消息被转移到了消息队列尾。发送消息的函数实现在src\ipc.c中,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object.
 *           If there is a thread suspended on the messagequeue, the thread will be resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is fully used,
 *           the current thread will wait for a timeout.
 *           By contrast, when the messagequeue is fully used, the rt_mq_send_wait() function will
 *           return an error code immediately without waiting.
 *
 * @see      rt_mq_send_wait()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread context.
 */
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
{
    return rt_mq_send_wait(mq, buffer, size, 0);
}
RTM_EXPORT(rt_mq_send);

该函数实际上是rt_mq_send_wait函数的封装,rt_mq_send_wait函数也在同文件中实现,就在上边,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object. If
 *           there is a thread suspended on the messagequeue, the thread will be
 *           resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is
 *           fully used, the current thread will wait for a timeout. If reaching
 *           the timeout and there is still no space available, the sending
 *           thread will be resumed and an error code will be returned. By
 *           contrast, the rt_mq_send() function will return an error code
 *           immediately without waiting when the messagequeue if fully used.
 *
 * @see      rt_mq_send()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @param    timeout is a timeout period (unit: an OS tick).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the
 *           operation is successful. If the return value is any other values,
 *           it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread
 * context.
 */
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout)
{
    register rt_ubase_t temp;
    struct rt_mq_message *msg;
    rt_uint32_t tick_delta;
    struct rt_thread *thread;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    /* current context checking */
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);

    /* greater than one message size */
    if (size > mq->msg_size)
        return -RT_ERROR;

    /* initialize delta tick */
    tick_delta = 0;
    /* get current thread */
    thread = rt_thread_self();

    RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();

    /* get a free list, there must be an empty item */
    msg = (struct rt_mq_message *)mq->msg_queue_free;
    /* for non-blocking call */
    if (msg == RT_NULL && timeout == 0)
    {
        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        return -RT_EFULL;
    }

    /* message queue is full */
    while ((msg = (struct rt_mq_message *)mq->msg_queue_free) == RT_NULL)
    {
        /* reset error number in thread */
        thread->error = RT_EOK;

        /* no waiting, return timeout */
        if (timeout == 0)
        {
            /* enable interrupt */
            rt_hw_interrupt_enable(temp);

            return -RT_EFULL;
        }

        /* suspend current thread */
        _ipc_list_suspend(&(mq->suspend_sender_thread),
                            thread,
                            mq->parent.parent.flag);

        /* has waiting time, start thread timer */
        if (timeout > 0)
        {
            /* get the start tick of timer */
            tick_delta = rt_tick_get();

            RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",
                                        thread->name));

            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        /* re-schedule */
        rt_schedule();

        /* resume from suspend state */
        if (thread->error != RT_EOK)
        {
            /* return error */
            return thread->error;
        }

        /* disable interrupt */
        temp = rt_hw_interrupt_disable();

        /* if it's not waiting forever and then re-calculate timeout tick */
        if (timeout > 0)
        {
            tick_delta = rt_tick_get() - tick_delta;
            timeout -= tick_delta;
            if (timeout < 0)
                timeout = 0;
        }
    }

    /* move free list pointer */
    mq->msg_queue_free = msg->next;

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    /* the msg is the new tailer of list, the next shall be NULL */
    msg->next = RT_NULL;
    /* copy buffer */
    rt_memcpy(msg + 1, buffer, size);

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();
    /* link msg to message queue */
    if (mq->msg_queue_tail != RT_NULL)
    {
        /* if the tail exists, */
        ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
    }

    /* set new tail */
    mq->msg_queue_tail = msg;
    /* if the head is empty, set head */
    if (mq->msg_queue_head == RT_NULL)
        mq->msg_queue_head = msg;

    if(mq->entry < RT_MQ_ENTRY_MAX)
    {
        /* increase message entry */
        mq->entry ++;
    }
    else
    {
        rt_hw_interrupt_enable(temp); /* enable interrupt */
        return -RT_EFULL; /* value overflowed */
    }

    /* resume suspended thread */
    if (!rt_list_isempty(&mq->parent.suspend_thread))
    {
        _ipc_list_resume(&(mq->parent.suspend_thread));

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        rt_schedule();

        return RT_EOK;
    }

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    return RT_EOK;
}
RTM_EXPORT(rt_mq_send_wait)

实际上较早一些的版本如3.0.3版本中并没有单独拎出一个rt_mq_send_wait函数,只有rt_mq_send函数,所有的实现都在其中。 

(1)在发送消息时需要传递一些参数:rt_mq_t mq是已经创建的队列句柄;void *buffer是即将发送消息的存储地址;rt_size_t size是即将发送的消息的大小。

(2)检测传递进来的参数,即使这些参数中有一个是无效的,就无法发送消息。

(3)判断消息的大小,其大小不能超过创建时设置的消息队列的大小mq->msg_size。用户可以自定义大小,如果mq->msg_size不够,可以在创建时设置得大一些。

(4)获取一个空闲链表的指针。必须有一个空闲链表节点用于存放要发送的消息,如果消息队列已经满了,则无法发送消息。

(5)移动空闲链表指针。

 类似资料: