当前位置: 首页 > 知识库问答 >
问题:

骆驼 - 流:文件和 FTP 上传

弘浩瀚
2023-03-14

我有一个Camel/SpringBoot应用程序,它从GraphQLendpoint检索数据,将数据存储在内存数据库(2个表)中,通过运行SQL查询提取CSV文件,然后将文件上传到FTP服务器。由于将提取约350k条记录,我使用SQLs outputType=StreamList、splitter和stream:file。整个路线如下所示:

from("direct:loadComplete").id("loadComplete")
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Load complete")
    .setHeader("createDate", simple("${date:now:yyyyMMdd'_'HHmmss}"))
    .setHeader(Exchange.FILE_NAME,simple("${header.createDate}_sku_{{account.countryCode}}.csv"))
    .to("sql:"+ QUERY + "?outputType=StreamList") //run the SQL query
    .split(body()).streaming() //stream & split the results
        .bean(LineProcessor.class, "processLine") //process each entry and transform to CSV
        .toD("stream:file?fileName={{setting.outputDir}}/${header.CamelFileName}&closeOnDone=true") // stream each record to file
    .end()
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Written file to disk:${header.CamelFileName}")
    .to("ftp://{{ftp.host}}:{{ftp.port}}/{{ftp.path}}?fileName={{setting.outputDir}}/${header.CamelFileName}&username={{ftp.username}}&password={{ftp.password}}&{{ftp.config}}") // upload the CSV file
    .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - FTP upload complete. File: ${header.CamelFileName}");

提取数据时不会出现任何问题,并使用记录创建CSV文件。但是,上传到FTP服务器失败,原因是:org.apache.camale。InvalidPayloadException:没有类型为java.io的可用正文。InputStream,但具有值:org.apache.camel.component.sql。ResultSetIterator我相信这是由于文件流。一旦流:文件完成并在“closeOnDone”上有后续路径,是否有任何方法执行ftp上传?

另一方面,我对outputType=StreamList在后台的实际工作方式很感兴趣。我假设它仍然将完整的SQL结果加载到内存中?有没有其他方法可以最大限度地减少应用程序的内存消耗,并且不需要传递大量的有效负载?有没有其他更优雅的方法来使用CSV编组?

谢谢你的帮助!

霍尔格

共有1个答案

孟翰藻
2023-03-14

在流处理中,将创建的 csv 行上传到本地文件。在 “.end()” 之后,正文将包含 sql 生产者的结果。您应该从FTP生产者的本地文件中读取数据。

from("direct:loadComplete").id("loadComplete")
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Load complete")
            .setHeader("createDate", simple("${date:now:yyyyMMdd'_'HHmmss}"))
            .setHeader(Exchange.FILE_NAME,simple("${header.createDate}_sku_{{account.countryCode}}.csv"))
            .to("sql:"+ QUERY + "?outputType=StreamList") //run the SQL query
            .split(body()).streaming() //stream & split the results
            .bean(LineProcessor.class, "processLine") //process each entry and transform to CSV
            .toD("stream:file?fileName={{setting.outputDir}}/${header.CamelFileName}&closeOnDone=true") // stream each record to file
            .end()
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - Written file to disk:${header.CamelFileName}")
            .pollEnrich("file:{{setting.outputDir}}?fileName=${header.CamelFileName}&noop=true", 10000/*investigate about timing according your file size*/, new AggregationStrategy() {
                @Override
                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    return newExchange;// save important data from old exchange
                }
            })
            .to("ftp://{{ftp.host}}:{{ftp.port}}/{{ftp.path}}?fileName={{setting.outputDir}}/${header.CamelFileName}&username={{ftp.username}}&password={{ftp.password}}&{{ftp.config}}") // upload the CSV file
            .log("${date:now:yyyy-MM-dd'T'HH:mm:ssZ} - FTP upload complete. File: ${header.CamelFileName}");
 类似资料:
  • 我试图弄清楚骆驼的节流概念。我已经看到了骆驼的航线政策,但这适用于许多飞行中的交换。 我的路线如下: 现在我的用例是,我想在这些路由之间传输比如说2000条消息,我知道可以通过来完成。但是,我不得不决定如何在下一个2000条消息被路由时控制它。我只想在接收者队列变为空时路由下2000条消息。 例如,消息从队列路由到。假设2K消息已成功路由,现在我想挂起我的路由,这样它就不会传输更多的消息,直到队列

  • 我正在尝试使用多部分/表单数据将文件上传到骆驼路由。一切都很好,但是,我无法获得原始文件名。骆驼版本是:3.14.1 更新 使用对路由的以下修改进行更新。我设法处理二进制文件(获取文件名并存储它们)。但是,对于文本文件,该文件将附加边界页脚: 路线定义: 先谢谢你了 爱德华

  • 我希望上传一个包含一个文件和一些json的多部分文件,并使用camel rest dsl将该文件输出到本地文件夹。 我在路由中使用处理器,使用HttpServletRequest将多部分请求拆分为多个部分。getPart(),但我收到以下错误: 我已经向 servlet 添加了一个多部分筛选器,并且已经尝试了使用和不使用多部分解决方案 Bean。当我调试处理器时,我可以看到 HttpServlet

  • 然后我需要做的是创建另一个FTP连接--技术上是到同一台机器,但路径不同。在我的实验中,我使用了一个带有构造URI的使用者模板来获取另一个文件(基于轮询文件的内容)。 这已经在一个高级别工作,并获取我需要的文件。谁能证实这是不是一件危险的事? 根据文档: 当然,我想要的文件确实会被检索到,而且我可以将它进一步传递到骆驼路由中,然而,当我处理FTP流文件时,我看到了字节级处理(按位计算等)的问题,我

  • 我刚刚开始研究apache camel(使用蓝图路线),我已经被卡住了。 我需要处理一组不同格式的csv文件。我有5个文件,foo_X_X指定csv文件的类型,文件有日期戳。这些文件可能很大,所以一旦写入所有文件,就会写入一个“完成”文件。完成的文件名为foo_trigger_20160110.csv. 我在文件中看到了doneFileName选项,但它只支持静态名称(我在文件名中有一个日期),或

  • 我正在使用骆驼beanio组件对文件内部的数据进行封送和解封。