我有一个用例:
我需要定期阅读和汇总Kafka主题的信息,并发布到不同的主题。本地存储不是一个选项。这就是我计划解决这个问题的方式,欢迎提出任何改进建议
为了安排Kafka消息的聚合和发布,计划使用聚合器EIP的completionInterval选项。这是代码。
@Autowired ObjectMapper objectMapper;
JacksonDataFormat jacksonDataFormat;
@PostConstruct
public void initialize(){
//objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
}
路线是:
public void configure() throws Exception {
from("kafka:localhost:9092?topic=item-events" +
"&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
.routeId("kafkapoller")
.unmarshal(jacksonDataFormat)
.aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
.marshal().json(JsonLibrary.Jackson)
.to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
}
这看起来不错。要记住的事情:
Aggregate Controller
使您能够外部强制完成交换,因此您可以执行诸如向骆驼发出关闭命令,然后调用此命令来完成机上交换我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?
是否可以验证/筛选发送到Kafka主题的消息?
问题内容: 我们已经编写了一个Java客户端,用于将消息发布到kafka。代码如下所示 当我们执行此代码时,我们得到以下消息和异常 这发生在无限循环中,并且应用程序挂起…当我们检查kafka代理时,发现该主题已创建…但是我们没有收到消息…我们已经坚持了一段时间。 .. 请帮忙 问题答案: 我们终于解决了这个问题…我们在混合环境中运行kafka,如下文所述- https://medium.com/@
在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor
Kafka MQ源连接器可以将事件从MQ带到1个Kafka主题,我们可以在Kafka MQ连接器内部进行基于消息的路由吗? 还是我们必须编写一个KStream应用程序来根据内容负载进行路由
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。