当前位置: 首页 > 知识库问答 >
问题:

将apache kafka轮询迁移到spring kafka

周和志
2023-03-14
 while (!done.get()) {
    ConsumerRecords<byte[], <byte[]> records = kafkaConsumer.poll(<MAX_VALUE>);
    if (records.isEmpty()) {
        retryCount++;
        Thread.sleep(<some_time>);
    } else {
    // Process records;
    }
    if (retryCount > <max_retry_count>) {
    done.set(true); 
    }    
  }

尝试了以下方法:1)使用spring kafka注释(@kafkalistener),但它不允许我们控制轮询。2)创建了“ConcurrentMessageListenerContainer”,并且setupMessageListener将记录添加到队列中进行轮询。这给了我们对使用者的控制。

我想知道,我的方向是正确的吗?使用Spring-Kafka有什么更好的解决方案来达到上述要求呢?

共有1个答案

邢新
2023-03-14

不清楚你说的“对消费者的控制”是什么意思。创建容器与使用@kafkalistener相同(容器是在封面下创建的)。

Spring使用“消息驱动”的方法。

您可以设置IdleEventInterval,如果在此期间没有收到任何记录,容器将发布ListenerContainerIdleEvent。您可以使用ApplicationListenerbean或@EventListener方法侦听这些事件。

 类似资料:
  • 我正在使用OpenJDK 12中的Nashorn JS引擎。纳肖恩似乎遭到了反对。我正在寻找可用的替代方案。我找到了GraalVM,但我不确定这是不是最好的。如何从Java执行GraalVM JavaScript?你有什么例子吗? Nashorn用的是Java: 在Nashorn中,我创建了一个WrappedMongoDatabase,它扩展了AbstractJSObject。在那里,我添加了一些

  • 问题内容: 将Express.js从版本2更新到了版本3,以下调用中断了,因为它不再存在于V3中: 有一个迁移指南说: (使用中间件+ res.locals) 但是我很困惑如何做到这一点。是否有一个更具体的示例说明如何进行迁移? 相关的SO post: Node.js Express3.0 问题答案: 我有同样的问题 session.user ,只是通过了解该app.use功能必须是固定的, 在

  • 我正试图从maven迁移到gradle,但CheckStyle出现了一个奇怪的错误。 这是我得到的错误 如何让gradle使用最新版本的CheckStyle?值得注意的是,我的使用了maven checkstyle 6.10.1和6.8

  • 我要迁移一个JavaApplet通过JNLP启动,作为一个Java的Web Start应用程序,并遇到一些麻烦/误解... 我得到的资源之一是这样的:6迁移JavaAppletJavaWeb Start和JNLP: 让我们开始吧,但是: 目前,该应用程序是一个小程序(),过去是通过将小程序标记嵌入到HTML中来启动的,小程序标记指的是JNLP。 现在,由于所有浏览器都放弃了对小程序的支持,我应该将

  • Angular 是使用 TypeScript 构建的,并且支持向 Angular 提供元信息的装饰器。 TypeScript 的装饰器会让语法感觉更加“自然”,尽管有可能使用 Angular 没有的功能。