当前位置: 首页 > 面试题库 >

如何消费一条消息?

萧飞
2023-03-14
问题内容

通过Rabbitmq中的示例,消费者可以一次从队列中获取所有消息。如何使用一条消息并退出?

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(" [x] Received '" + message + "'");
}

问题答案:

您必须声明basicQos设置,才能一次从ACK到NACK状态获取一条消息,并禁用自动ACK以便显式给出确认。

ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.basicQos(1);
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    System.out.println("[*] waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, consumer);
    while(true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        int n = channel.queueDeclarePassive(QUEUE_NAME).getMessageCount();
        System.out.println(n);
        if(delivery != null) {
            byte[] bs = delivery.getBody();
            System.out.println(new String(bs));
            //String message= new String(delivery.getBody());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            //System.out.println("[x] Received '"+message);
        }
    }

希望能帮助到你!



 类似资料:
  • 我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • 我是一个新的学习者,试图理解拉雷维尔的拉比MQ。我已找到驱动程序vyuldashev/laravel队列rabbitmq 我已经配置应用程序/queue.php,并运行驱动程序与此语法"php工匠队列:工作Rabbitmq"。控制器。我不会在我的控制器中调度作业,因为laravel只是监听消息并处理消息。谁能帮我解释一下这是怎么回事?谢啦

  • 我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认