当前位置: 首页 > 知识库问答 >
问题:

solace队列上的消息是否需要按照它们在队列上的顺序被发送?

贺景铄
2023-03-14

我有从不同提供程序类提取消息的辅助线程。每个提供程序类添加/获取内部队列的消息。每个提供者只满足一个solace队列,而solace使用者将消息添加到队列的提供者。

多个工作人员可以接收提供程序的消息,处理它们,然后发送消息的ack(下面的message.commit()方法执行ack)。

情景

    null
    null

提供程序类

    public abstract class BaseProvider implements IProvider {

     private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();

    @Override
    public synchronized List<CoreMessage> getNextQueuedItem() {
        List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();
        if (internalQueue.size() > 0) {
            Logger.debug("Queue has entries");
            CoreMessage msg = null;
            try {
                msg = internalQueue.take();
            } catch (InterruptedException e) {
                Logger.warn("Interruption");
                e.printStackTrace();
            }
            if (msg != null) {
                arrMessages.add(msg);
            }
        }
        return arrMessages;
    }

    protected synchronized void addToQueue(CoreMessage message) {
        try {
            internalQueue.put(message);
        } catch (InterruptedException e) {
            Logger.error("Exception adding message to queue " + message);
        }
    }
}

//有一组工作线程可以读取这些队列

  public class Worker implements Runnable 
    @Override
    public void run() {
    Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());

    while (!stopRequested) {
        boolean processedMessage = false;
        for (IProvider provider : providers) {
            List<CoreMessage> messages = provider.getNextQueuedItem();
            if (messages == null || messages.size() != 0) {
                processedMessage = true;
                for (CoreMessage message : messages) {
                    final Message msg = createEndurMessage(provider, message);
                    processMessage(msg);
                    message.commit();
                }
            }
        }
        if (!(processedMessage || stopRequested)) {
            // this is to stop the thread from spinning when there are no messages
            try {
                Thread.sleep(WAIT_INTERVAL);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
}

共有1个答案

商绍元
2023-03-14

这似乎是围绕Solace API的自定义包装器。这使得很难为您的问题提供答案,因为我们根本不知道这个包装器在做什么。

下面的答案做出了以下假设。

>

  • 包装程序正在使用非事务处理的JCSMPSession

      null

  •  类似资料:
    • 我可以在solace JMS队列中搜索任何特定的消息,然后在其他消息之前处理吗?我们有这样的功能w. r. t慰藉队列。

    • 为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享

    • 一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 参考资料 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 发布/订阅 消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。 发布与订阅模式和观察者模式有以下不同: 观察者模式中,观察者和主题都知道对方的存在;

    • 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 一个线程会从消息队列中收取消息,另一个线程会定时给消息队列发送普通消息和紧急消息 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: *

    • 消息队列接口 结构体 struct   rt_messagequeue   消息队列控制块 更多...   类型定义 typedef struct rt_messagequeue *  rt_mq_t   消息队列类型指针定义   函数 rt_err_t  rt_mq_init (rt_mq_t mq, const char *name, void *msgpool, rt_size_t msg_

    • rabbitmq 使用 定义handler实体 public class UserEvent : EventHandler { public string Name { get; set; } public string Job { get; set; } } 队列定义 [QueueConsumer(nameof(HelloEventHandler), QueueCon