假设我有一个包含3个应用程序的流——一个源、处理器和接收器。 我需要保留从源收到的消息的顺序。当我收到消息A,B,C,D,我必须将它们作为A,B,C,D.发送到接收器(我不能将它们作为B,A,C,D)发送。 如果每个应用程序只有一个实例,那么一切都将按顺序运行,并且顺序将被保留。 如果我每个应用程序有 10 个实例,则消息 A、B、C、D 可能会在不同的实例中同时处理。我不知道这些消息的顺序是什么
我正在尝试根据 http://cloud.spring.io/spring-cloud-dataflow/ 中解释的非常简单的示例运行“Hello,world”Spring Cloud 数据流流。我能够创建一个简单的源代码和接收器,并使用 Kafka 在本地 SCDF 服务器上运行它,因此到目前为止一切都是正确的,并且在 SCDF 指定的主题中生成和使用消息。 现在,我正在尝试根据http://d
我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0
对于我的使用,它创建了额外的桥接实例和另外两个 kafka 主题,这会导致数据在 so - 之间流动的延迟问题 - 创建数据流的用例 1)对于新记录流应该如此= 2)对于更新记录流程应如此= 问题: 1)这里数据将从 so - 这里数据使用4个队列从源流向接收器,与源数据流相比,延迟问题- 这将创建so和foo作为额外的队列,这将消耗我的资源和硬件。 2)这也创建了额外bridge实例,我可以实时
我使用的是带有Kafka活页夹和Avro的SpringCloudStream版本2.2.0。显然,一个不正确的记录被发布到Kafka主题中,导致所有消费者返回反序列化错误,并进行某种无限重试。 从技术上讲,应该有一种方法可以指定反序列化异常的策略。我可以找到一些不同的策略,如和,但它们适用于我在应用程序中不使用的Kafka流。如果有人能帮助我理解这里缺少什么,我将不胜感激。
我正在尝试在最新版本的 Spring Cloud 流中使用基于内容的路由。根据这份文件 - 这是我用StreamListener编写的代码 通过使用该条件,可以将消息路由到两个不同的函数。 我正试图用如下的功能接口方法来消费消息。 如何在函数中实现类似的基于内容的路由?蒂亚。 其他细节- Spring引导版本 - 2.3.12.发布 Spring云版 - Hoxton.SR11
我有一个spring boot应用程序,它有两个功能:Http请求和kafka消息处理。我希望这个应用程序在application.yml中启用的模式下运行,也就是说,如果用户只希望为http请求启用它,那么kafka就不应该被连接。 我可以使用普通的spring boot kafka插件通过使用@KafkaListener的以下属性禁用自动配置来实现这一点, autoStartup=“${mod
应用程序A写入用户对象(json)下面的Kafka主题: 应用程序 B 正在尝试使用此用户对象(用户.java驻留在应用程序 B 项目中)与以下 application.yml(多个绑定器): 下面是我的Spring Cloud Stream处理器类的外观: 但是,它没有将Message有效负载转换为“用户”类,而是不断抛出错误,未找到 父类“User事件”。 有什么想法吗?
在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1
我的Spring云流应用程序中有一个简单的Kafka生成器。当我的Spring应用程序启动时,我有一个@PostConstruct方法,它执行一些协调并尝试将事件发送给Kafka生产者。 问题是,当对账开始将enet发送到其中时,我的Kafka制作人还没有准备好,导致以下情况: org . spring framework . messaging . messagedeliveryexceptio
我有一个以Functional方法实现的Spring云流应用程序。应用程序使用来自多个Kafka主题的事件,将输入规范化为输出模式(始终是相同的模式),并发布到Kafka。我不使用Kafka流,因为不需要加入/充实/状态。 我想通过控制输入主题来允许灵活的部署:您可以从所有主题消费,也可以从单个主题消费。我的方法是为每种类型声明专用函数,并为每种函数声明专用绑定。问题是绑定器(有一个)将所有传入消
我一直在尝试从 kafka 流式传输我的 json 事件,将其展平,然后使用 Spring Cloud 流将其推送回另一个主题。 输入: 压平工艺: 仅产生: 我的问题是怎么让它变成这样 所以我可以像我所做的那样推回残缺的 JSONObject 而不是单个 JSONArray? 尽管如此,Spring Cloud Stream输出只是一个单独的事件,不适合我上面的案例,无法为Kafka生成3个事件
我正在寻找一些关于利用Spring Cloud Stream 3 . x/Kafka binder实现的Kafka主题的重放消息策略的指导- > < li> 重播特定消息[例如通过时间戳窗口]。如何为消费者组中的所有或部分消费者重置补偿? 是否可以从主题的特定分区重播[如果我们知道我们有兴趣重放的消息的分区]? 一般来说,关于消息回放的最佳实践是什么。感谢您抽出时间。
嗨,我们一直在使用旧的spring版本和kafka 1.1,并有以下依赖项 我在应用程序中有以下配置。yaml文件 我在stackoverflow中找到了链接。答案有 首先,我不确定您对SpringApplication.ext(applicationContext,())的期望是什么- 这是否意味着Kafka不知道消费者已关闭,在这种情况下没有来自消费者的通信。我认为会发生重新平衡,此外,由于k
该消费者不需要受信任的包: 这突然发生了: 我已经尝试了以下方法,但它们不起作用,因为我只得到以下错误: < li> Spring Cloud Stream版本:3.1.2,带有Kafka Streams活页夹。 使用自定义JSON serde解决方法: