当前位置: 首页 > 知识库问答 >
问题:

ApacheCamel:GB数据从数据库路由到JMSendpoint

谷梁德容
2023-03-14

我现在已经在camel中做了一些小项目,但有一件事我很难理解,那就是当在camel路线上消费时,如何处理大数据(这不适合内存)。

我有一个包含几GB数据的数据库,我想使用骆驼进行路由。显然,将所有数据读入内存不是一种选择。

如果我是作为一个独立的应用程序这样做的,我会有代码来分页数据并将块发送到我的JMS enpoint。我想使用骆驼,因为它提供了一个很好的模式。如果我从文件中消费,我可以使用流()调用。

我也应该使用 camel-sql/camel-jdbc/camel-jpa 还是使用 bean 从我的数据库中读取。

希望大家还和我在一起。我更熟悉JavaDSL,但希望人们能提供任何帮助/建议。

更新日期:2012年5月2日

所以我有一些时间来解决这个问题,我认为我实际上正在做的是滥用生产者的概念,以便我可以在路线中使用它。

public class MyCustomRouteBuilder extends RouteBuilder {

    public void configure(){
         from("timer:foo?period=60s").to("mycustomcomponent:TEST");

         from("direct:msg").process(new Processor() {
               public void process(Exchange ex) throws Exception{
                   System.out.println("Receiving value" : + ex.getIn().getBody() );
               }
         }
    }

}

我的生产者看起来像下面这样。为了清楚起见,我没有包括CustomEndpoint或CustomComponent,因为它看起来只是一个薄薄的包装器。

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e;
    CamelContext c;

    public MyCustomProducer(Endpoint epoint){
          super(endpoint)   
          this.e = epoint;
          this.c = e.getCamelContext();
    }

    public void process(Exchange ex) throws Exceptions{

        Endpoint directEndpoint = c.getEndpoint("direct:msg");
        ProducerTemplate t = new DefaultProducerTemplate(c);

        // Simulate streaming operation / chunking of BIG data.
        for (int i=0; i <20 ; i++){
           t.start();
           String s ="Value " + i ;                  
           t.sendBody(directEndpoint, value)
           t.stop();         
        }
    }
} 

首先,上面看起来不太干净。执行此操作的最干净的方法似乎是通过调度石英作业填充jms队列(代替direct:msg),然后我的骆驼路由使用该作业,这样我就可以在骆驼管道中接收到的消息大小上具有更大的灵活性。然而,我非常喜欢将基于时间的激活设置为Route的一部分的语义。

有没有人对这样做的最佳方式有什么想法。

共有1个答案

陈坚
2023-03-14

在我看来,你需要做的就是:

from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" +
    "&maximumResults=150" +
    "&consumeDelete=false")
.to("jms:queue:entities");

maxumResults 定义每个查询获取的实体数的限制。

当你完成一个实体实例的处理时,需要设置< code > e.processed = true并< code>persist()它,这样实体就不会被再次处理。

一种方法是使用@Consumed注释:

class SomeEntity {
    @Consumed
    public void markAsProcessed() {
        setProcessed(true);
    }
}

您需要注意的另一件事是在将实体发送到队列之前如何序列化实体。您可能需要在 from 和 to 之间使用丰富器。

 类似资料:
  • 问题内容: 我一直在使用手动数据库选择来处理具有两个独立数据库的项目。我已经在设置中定义了数据库。 进一步阅读之后,看来数据库路由实际上是解决问题的方法。但是,在阅读了文档和此处的一些相关文章之后,我比以往更加困惑。 在我的设置中,我有: 我知道我必须像这样定义路由器类(我认为在文件中): 那呢 每个模型都需要一个还是自动的?除此之外,我仍然得到一个错误: django.core.exceptio

  • 本文向大家介绍Django 添加数据库路由文件,包括了Django 添加数据库路由文件的使用技巧和注意事项,需要的朋友参考一下 示例 要在Django中使用多个数据库,只需在中指定每个数据库settings.py: 使用dbrouters.py文件来指定对于每种数据库操作类,哪些模型应在哪些数据库上进行操作,例如,对于存储在中的远程数据remote_data,您可能需要以下内容: 最后,将您添加d

  • 问题内容: 嗨,我已经成功地将jTable链接到JDBC数据库。但是,我在检索它们时遇到了麻烦。我希望在重新启动程序时出现保存的数据,但是它不起作用。 这是我保存文档的代码! 有什么方法可以检索JDBC数据库中的数据并通过Jtable显示它?我很抱歉提出这样一个简单的问题,但是我是Java新手,我非常需要帮助! 非常感谢! 用于加载数据的代码… 顺便说一句,我的jtable是一个3列的表,其中包含

  • 我使用 liquibase 变更集创建了一个 Postgresql 数据库 A。现在,我正在创建一个应用程序,该应用程序允许创建新的数据库 B 并从数据库 A 实时复制架构,包括 liquibase 变更集,因为数据库以后仍然可以更新。请注意,在数据库 A 中复制的架构时,可能已经更新,从而使基本变更集过时。 我的主要问题是: 如何使用liquibase将PostgreSQL模式x从数据库a(在运

  • 问题内容: 我有这个小点击计数器。我想将每次单击都包含在mysql表中。有人可以帮忙吗? 万一有人想看看我做了什么: 这是phpfile.php,出于测试目的,将数据写入txt文件 问题答案: 您的问题中定义的JavaScript不能直接与MySql一起使用。这是因为它不在同一台计算机上运行。 JavaScript在客户端(在浏览器中)运行,并且数据库通常在服务器端存在。您可能需要使用中间服务器端

  • 我有一个独立的H2服务器,正在收集数据。为了进行测试,我希望将数据从服务器中提取到CSV文件中。有什么工具吗?