当前位置: 首页 > 知识库问答 >
问题:

带有Kafka使用者的Spring Boot作业调度程序

微生智刚
2023-03-14

我正在一个POC中工作,我想消费来自Kafka主题“用户”的消息。尝试实现一旦spring boot scheduler在预定时间或cron时间触发,消费者应该从Kafka主题读取消息,然后我们应该开始从Kafka主题逐个消费现有消息并处理这些消息,当所有的消息被消费,那么Kafka消费者应该停止。调度程序应在cron时间触发并再次启动进程。

我已经尝试了以下方法来实现这一点,尽管我正在努力确定如何从我的调度器方法schedularMsgConsumeKakfa调用consumer(字符串消息)方法,以及任何我们有更好的结构来在Spring Boot调度器中使用Kafka消息并在实现tasklet接口的类中编写Kafka消费者方法的html" target="_blank">示例,等等。

@Configuration
@Service
public class SchedularMsgConsumeKafkaController2 {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @KafkaListener(topics = "users", id = "full-part-id", containerGroup = "full-part-group", autoStartup = "false")
    public void consume(String message) throws IOException, InterruptedException {
        System.out.println(String.format("#### -> Consumed message -> %s", message));
        System.out.println(String.format("Really happy"));
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.stop();
    }

    @Scheduled(fixedDelay = 30000, initialDelay = 15000)
    public void schedularMsgConsumeKakfa() throws Exception {
        MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("full-part-id");
        listenerContainer.start();
    }

}

共有1个答案

梁丘波
2023-03-14

首先,@schedule不是Spring Boot特性。它是Spring Framework所固有的:https://docs.Spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling。

第二个:@kafkalistener是Spring for Apache Kafka项目的一部分:https://docs.Spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation。

因此,这两个特性都可以在Spring Boot Framework之外使用。我知道这与问题无关,但最好用它们的专有名称来称呼事物。

@kafkalistener&@schedule的混合不是很好的解决方案,因为您混合了许多不兼容的问题。

最好使用一些不同的解决方案,在这些方案中,您不需要处理start()/stop(),并且不会使用额外的线程来增加环境的开销。

考虑一下Spring集成及其kafkamessageSource实现:https://docs.Spring.io/spring-integration/reference/html/kafka.html#kafka-inbound-pollable。这对你来说可能是新的东西,但这是值得的,因为你不打算做太多的东西自己。

“您自己”是指使用ConsumerFactoryAPI和从@sedule手动调用KafkaConsumer.poll()

对于@schedule解决方案,可能没有这样的示例,因为它确实足够复杂。Spring Integration中没有KafkaMessageSource的示例,因为它非常简单,并且仅限于配置适当的通道适配器。

 类似资料:
  • 我们有一个使用Spring Framework在Tomcat中运行的Web应用程序。我们需要为循环操作添加一些计划作业。为此,我们遇到了Quartz Scheduler,并遵循了使用Quartz with Spring配置作业的教程,并按预期计划并运行了作业。 所以我们有一些任务是在应用程序启动时安排的。现在我们希望用户手动运行作业并更改作业的触发器,但是我们需要将这些更改持久化到数据库中。因此,

  • 问题内容: 是否有一些cron之类的库,可以让我安排某些功能在特定时间运行(例如15:30,而不是从现在开始x个小时等等)?如果没有这种库,应该如何实现呢?我是否应该将回调设置为每秒调用一次,检查时间并开始为该时间安排的作业? 问题答案: node-cron就是我所描述的

  • 我有两个要求。 每周一触发作业 对于第一个要求,我尝试了和。他们俩都不工作。该函数未被调用。 对于第二个要求,我尝试了和。我得到下面的错误。 原因:java。lang.IllegalStateException:遇到无效的@Scheduled方法“monthlyData”:用于输入字符串:“2#1”。

  • 我有一份詹金斯的工作,计划在特定的时间进行。我想以编程方式修改该计时。 我试图通过安装Schedule build插件并使用。但这将使作业处于保持java线程的静默期。我希望在不将其置于静默期的情况下修改计划条目。

  • 我正在寻找最好的解决方案,以创建一个java web应用程序,以生成Excel/PDF格式的报告。类似于Google Adwords的东西,用户可以创建日程报告,并在以后生成报告时下载。 我正在考虑开发一个java应用程序,在其中用户记录,选择一个预先定义的报告,并提供输入参数(如报告日期等),这个请求将被排队或保存为Quarts作业(首选持久队列)。一个作业将监视队列/作业并执行该作业,生成报告