我有一个usecase,其中传入的数据有一个标识不同类型数据的键。有一个单一的输入Kafka主题,所有类型的数据都抛向它。beam管道从输入的kafka主题中读取所有消息,并必须根据关键字路由到不同的kafka主题。
final class AutoValue_KafkaIO_Write<K, V> extends Write<K, V> {
private final String topic;
private final WriteRecords<K, V> writeRecordsTransform;
private AutoValue_KafkaIO_Write(@Nullable String topic, WriteRecords<K, V> writeRecordsTransform) {
this.topic = topic;
this.writeRecordsTransform = writeRecordsTransform;
}
如何使用apache Beam的kafkaIO生产者?
经过几天的努力,实现了消息路由,目前,kafkaIO不支持消息路由到不同的主题。
一个解决办法是为每个不同的主题创建一个kafka生产者,并将元素隔离到不同的pcollections,这取决于哪个元素发送到不同的kafka主题。
Kafka MQ源连接器可以将事件从MQ带到1个Kafka主题,我们可以在Kafka MQ连接器内部进行基于消息的路由吗? 还是我们必须编写一个KStream应用程序来根据内容负载进行路由
我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那
我有两个Kafka制作人向具有多个分区的同一主题发送消息。 正如预期的那样,来自同一生产者PR1的具有相同密钥K1的消息总是转到同一分区PA1。 问题是来自另一个生产者PR2的具有相同密钥K1的消息转到另一个分区PA2,而我希望它们也转到PA1。 Kafka不是在制片人之间保留分区分配吗? 是否与两个生产者使用不同的Kafka客户端库有关? 如果我设置两个制作人使用相同的id,会有帮助吗?
在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor
我最近开始使用消息队列(使用ActiveMQ),并进行了试验。 null 谢谢你的建议,
我的问题是,如何做到这一点?我如何定义和配置一个生产者和消费者,可以处理不同的avro实体到一个共同的主题? 谢谢你的帮助!