当前位置: 首页 > 知识库问答 >
问题:

将一个Kafka输入流动态连接到多个输出流

鲜于星波
2023-03-14

Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?kstream.branch允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志{“date”:“2017-01-01”}将流到主题topic-2017-01-01和日志{“date”:“2017-01-02”}将流到主题topic-2017-01-02

我可以在流中调用foreach,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?

共有1个答案

骆利
2023-03-14

如果您希望根据您的数据动态创建主题,那么目前在Kafka的流API(V0.10.2及更早版本)中没有任何支持。您需要创建一个KafkaProducer并自行实现动态“路由”(例如使用KStream#foreach()KStream#process())。请注意,您需要执行同步写操作以避免数据丢失(不幸的是,这不是很好的性能)。有计划使用动态主题路由扩展流式API,但目前还没有具体的时间表。

还有一个你应该考虑的因素。如果您事先不知道目标主题,而只是依赖所谓的“主题自动创建”功能,则应确保使用所需的配置设置(例如,分区数或复制因子)创建这些主题。

作为“主题自动创建”的替代方法,您还可以使用Admin Client(v0.10.1)创建具有正确配置的主题。参见https://cwiki.apache.org/confluence/display/kafka/kip-4+-+命令+行+和+集中式+管理+操作

 类似资料:
  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 交互式应用程序通常要分别用类 istream 和 ostream 输入和输出数据。当提示信息出现在屏幕上时,用户输入一个数据来响应。显然,提示信息必须在执行输入操作前出现。在有输出缓冲区的情况下,只有在缓冲区已满时、在程序中明确地刷新输出缓冲区时或因程序结束而自动刷新输出缓冲区时,输出信息才会显示到屏幕上。为保证输出要在下一个输入前显示,C++ 提供了成员函数tie,该函数可以实现输入/输出操作的

  • 您好,我正在学习服务器-客户端应用程序的示例,我不明白客户端如何从服务器接收字符串。 服务器运行执行以下操作的线程: 在客户端类中有以下代码: 和和在线程和客户端类中以相同的方式实现,包括: 其中,也是同一个对象,它是客户机的socket 我的问题是为什么字符串正是字符串?服务器在输出流中写入,客户端从输入流中拾取,这不是两个不同的内存区域吗?

  • 我有一个输入流,我想将其写入HttpServletResponse。有一种方法,由于使用字节[],需要花费的时间太长 我想知道在速度和效率方面,什么可能是最好的方法。

  • 我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个集群(远程)- 所以假设WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的PC上。我也有一个Kafka-Instance在我的本地机器上运行。 现在我想让WordCount演示在包含应该计算单词的句子的Topic(“远程”)上运行。 然而,计数应该写入我本地系统上的Topic而不是“远程”T

  • 问题内容: 如果运行命令,则会得到多行输出。您如何将所有行连接为一行,有效地将每行替换为(以空格结尾)? 对我不起作用。 问题答案: 使用翻译所有换行符为空格: 注意:读取文件,连接文件。不要啦! 编辑: 只能处理单字符翻译。您可以使用更改输出记录分隔符,例如: 这将改变: 至: