当前位置: 首页 > 知识库问答 >
问题:

阿帕奇骆驼:轮询后访问路由

罗翰
2023-03-14

我使用Camel JPAendpoint来轮询数据库,并将数据复制到第二个数据库。为了不重复轮询,我打算保存复制数据的最高ID,并且只轮询ID高于该ID的数据。

为了节省一些数据库写入,我希望在当前轮询/复制运行结束后写回最高ID,而不是针对每个数据元素。我可以访问骆驼路线类中的元素(及其ID):

    private Long startId = 0L;
    private Long lastId = 0L;

    from("jpa://Data").routeId("dataRoute")
        .onCompletion().onCompleteOnly().process(ex -> {
            if (lastId > startId) {
                startId = lastId;
                logger.info("New highest ID: {}", startId);
            }
        }).end()
        .process(ex -> {
            Data data = ex.getIn().getBody(Data.class);
            lastId = data.getId();
            NewData newData = (NewData) convertData(data);
            ex.getMessage().setBody(newData);
        }).to("jpa://NewData")

现在我想在当前轮询结束后保存startId。为此,我用自己的自定义方法重写了PollingConsumerPollStrategy,在这里我想访问commit方法中的lastId(在当前轮询完成后,它会在我想要的时候执行)。

但是,我无法访问那里的路由。我尝试通过路由ID:

    @Override
    public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
        var route = (MyRoute) endpoint.getCamelContext().getRoute("dataRoute");
        var lastId = route.getLastId();
        log.debug("LastID: {}", lastId);
    }

但是,我收到了一个类强制转换异常:默认路由MyRoad。是关于将ID交给我的路由吗?

共有1个答案

鲁龙野
2023-03-14

我会做得有点不同。

除了使用 RouteBuilder 实例变量来存储 startIdlastId 之外,您还可以将这些值作为当前 CamelContext 的 GlobalOptions(基本上是键值对的映射)放置。

通过这种方式,您可以通过以下方式轻松获取它们的价值:

public void commit(Consumer consumer, Endpoint endpoint, int polledMessages) {
    String lastId = endpoint.getCamelContext().getGlobalOption​("lastId"); 
}

它(理论上)也是一个更好的实现,因为它还支持潜在的并发执行,因为 id 是为上下文中运行的所有实例共享的。

 类似资料:
  • 我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。

  • 遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢

  • 下面是我试图实现的场景: 谢谢你的帮助。此外,这里是已经工作的FTP部分。

  • 我有一个spring boot应用程序,我正在向其中添加一个camel路由。定义路由的类扩展了FatJarRouter,并用@component注释。当应用程序作为spring boot应用程序运行时,不会识别路由。但是,如果我在主类中使用@SpringBootApplication注释编写路由,则会标识路由。这是它目前在日志中的显示方式: 请告诉我如何在将路由作为一个单独的类而不是在主类中编写时