当前位置: 首页 > 知识库问答 >
问题:

使用apache camel的camel-kafka组件手动提交消费者补偿

邵修诚
2023-03-14

我能够使用ApacheKafka提交偏移量类,并能够使用ConsumerConnector进行提交。我查看了apache camel kafka组件,该组件的使用者选项与“auto.commit.enable”属性相同。现在,Camel Java DSL中是否有任何属性或方法,在使用消息后,我们可以手动提交偏移量(通过URL中提供的方法或消费者选项),或者我们必须再次使用Kafka消费者API提交消费者偏移量?

共有1个答案

端木狐若
2023-03-14

您可以使用KafkaManualCommit提交

请检查文件https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/docs/kafka-component.adoc#using-与Kafka消费者手动提交

public void process(Exchange exchange) {
    KafkaManualCommit manual =
        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commitSync();
}
 类似资料:
  • 本文向大家介绍python kafka 多线程消费者&手动提交实例,包括了python kafka 多线程消费者&手动提交实例的使用技巧和注意事项,需要的朋友参考一下 官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html 以上这篇python kafka 多线程消费者&手动提交实例就是小编分享给大家

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 我创建了两个apache camel(blueprint XML)kafka项目,一个是kafka-producer(接受请求并将其存储在kafka服务器中),另一个是kafka-consumer(从kafka服务器获取ups消息并处理它们)。 这个设置对单个主题和单个消费者都很有效。然而,我如何在同一个Kafka主题中创建单独的消费者组?如何在不同的消费者群体中路由同一主题中的多个消费者特定消息

  • 我指的是Kafka源代码连接器的Flink 1.14版本,代码如下。 我期待以下要求。 在最新的应用程序开始时,必须阅读Kafka主题的最新偏移量 在检查点上,它必须将消耗的偏移量提交给Kafka 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中选取,并且必须消耗使用者延迟,然后再使用新的事件提要 有了Flink新的KafkaConsumer API(KafkaSource

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • Kafka新手。 Kafka版本:2.3.1 我正在尝试使用Spring cloud使用来自两个主题的Kafka消息。除了kafka活页夹和下面的一些简单配置之外,我没有做太多配置。每当(组协调器lbbb111a.uat.pncint.net:9092(id:2147483641机架:null)不可用或无效时,将尝试重新发现)发生时,已经处理的一堆消息会再次被处理。不确定发生了什么。