当前位置: 首页 > 知识库问答 >
问题:

Apache Camel Kafka-聚合Kafka消息并定期发布到不同的主题

别锐
2023-03-14

我有一个用例:

我需要定期阅读和汇总Kafka主题的信息,并发布到不同的主题。本地存储不是一个选项。这就是我计划解决这个问题的方式,欢迎提出任何改进建议

为了安排Kafka消息的聚合和发布,计划使用聚合器EIP的completionInterval选项。这是代码。

  @Autowired ObjectMapper objectMapper;
  JacksonDataFormat jacksonDataFormat;

  @PostConstruct
  public void initialize(){
    //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
  }

路线是:

public void configure() throws Exception {
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
  }

共有1个答案

鞠建安
2023-03-14

这看起来不错。要记住的事情:

  • 如果/当JVM在聚合周期中途死亡时会发生什么?不要在意,然后冷静,否则您可能需要研究PeristentAggregationRepository来存储/重放消息,尽管您可以重放从kafka丢失的消息(这将是我最大的操作问题)
  • 接下来,考虑运行时控制。骆驼是一个令人震惊的人,因为它没有非常清楚地告诉你运行时发生了什么。聚合器(即非常贪婪的正则表达式)中的失控方法之类的东西会让你对聚合交换的当前状态知之甚少,JMX可能不会告诉你太多正在发生的事情。
  • 我将使用Aggregate Controller使您能够外部强制完成交换,因此您可以执行诸如向骆驼发出关闭命令,然后调用此命令来完成机上交换
 类似资料:
  • 我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?

  • 是否可以验证/筛选发送到Kafka主题的消息?

  • 问题内容: 我们已经编写了一个Java客户端,用于将消息发布到kafka。代码如下所示 当我们执行此代码时,我们得到以下消息和异常 这发生在无限循环中,并且应用程序挂起…当我们检查kafka代理时,发现该主题已创建…但是我们没有收到消息…我们已经坚持了一段时间。 .. 请帮忙 问题答案: 我们终于解决了这个问题…我们在混合环境中运行kafka,如下文所述- https://medium.com/@

  • 在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor

  • Kafka MQ源连接器可以将事件从MQ带到1个Kafka主题,我们可以在Kafka MQ连接器内部进行基于消息的路由吗? 还是我们必须编写一个KStream应用程序来根据内容负载进行路由

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。