当前位置: 首页 > 知识库问答 >
问题:

如何使一个以kstream(Kafka stream)为基础的Spring云流处理器函数及时工作?

江曦
2023-03-14

所以我有一个场景,我的云流处理器函数从一个Kafka topic-1读取消息,并将消息生成到另一个Kafka Topic-2。但是这个过程必须及时运行,比如函数应该等待5分钟,然后它应该启动(消耗n生产)1分钟,然后1分钟后再次等待5分钟。有人能帮我做这件事吗?

共有1个答案

丁学
2023-03-14

Spring Cloud Stream函数是事件驱动的,因此您无法控制消耗的时间。它将在活页夹可用时立即消费。如果这是一个困难的需求,我建议从Spring中编写一个带有schedured注释的常规bean,并使用Spring for Apache Kafka从第一个主题开始使用,然后立即生成第二个主题。您可以使用来自Sping Kafka的KafkalistenerKafkatemplate(或来自Spring Cloud Stream的Streambridge发送到Kafka)。一旦记录在第二个主题中,那么您可以使用仍然使用Kafka Streams绑定器做进一步的流处理。

 类似资料:
  • 本文向大家介绍jQuery基础的工厂函数以及定时器的经典实例分析,包括了jQuery基础的工厂函数以及定时器的经典实例分析的使用技巧和注意事项,需要的朋友参考一下 1. jQuery的基本信息: 1.1 定义: jQuery是JavaScript的程序库之一,它是JavaScript对象和实用函数的封装, 1.2 作用: 许多使用JavaScript能实现的交互特效,使用jQuery都能完美地实现

  • 顺便说一句:我的应用程序是一些REST控制器和一些批处理作业的组合。那么使用云数据流有意义吗?如果没有,那么是否有更好的控制台管理器用于批处理作业(如重新启动、取消作业门户)等?

  • 我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st

  • 我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。

  • 我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。 主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。 这些是主题 application.yml我正在使用cp-all-in-one-community作为docker-file 但现在我得到以下错误:

  • 我正在学习spring cloud dataflow的概念,并想知道存储全局资源的常见方式是什么。 例如,当我有一个带有PMML处理器的流时,我希望通过Spring-Cloud-Task周期性地重新训练底层的PMML模型。 我将在哪里存储模型,以便它可以被处理器用作(只读)资源,并由任务每晚更新?Spring云数据流中是否有一个全局存储的概念?我应该使用spring-cloud之外的传统数据库,还