当前位置: 首页 > 知识库问答 >
问题:

Camel幂等使用者对removeonfailure=true的不正确行为

匡旭东
2023-03-14

我想知道以下行为是否是骆驼幂等消费者的预期行为:

对于路由,我有removeonfailure=true,这基本上意味着当exchange失败时,幂等使用者应该从存储库中删除标识符。这带来了一个非常有趣的场景,它允许在Exchange上复制。

from("direct:Route-DeDupeCheck").routeId("Route-DeDupeCheck")
            .log(LoggingLevel.DEBUG, "~~~~~~~ Reached to Route-DeDupeCheck: ${property.xref}")
            .idempotentConsumer(simple("${property.xref}"), MemoryIdempotentRepository.memoryIdempotentRepository()) //TODO: To replace with Redis DB for caching
            .removeOnFailure(true)
            .skipDuplicate(false)
            .filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                .log("~~~~~~~ Duplicate Message Found!")
                .to("amq:queue:{{jms.duplicateQueue}}?exchangePattern=InOnly") //TODO: To send this to Duplicate JMS Queue
                .throwException(new AZBizException("409", "Duplicate Message!"));

共有1个答案

尉迟德惠
2023-03-14

你的基本前提是错的。

下一次尝试使用相同的标识符即12345失败,因为它被捕获为重复消息(CamelDuplicateMessage)

当有重复的消息时,不认为是失败。它只是在进一步处理中被忽略(除非将skipduplicate选项设置为true)。

public void configure() throws Exception {
        //getContext().setTracing(true); Use this to enable tracing

        from("direct:abc")
            .idempotentConsumer(header("myid"),
                MemoryIdempotentRepository.memoryIdempotentRepository(200))
            .removeOnFailure(true)
            .log("Recieved id : ${header.myid}");
    }
}
@EndpointInject(uri = "direct:abc")
ProducerTemplate producerTemplate;

for(int i=0, i<5,i++) {
   producerTemplate.sendBodyAndHeader("somebody","myid", "1");
}
INFO 18768 --- [tp1402599109-31] route1   : Recieved id : 1

就一次。

 类似资料:
  • 我有一个物理服务器,其中运行着两个Apache Camel实例a。两者都包含相同的集成工件。任务是将来自服务器文件系统的文件与由cron作业触发的Apache Camel的文件组件集成。文件处理应该只发生一次。因此,我对同一服务器上的Jdbc存储库使用幂等模式。 总的来说,它工作得很好。但有时,在少数情况下,两个实例都运行并读取文件,因此文件被处理两次。在JDBC数据库中,键也被插入两次(甚至创建

  • 我正在使用Apache Camel2.13.1轮询一个数据库表,其中将有300k行以上。我希望使用幂等使用者EIP来过滤已经处理过的行。 不过,我想知道这个实现是否真的是可伸缩的。我的骆驼上下文是:- 在1908988是request.body.id的情况下,我已经将EIP设置为键上,所以这并不容易合并到我的查询中。 是否有更好的方法将CAMEL_MESSAGEPROCESSED表用作select

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • 我在应用程序中使用了Kafka 1.0.1,我已经开始使用0.11中引入的幂等生产者功能,在使用幂等生产者功能时,我很难理解排序保证。 我的生产者的配置是: 重试50次 根据文件: 重试 设置一个大于零的值将导致客户端重新发送任何记录,如果该记录的发送失败,可能会出现暂时性错误。请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。允许在不设置最大值的情况下重试。航班请求。每连接到1可能会改

  • 我有一个Spring-boot应用程序,可以听Kafka。为了避免重复处理,我尝试手动提交。为此,我在阅读主题后异步提交了一条消息。但是我被困在如何实现消费者幂等,这样记录就不会被处理两次。

  • 我的应用程序有5个以上的消费者在使用一个kafka主题的5个分区。(使用kafka版本11)我的消费者每个都产生一个消息到另一个主题,然后保存一些状态到数据库,然后做一个手动立即确认,并移动到下一个消息。 我试图解决当他们向出站主题发出成功时的场景。那么我们就会失败/失去消费者。当另一个使用者接管分区时,它将向出站主题发出另一条消息。这很糟糕:( 我发现Kafka现在有了幂等制作人,但从我读到的内