当前位置: 首页 > 知识库问答 >
问题:

Apache Beam KafkaIO生产者将不同的消息路由到不同的主题

汲丰茂
2023-03-14

我有一个usecase,其中传入的数据有一个标识不同类型数据的键。有一个单一的输入Kafka主题,所有类型的数据都抛向它。beam管道从输入的kafka主题中读取所有消息,并必须根据关键字路由到不同的kafka主题。

final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
    private final String topic;
    private final WriteRecords<K, V> writeRecordsTransform;

    private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
        this.topic = topic;
        this.writeRecordsTransform = writeRecordsTransform;
    }

如何使用apache Beam的kafkaIO生产者?

共有1个答案

咸昀
2023-03-14

经过几天的努力,实现了消息路由,目前,kafkaIO不支持消息路由到不同的主题。

一个解决办法是为每个不同的主题创建一个kafka生产者,并将元素隔离到不同的pcollections,这取决于哪个元素发送到不同的kafka主题。

 类似资料:
  • Kafka MQ源连接器可以将事件从MQ带到1个Kafka主题,我们可以在Kafka MQ连接器内部进行基于消息的路由吗? 还是我们必须编写一个KStream应用程序来根据内容负载进行路由

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那

  • 我有两个Kafka制作人向具有多个分区的同一主题发送消息。 正如预期的那样,来自同一生产者PR1的具有相同密钥K1的消息总是转到同一分区PA1。 问题是来自另一个生产者PR2的具有相同密钥K1的消息转到另一个分区PA2,而我希望它们也转到PA1。 Kafka不是在制片人之间保留分区分配吗? 是否与两个生产者使用不同的Kafka客户端库有关? 如果我设置两个制作人使用相同的id,会有帮助吗?

  • 在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor

  • 我最近开始使用消息队列(使用ActiveMQ),并进行了试验。 null 谢谢你的建议,

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?