在RabbitMQ级别上是否有任何机制允许我使用下一个消息直到上一个消息被加密为止?还是必须在服务器之间开发某种锁定机制?
我只是引用了rabbitmq文档中的几个要点,我认为这些要点能够满足您的要求。
通常,连接到队列的活动使用者以循环方式从队列接收消息。在使用使用者优先级时,如果存在多个具有相同高优先级的活动使用者,则会循环传递消息。
正如您所提到的,您将在一台机器上拥有一个消费者,这就更容易了。
只需设置每个使用者的使用者预取限制1。因此服务器在要求确认之前只向使用者传递一条消息。并在消息完全处理后发送基本ack。
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(1); // Per consumer limit
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("received message");
// process the message .. time consuming
// after processing send the basic ack so that next message can be received from queue
channel.basicAck(envelope.getDeliveryTag(), false);
};
channel.basicConsume("my-queue", false, consumer);
希望这能帮上忙。
channel.basicQos(x);
channel.basicQos(x, true);
限制x适用于整个信道,而不是信道上的单个/每个消费者。
在您的情况下,每个频道上只有一个消费者。所以渠道限制实际上对你的案例没有任何影响。
更多更新-
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?
我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(
我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码
我有一个场景,其中可执行文件是生产者,WCF服务是消费者。 WCF服务工作流程如下: 1) 服务调用可执行文件(producer),该可执行文件是另一个将消息生成RabbitMQ队列的进程。 2) 服务必须使用来自RabbitMQ队列的消息 3)将数据返回给客户端。 到目前为止,服务能够调用可执行文件并在队列中生成消息。 但服务从第2步开始失败,它将返回null而不是实际消息。有人能告诉我这里缺少
我已经用MassTransit实现了一个简单的发布者/使用者集,我想让使用者从同一个队列中读取消息。但是,当我运行它时,我看到很大一部分消息被发送到错误队列,而不是被消耗。从我看到的讨论(所以,论坛)来看,对于RabbitMQ来说,这应该非常非常简单(只需指向相同的队列),但它并不起作用。是否有应该设置的附加配置? 这是我的出版商 还有我的消费者