我们正在尝试在指定的窗口时间从Kafka读取数据(所以我们有Kafka消费者),这意味着避免在其他时间读取数据。但是,我们不确定如何在时间段到期后关闭消费者。我想知道是否有任何如何做到这一点的示例?非常感谢您对我们的帮助。
我有相同的用例,但是我编写了调度程序,指定一个批处理的最大轮询记录,并保存了一个计数器。如果计数器匹配最大轮询记录,那么我认为该批处理已经完成,因为它已经处理了在一次轮询中获得的记录。
然后,我取消订阅该主题并关闭消费者。下次调度程序运行时,它将再次处理指定的最大轮询记录限制。
fixedDelayString实现的目的是,一旦前一个完成,调度程序在指定的时间限制后启动。
@EnableScheduling
public class MessageScheduler{
@Scheduled(initialDelayString = "${fixedInitialDelay.in.milliseconds}", fixedDelayString = "${fixedDelay.in.milliseconds}")
public void run(){
/*write your kafka consumer here with manual commit*/
/*once your batch is finished processing unsubcribe and close the consumer*/
kafkaConsumer.unsubscribe();
kafkaConsumer.close();
}
}
您可以禁用自动启动,
然后使用 Kafka接收器endpoint注册表启动和停止
方法手动启动
kafka 容器
,@KafkaListener生命周期管理
public class KafkaConsumer {
@Autowired
private KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Schedule(cron = "")
public void scheduledMethod() {
registry.start();
registry.stop()
}
但在上述方法中,不能保证来自 kafka 的所有消息都将在该时间范围内被使用(这取决于加载和处理速度)
我在本地计算机上将一条太大的消息推送到Kafka消息主题中,现在我收到一个错误: 增加在这里并不理想,因为我实际上不想接受那么大的消息。
我正在尝试使用Spring Boot,并创建一个jar,并将其安装到我的maven存储库中。 这是一个库jar文件,将在我的主应用程序中用作依赖项,它也是一个Spring-Boot应用程序。现在,我只是在做一个hello world示例。这是我在这个项目中的一个类: 我的POM基本上是这样的: 但是当我尝试构建时,我得到了“找不到主类”错误。这对我来说非常有意义,但是我如何让spring制作一个库
我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml
你能给我一些关于表的主键操作在Oracle中具有时间有效性的一些看法吗? 我创建了一个具有以下架构的表 是因为Oracle实际上并不关心主键上的有效期列吗? 提前道谢!
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行