当前位置: 首页 > 知识库问答 >
问题:

拆分后在Spring Cloud集成流中将所有消息放入kafka后执行方法

严易安
2023-03-14

如何在拆分后将所有消息放入Kafka后执行方法

我尝试在频道后使用句柄(),但最终没有从第2条消息发布到Kafka。

 IntegrationFlowBuilder flowBuilder = IntegrationFlows
        .from(() -> jdbcTemplate....);
 flowBuilder.split();
 flowBuilder.channel(messageChannel);

我想在将所有拆分消息写入Kafka后执行一个方法。

共有1个答案

越俊驰
2023-03-14

由于您要拆分,并且只有在发送到Kafka之后,才可以捕获所有内容,除非之后聚合。您可以使用publishSubscribeChannel向Kafka和聚合器发送相同的消息。当聚合器释放组时,您可以很好地执行方法调用

 类似资料:
  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我有以下Flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器 如果每个轮询周期只有一条kafka消息到达,则kafka消息内容是avro数据,并且正确地序列化为文件。 当两个kafka消息到达同一批次时,它们被分组在同一个HDFS文件上,因为avro消息包含两个模式数据,结果文件包含模式数据模式数据,导致它是无效的. avro文件。 如何拆分avro事件以将不同的kafka消息

  • 我试图找出最好的方式将我的数据扇出到单独的占位符中,以供其他处理的数据使用 用例我正在接收Kafka主题中几个脚本(约2000只股票)的股票数据。我希望能够单独在所有脚本上运行KPI(KPI就像应用于输入数据以获取KPI值的公式)。 我能想到的选项 > 将所有刻度数据保存在一个主题中,并使用Custom分区器按脚本名称对其进行分区。这有助于保持低主题计数和系统易于管理。但是所有消费者都需要丢弃大量

  • 我正在开发一个Spring集成应用程序。 我有一个入站通道适配器,用于读取目录,然后是一个拆分器,用于将文件拆分为行,最后是一个udp出站通道适配器,用于发送行 我想每秒钟发一封信 我可以通过定义自己的拆分器并在每次读取一行时等待1s来做到这一点,但我想知道是否可以在xml文件中尽可能简单地完成它。 提前谢谢

  • 我在这里设置了一个最小的示例,其中有N个Kakfa主题的N个流(在下面的示例中为100个)。 我想在每个流看到“EndofStream”消息时完成它。当所有流都完成时,我希望Flink程序能够顺利完成 当parallelism设置为1时,这是正确的,但通常不会发生。 从另一个问题来看,似乎并非Kafka消费群体的所有线索都结束了。 其他人建议抛出异常。但是,程序将在第一个异常时终止,并且不会等待所

  • 我正在使用Docker启动一个kafka代理集群(例如,5个代理,每个容器一个代理)。Kafka版本2.12-0.11.0.0,动物园管理员3.4.10。 场景: null > 在独立模式下启动Zookeeper,然后启动kafka 创建主题 null 检查邮件 消息被累犯 null null server.properties(broker.id唯一,broker_ip:broker_port对