当前位置: 首页 > 知识库问答 >
问题:

Springboot Kafka @Listener消费者暂停/恢复不起作用

司徒宏远
2023-03-14

我有一个Spring靴Kafka消费者

为了避免重新平衡,我尝试在KafkaContainer上调用pause()和resume(),但消费者总是在运行

@KafkaListener(id = "c1", topics = "${app.topics.topic1}", containerFactory = "listenerContainerFactory1")
public void poll(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("Received Message by consumer of topic1: " + value);
    String result = process(record.value());
    producer.sendMessage(result + " topic2");
    log.info("Message sent from " + topicIn + " to " + topicOut);
    ack.acknowledge();
    log.info("Offset committed by consumer 1");
}

private String process(String value) {
    try {
        pauseConsumer();
        // Perform time intensive network IO operations
        resumeConsumer();
    } catch (InterruptedException e) {
        log.error(e.getMessage());
    }
    return value;
}

private void pauseConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isRunning()) {
        log.info("Attempting to pause consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).pause();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

private void resumeConsumer() throws InterruptedException {
    if (registry.getListenerContainer("c1").isContainerPaused() || registry.getListenerContainer("c1").isPauseRequested()) {
        log.info("Attempting to resume consumer");
        Objects.requireNonNull(registry.getListenerContainer("c1")).resume();
        Thread.sleep(5000);
        log.info("kafkalistener container state - " + registry.getListenerContainer("c1").isRunning());
    }
}

我错过了什么吗?有人能指导我如何正确地达到要求的行为吗?

共有2个答案

公西马鲁
2023-03-14

您正在侦听器线程上运行 process() 方法,因此暂停/恢复不会产生任何影响;仅当侦听器线程退出侦听器方法(并且在处理了上一次轮询收到的所有记录之后)才会暂停。

将于本月晚些时候发布的下一个版本(2.9)有一个新属性< code>pauseImmediate,它使暂停在处理完当前记录后生效。

吕峰
2023-03-14

你可以这样试试。这对我有用

public class kafkaConsumer {
    public void run(String topicName) {
        try {
            Consumer<String, String> consumer = new KafkaConsumer<>(config);
            consumer.subscribe(Collections.singleton(topicName));
                while (true) {
                    try {
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(80000));
                        for (TopicPartition partition : consumerRecords.partitions()) {
                            List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                            for (ConsumerRecord<String, String> record : partitionRecords) {

                                kafkaEvent = record.value();

                                consumer.pause(consumer.assignment());

                /** Implement Your Business Logic Here **/

                Once your processing done

                consumer.resume(consumer.assignment());

                                try {
                                    consumer.commitSync();
                                } catch (CommitFailedException e) {
                                }
                            }
                        }
                    } catch (Exception e) {
                        continue;
                    }
                }

        } catch (Exception e) {
        }
    }
 类似资料:
  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • 问题内容: 我有一个基本的Swing UI,带有一个标记为“播放”的按钮。按下按钮后,标签变为“暂停”。现在,当按下按钮时,它变为“继续”。 在“播放”中,我将实例化并执行一个SwingWorker。我想要的是能够暂停该线程(不要取消该线程),并根据上述按钮按下来恢复它。但是,我不想在doInBackground()中求助于Thread.sleep()。这似乎有点骇人听闻。有什么方法可以阻止运行d

  • 我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。 这是我写的 然后我写了一个REST服务来恢复消费者 现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1

  • 有些情况下,例如爬取大的站点,我们希望能暂停爬取,之后再恢复运行。 Scrapy通过如下工具支持这个功能: 一个把调度请求保存在磁盘的调度器 一个把访问请求保存在磁盘的副本过滤器[duplicates filter] 一个能持续保持爬虫状态(键/值对)的扩展 Job 路径 要启用持久化支持,你只需要通过 JOBDIR 设置 job directory 选项。这个路径将会存储 所有的请求数据来保持一

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认