当前位置: 首页 > 知识库问答 >
问题:

Kafka流:一个记录到多个记录

盖雪峰
2023-03-14

给定:我在Kafka中有两个主题,假设主题A和主题B。Kafka流从主题A中读取一条记录,对其进行处理,并产生与所消耗记录相对应的多条记录(假设recordA和recordB)。现在的问题是我如何使用Kafka流来实现这一点。

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()

在这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。

共有1个答案

子车文康
2023-03-14

我不确定我是否正确理解了这个问题,我也不明白@Abhishek的答案:(

如果您有一个输入流,并且希望每个输入记录获得零条、一条或多条输出记录,那么您可以应用FlatMap()FlatMapValues()(取决于您是否希望修改键)。

您还在询问“如何将此列表划分为两个制作人流?”如果要将一个流拆分为多个流,可以使用branch()

有关更多细节,请参阅文档:https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations

 类似资料:
  • 问题内容: Go语言中有没有办法记录到不同级别的多个输出? 我希望有一个程序可以同时在Info级别记录到stdout并在带有时间戳的调试级别记录一个文件。 就像我每次编写代码一样: 我可以看到控制台打印: 和一个文件: 我使用logrus和glog,但是找不到此功能。还有其他包装或我可以编码的东西吗? 问题答案: Go-logging支持不同的日志记录后端,例如文件,syslog等。可以设置多个后

  • 我有一个主题,它接收可能包含部分数据的JSON记录。我想合并这些数据,所以我试图在最终的数据记录内收集尽可能多的信息。 合并记录值后所需的流: 编辑:流定义是正确的,默认情况下reduce不会向下游发送所有消息,而是在这样做之前缓存它们。要禁用此行为,配置属性: 必须设置。

  • 我希望我的Spring批处理应用程序一次从数据库中读取50条记录,然后将这50条记录发送给处理器,然后发送给写入器。 有人可以告诉我如何做到这一点吗? 我尝试使用JdbcPagingItemReader并将pageSize设置为50,这样可以读取50条记录,但是rowMapper、处理器和编写器一次接收一条记录,而不是获得50条记录。 如何使处理器和写入器在dto中获得50条记录,而不是一次接收一

  • ,日志记录将进入一个文件; (路径)/service_name/service_name.log 我想用logback复制这种行为,但在logback.xml配置中获取“logger”名称时遇到了真正的困难。它可以在log encoder.pattern中看到,即“%d%-5level%logger{35}-%msg%n”。

  • 在一个使用log4j和slf4j的应用程序中,我尝试使用依赖于log4j2的elasticsearch jar。 应用程序的日志依赖项如下所示- 应用程序使用自己的<code>log4j。xml并具有一些log4j的自定义附加器,因此如果不重写附加器,则无法将其迁移到log4j2。 根据elasticsearch文档中的建议,添加了以下依赖项,以使用不同于log4j2的记录器。 但是现在在启动应用

  • 我正在用Java编写一个REST服务(在Tomcat 9上使用JAX-RS标准),我想向我的程序添加日志,以便在调用时跟踪他的工作。 因此,我使用Log4j(版本1.2.17)来编写日志,我想为在REST服务上运行的每个服务编写不同的日志文件,但我只需要为整个应用程序使用一个Log4j属性 所以我想在我的日志文件夹中保存这种日志 所以我把它写成Log4j(restServiceLogger.pro