我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。
我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。
|------> [OUTGOING TOPIC(AVRO) - A]
[INCOMING TOPIC(AVRO)] ----->|------> [OUTGOING TOPIC(AVRO) - B]
|------> [OUTGOING TOPIC(AVRO) - C]
想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。
您可以编写一个Kafka Streams应用程序并使用branch()
:
KStream input = builder.stream("topic");
KStream[] splitStreams = input.branch(...);
splitStream[0].to("output-topic-1");
splitStream[1].to("output-topic-2");
// etc.
我正在尝试以avro格式对kafka消息进行解密我使用了以下代码:https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaResearchconsumer/kafka/reviewsconsumerco
我正在使用AVRO文件来生成和使用消息,我想知道我是否可以使用相同的AVRO文件来处理多个主题,例如,在生产者中使用不同的“name”属性,在消费者中使用特定的“name”属性。 多谢.斯特凡诺
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?
我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,
Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr
我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??