当前位置: 首页 > 知识库问答 >
问题:

Kafka的溪流:把一个话题引入另一个话题

邬飞捷
2023-03-14

我是新的Kafka流,我正在使用它使一个主题的确切拷贝到另一个不同的名称。本主题有几个分区,我的制作者正在使用自定义分区。输出主题是用输入主题相同数量的分区预先创建的。

在我的应用程序中,我做了(我正在使用Kotlin):

val builder = StreamsBuilder()
builder
    .stream<Any, Any>(inputTopic)
    .to(outputTopic)

除了分区(当然,我使用的是自定义分区器)之外,这是可行的。有没有一种简单的方法可以使用输入记录的相同分区将输入记录复制到输出主题?

显然,我可以在接收器中使用自定义分区器,但这将意味着反序列化和序列化记录,以便使用自定义分区器重新计算输出分区。

共有1个答案

籍利
2023-03-14

produced(它是KStream::To参数之一)将StreamPartitioner作为其成员之一。

您可以尝试以下代码:

java prettyprint-override">builder.stream("input", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
  .to("output", Produced.with(Serdes.ByteArray(), Serdes.ByteArray(), (topicName, key, value, numberOfPartitions) -> calculatePartition(topicName, key, value, numberOfPartitions));

在上面的代码中,只使用bytearrayserdes,因此会发生任何特殊的序列化或反序列化。

 类似资料:
  • 我看了Kafka的文件,还不知道如何消费一个话题平行? 假设:我有一个像“发生了一些事情”这样的话题(不要拆分这个话题),我有很多想消费它的客户。那么该怎么办,让多个客户并行消费呢?我应该使用分区和客户群吗?

  • 假设我有多个设备。每个设备都有不同类型的传感器。现在我要把每个传感器的每个设备的数据发送给Kafka。但我对Kafka的主题感到困惑。用于处理此实时数据 null 情况2:向一个主题发送数据 设备1(传感器A,B,C),设备2(传感器A,B,C)...设备....->主题 > 这不是数据瓶颈吗。因为它将表现为队列,来自某个传感器的数据将在队列中落后,并且不会被实时处理。 设备1 ->传感器A-TO

  • 我正在使用spring boot构建一个web应用程序,现在我需要接收实时通知。我正计划使用apache kafka作为这方面的消息代理。要求用户具有不同的角色,并且根据角色,他们应该接收其他用户正在执行的操作的通知。 我设置了一个生产者和消费者,作为消费者,我可以接收发布到一个主题的信息,比如说topic1。 我遇到的问题是,我可以让多个用户收听同一个主题,而每个用户都应该得到发布到该主题的消息

  • 我试图写一个Kafka消费者从一开始就消费这些信息。我可以从控制台消费者开始使用同样的方法 但是我在JAVA API中找不到相应的属性。 还有一个问题是什么应该是价值。Avro消息的反序列化程序?

  • 假设我有一个Kafka主题,大约有10个分区,我知道每个消费群体应该有10个消费者在任何给定的时间阅读该主题,以实现最大的平行性。 然而,我想知道,对于一个主题在任何给定时间点可以处理的消费者群体的数量,是否也有任何直接规则。(我最近在一次采访中被问及这一点)。据我所知,这取决于代理的配置,以便在任何给定的时间点可以处理多少个连接。 然而,我只是想知道在给定的时间点可以扩展多少个最大消费群体(每个

  • 我想知道简单主题和分区主题之间的区别是什么。根据我的理解,为了平衡负载,主题已经被分区,每个消息都有偏移,使用者将确认以确保先前的消息已经被确认。如果没有分区和使用者不匹配,由kafka完成的重新平衡会有效地管理。 如果创建多个主题而不是分区,是否会影响操作效率。