当前位置: 首页 > 知识库问答 >
问题:

Apache Camel的幂等消费者模式可伸缩吗?

陈宏胜
2023-03-14

我正在使用Apache Camel2.13.1轮询一个数据库表,其中将有300k行以上。我希望使用幂等使用者EIP来过滤已经处理过的行。

不过,我想知道这个实现是否真的是可伸缩的。我的骆驼上下文是:-

<camelContext xmlns="http://camel.apache.org/schema/spring">
        <route id="main">
        <from
            uri="sql:select * from transactions?dataSource=myDataSource&amp;consumer.delay=10000&amp;consumer.useIterator=true" />
        <transacted ref="PROPAGATION_REQUIRED" />
        <enrich uri="direct:invokeIdempotentTransactions" />                
        <!-- Any processors here will be executed on all messages -->
    </route>

    <route id="idempotentTransactions">
        <from uri="direct:invokeIdempotentTransactions" />
        <idempotentConsumer
            messageIdRepositoryRef="jdbcIdempotentRepository">
            <ognl>#{request.body.ID}</ognl>
            <!-- Anything here will only be executed for non-duplicates -->
            <log message="non-duplicate" />
            <to uri="stream:out" />
        </idempotentConsumer>
    </route>            
</camelContext>
 {1908988=null}

在1908988是request.body.id的情况下,我已经将EIP设置为键上,所以这并不容易合并到我的查询中。

是否有更好的方法将CAMEL_MESSAGEPROCESSED表用作select语句的反馈循环,以便SQL server执行大部分负载?

更新:

<el>${in.body.ID}</el>
select * from transactions tr where tr.ID IN (select cmp.messageid from CAMEL_MESSAGEPROCESSED cmp where cmp.processor = 'transactionProcessor')

共有1个答案

狄望
2023-03-14

是的,是的。但是您需要使用可伸缩存储来保存已处理的消息集。您可以使用Hazelcast-http://camel.apache.org/Hazelcast-idempotent-repository-tutorial.html或Infinispan-http://java.dzone.com/articles/clustered-idempotent-consumer-这取决于您的堆栈中已经有哪个解决方案。当然,JDBC存储库可以工作,但前提是它满足所选的性能标准。

 类似资料:
  • 我有一个Spring-boot应用程序,可以听Kafka。为了避免重复处理,我尝试手动提交。为此,我在阅读主题后异步提交了一条消息。但是我被困在如何实现消费者幂等,这样记录就不会被处理两次。

  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?