当前位置: 首页 > 知识库问答 >
问题:

如何提交偏移线程安全使用骆驼Kafka?

何和惬
2023-03-14

如前所述,如何使用camel kafka手动控制偏移提交?我想使用camel kafka手动提交偏移量。我的路线:

.from(kafka:topic1)
 .aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
 .process(new ManualCommitProcessor())

,其中ManualCommitProcessor将在将消息发送到另一个主题后进行promise。

问题是,聚合器和Kafka制作者在Kafka消费者负责抵消promise的独立线程中工作。因此,我以

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

聚合并分派提交偏移量后,是否有可能再次调用使用者线程

共有1个答案

和柏
2023-03-14

不,这是不可能的,使用者线程独立于聚合器的输出运行。

 类似资料:
  • 有一条骆驼路线,它被处理,异常由处理。代码如下所示: 它是有效的,并且在错误被处理后,事务有一个提交:,但是然后消息再次被放入输入队列中,事务从头开始(无尽循环)。 我们正在使用IBM MQ,我认为这种回滚是由MQ执行的,而不是从camel路线执行的。所以问题是:我能告诉MQ我处理了异常,并且它不应该再次回滚吗?

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我正在寻找一些关于如何使用骆驼路线实现以下模式的想法: 客户端通过HTTP restendpoint调用web服务(originalService) 服务接受服务的主体,并将其作为作业提交给另一个web服务(jobService)。此辅助服务返回作业id。 使用作业id,originalService轮询jobService的时间不超过x秒。Originalservice将把提交的服务的结果返回给

  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 我设置了一个endpoint,如下所示: 我用骆驼石英2.22.2和石英1.8.6 下面是从关闭到启动上下文的日志。