当前位置: 首页 > 知识库问答 >
问题:

将消息从一个Kafka集群流式传输到另一个

王嘉木
2023-03-14

我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个集群(远程)-

所以假设WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的PC上。我也有一个Kafka-Instance在我的本地机器上运行。
现在我想让WordCount演示在包含应该计算单词的句子的Topic(“远程”)上运行。
然而,计数应该写入我本地系统上的Topic而不是“远程”Topic。

这样的事情在Kafka-Streams API中可行吗?
例如。

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
    .groupBy((_, word) => word)
    .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()

非常感谢你-蒂姆

共有1个答案

南宫炜
2023-03-14

Kafka Streams仅为单个集群构建。

一种解决方法是使用foreach()或类似工具,实例化您自己的写入目标集群的KafkaProducer。请注意,您自己的制作人必须使用同步写入!否则,在发生故障时可能会丢失数据。因此,这不是一个非常有效的解决方案。

最好只将结果写入源集群,然后将数据复制到目标集群。注意,您很可能可以在源集群中使用更短的输出主题保留期,因为实际数据在目标集群中存储的保留时间更长。这允许您限制源集群上所需的存储

编辑(回复下面的评论来自@快速洞察)

如果您的Kafka流服务中断的时间比保留时间长怎么办

这似乎是一个正交问题,可以针对任何设计提出。保留时间应根据您的最大停机时间来设置,以避免一般情况下的数据丢失。请注意,由于应用程序在源集群中读/写,并且源集群输出主题可以用很短的保留时间进行配置,因此如果应用程序宕机,不会发生什么坏事。不会处理输入主题,也不会生成新的输出数据。您可能只担心进入目标集群的复制管道中断的情况——您应该在源集群中相应地设置输出主题的保留时间,以确保不会丢失任何数据。

它还可以加倍您对Kafka的写入。

是的。它还增加了磁盘上的存储占用空间。这是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往)。您的选择。我个人建议使用上面指出的更具弹性的选项。横向扩展Kafka集群比处理应用程序代码中的所有弹性边缘情况更容易。

那看起来超级低效

这是个人判断。这是一种权衡,没有客观的对与错。

 类似资料:
  • 问题内容: 将尽可能多的字节从ByteBuffer 放入另一个ByteBuffer 的最有效方法是什么(以及知道传输了多少字节)?我正在尝试,但似乎要抛出BufferOverflowException,当我需要它们时,我现在无法从Sun获取Javadocs(网络问题)。> :( argh。 编辑:darnit,如果bbuf_src是ReadOnly缓冲区,则@Richard 的方法(使用来自bac

  • 我有两个BigQuery数据集:和 这些数据集中的每一个都包含一个表,例如和 包含流数据,我想将数据从流式传输到。 我有类型的架构。如何将流行从一个表复制到另一个表并保留现有架构? 到目前为止,我已经研究了BigQuery的insertAll方法,但是我有点不确定在哪个数据结构中获取行,以及在插入新表时如何指定TableSchema。 我希望能就如何做到这一点提供一些指导。谢谢

  • 我有一个组件A,它触发一个对话框 此组件触发PicuploadComponent,我在此上传图像并接收带有一些数据响应

  • 我正在尝试建立一个简单的 ActiveMQ 代理网络。我有2台机器,比如A和B。 在A的< code>activemq.xml文件中,我放置了networkConnector,它具有指向机器b的URI。在A的日志中,显示它连接到了b 我有一个向A发送消息的应用程序。在ActiveMQ的网络控制台的网络选项卡中,我可以看到我的消息正在排队和出队(我认为这意味着消息被转发到机器B)。 我的问题是消息在

  • 我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。

  • Kafka流中是否内置了允许将单个输入流动态连接到多个输出流的功能?允许基于true/false谓词进行分支,但这不是我想要的。我希望每个传入日志都确定它将在运行时流到的主题,例如,日志将流到主题和日志将流到主题。 我可以在流中调用,然后写给Kafka制作人,但这似乎不是很好。在Streams框架中是否有更好的方法来实现这一点?