当前位置: 首页 > 知识库问答 >
问题:

有没有写Kafka主题的Spring时间表的例子?

耿炎彬
2023-03-14

我们正在尝试在指定的窗口时间从Kafka读取数据(所以我们有Kafka消费者),这意味着避免在其他时间读取数据。但是,我们不确定如何在时间段到期后关闭消费者。我想知道是否有任何如何做到这一点的示例?非常感谢您对我们的帮助。

共有2个答案

蔚学真
2023-03-14

我有相同的用例,但是我编写了调度程序,指定一个批处理的最大轮询记录,并保存了一个计数器。如果计数器匹配最大轮询记录,那么我认为该批处理已经完成,因为它已经处理了在一次轮询中获得的记录。

然后,我取消订阅该主题并关闭消费者。下次调度程序运行时,它将再次处理指定的最大轮询记录限制。

fixedDelayString实现的目的是,一旦前一个完成,调度程序在指定的时间限制后启动。

@EnableScheduling
public class MessageScheduler{

        @Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
        public void run(){

            /*write your kafka consumer here with manual commit*/

            /*once your batch is finished processing unsubcribe and close the consumer*/
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
        }

}
翟兴邦
2023-03-14

您可以禁用自动启动,然后使用 Kafka接收器endpoint注册表启动和停止方法手动启动 kafka 容器,@KafkaListener生命周期管理

public class KafkaConsumer {

 @Autowired
 private KafkaListenerEndpointRegistry registry;

  @KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
  public void listen(...) { ... }

  @Schedule(cron = "")
  public void scheduledMethod() {

   registry.start();

   registry.stop()
  }

但在上述方法中,不能保证来自 kafka 的所有消息都将在该时间范围内被使用(这取决于加载和处理速度)

 类似资料:
  • 我在本地计算机上将一条太大的消息推送到Kafka消息主题中,现在我收到一个错误: 增加在这里并不理想,因为我实际上不想接受那么大的消息。

  • 我正在尝试使用Spring Boot,并创建一个jar,并将其安装到我的maven存储库中。 这是一个库jar文件,将在我的主应用程序中用作依赖项,它也是一个Spring-Boot应用程序。现在,我只是在做一个hello world示例。这是我在这个项目中的一个类: 我的POM基本上是这样的: 但是当我尝试构建时,我得到了“找不到主类”错误。这对我来说非常有意义,但是我如何让spring制作一个库

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

  • 你能给我一些关于表的主键操作在Oracle中具有时间有效性的一些看法吗? 我创建了一个具有以下架构的表 是因为Oracle实际上并不关心主键上的有效期列吗? 提前道谢!

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行