当前位置: 首页 > 知识库问答 >
问题:

暂停和恢复KafkaConsumer

颜博达
2023-03-14

我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。

这是我写的

@KafkaListener(...)
public void consume(
 @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String,String> consumer,
 @Payload String message) {
  
    try {
        //consumer message
    } catch(Exception e) {
        saveConsumer(consumer);
        consumer.pause();
    }
}

然后我写了一个REST服务来恢复消费者

@RestController
@RequestMapping("/consumer")
class ConsumerRestController {
    @PostMapping("/resume")
    public void resume() {
        KafkaConsumer<String,String> consumer = getConsumer();
        if(consumer != null) {
            consumer.resume(consumer.paused());
        }
    }
}

现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1”和偏移量为4的“mesmage2”,“message 1”会导致异常,“mesage2”会发生什么?它到底被消耗了吗?

第二个问题:从REST服务中恢复消费者会给出一个ConprestModificationException,因为Kafka消费者不是线程安全的。那么,我为什么要这样做呢?

共有1个答案

艾骏喆
2023-03-14

不要直接让消费者停顿;请改为暂停容器。

@KafkaListener(id = "foo", ...)
@Autowired KafkaListenerEndpointRegistry;

...

registry.getListenerContainer("foo").pause();

暂停将在下一次轮询之前生效;如果您想立即暂停(并且不处理上次轮询的剩余记录),请在暂停后抛出一个异常(假设您正在使用现在默认的SeekToCurrentErrorHandler

 类似资料:
  • 问题内容: 我有一个基本的Swing UI,带有一个标记为“播放”的按钮。按下按钮后,标签变为“暂停”。现在,当按下按钮时,它变为“继续”。 在“播放”中,我将实例化并执行一个SwingWorker。我想要的是能够暂停该线程(不要取消该线程),并根据上述按钮按下来恢复它。但是,我不想在doInBackground()中求助于Thread.sleep()。这似乎有点骇人听闻。有什么方法可以阻止运行d

  • 问题内容: 我在viewDidAppear中有一个图像,并用以下代码对其进行了动画处理: 我想在点击时暂停动画,如果再次点击它则继续播放动画。 问题答案: 暂停和恢复动画的2个功能,我从这里开始转换为Swift。 我有一个按钮可以暂停或恢复在中初始化的动画:

  • 嗨,我在camel中有一个JMS消费者路由,我的要求是在特定事件时停止/暂停该路由(基于某个字段值),然后使用调度器恢复该路由。为此,我创建了两个路由,一个是我的原始jms消费者路由,另一个是调度程序路由,它们恢复jms消费者路由,虽然我能够暂停路由,但第二个路由不恢复暂停的路由,它显示的状态为已启动。 以下是我的两条路线 请帮助我如何实现上述场景。

  • 有些情况下,例如爬取大的站点,我们希望能暂停爬取,之后再恢复运行。 Scrapy通过如下工具支持这个功能: 一个把调度请求保存在磁盘的调度器 一个把访问请求保存在磁盘的副本过滤器[duplicates filter] 一个能持续保持爬虫状态(键/值对)的扩展 Job 路径 要启用持久化支持,你只需要通过 JOBDIR 设置 job directory 选项。这个路径将会存储 所有的请求数据来保持一

  • 问题内容: 我正在开发游戏,我想创建一个暂停菜单。这是我的代码: 但 仍在运行… 我想在玩家单击暂停菜单时暂停计时器,并在玩家返回游戏时继续运行计时器,但是我如何暂停?请帮帮我。 问题答案: 您需要使其无效并重新创建。然后,如果您使用相同的按钮暂停和恢复计时器,则可以使用bool来跟踪状态:

  • 问题内容: 我注意到,有很多主题是有关使用暂停/恢复MP3的,因此为了帮助所有人,我专门为此设计了整个课堂!请参阅下面的答案。 注意:这是供我个人使用的,因此它可能不如某些人希望的那样健壮。但是由于其简单性,进行简单的修改并不难。 问题答案: 播放器的一个非常简单的实现,实际上是暂停播放。它通过使用单独的线程播放流并告诉播放器线程是否/何时暂停和继续工作来工作。