我是Camel的新手,网上没有类似的问题让我相信我在做一些愚蠢的事情。我正在使用camel 2.12.1组件,并且正在从本地目录解析大型CSV文件,并通过SFTP下载它们。我发现
拆分(body(). Tokenize("\n")).流().散集(). csv()
适用于本地文件(windows 7);我与
列表
对于csv文件中的每一行。但是,当我从sftp组件(连接到linux服务器下载文件)使用相同的路由语法时,我会得到一个带有一行代码的单交换,读起来像是对“ls”的调用:
-rwxrwxrwx 1用户名用户名83400 12月16日14:11文件名。csv
经过反复试验,我发现
拆分(body())。流式处理()。解组()。csv()
使用sftp组件可以正确地加载和解析文件,但它在流模式下不会这样做,它会在将整个文件解组到单个exchange之前将其加载到内存中。
我从camel 2.10中发现了一个类似的错误报告(https://issues.apache.org/jira/browse/CAMEL-6231),子句将其关闭为无效,表明报告者错误地使用了线程和并行流,但我没有配置这些功能。
我使用的sftp节是:
sftp://192.168.1.1?fileName=fileName.csv&username=userName&password=secret!&idempotent=true&localWorkDirectory=tmp
文件节为:
"file:test/data?noop=true&fileName=fileName.csv"
有人知道我做错了什么吗?
我在SFTP(骆驼2.25.0)中也遇到了同样的问题。然而,在将路由拆分为两个不同的路由(如其他人所建议的)之前,我使用了下面的url
sftp://:22/?用户名=随机
使用下面的路线定义,
from("sftp url").split().tokenize("\n", 10, true).streaming().to("log:out")
由于此路由还将远程文件下载到本地(与2路由选项相同),然后使用正常流处理本地文件(作为Sinsanator,提到它与文件完美配合),因此内存占用在下载时成为真正的锯齿(高达100MB),然后它在处理时使用高达150MB,但又大致是锯齿性质。
这种方法的一个优点(在我看来)是,我们可以根据实际的处理完成情况来处理与完成相关的任务(例如,将远程文件移动到其他目录中)(如果我们中断路由,这是不可能自动完成的)。此外,由于下载由Camel管理,本地文件在处理完成后会自动删除。
做一个中间路线来解决问题。
<route id="StagingFtpFileCopy">
<from uri="ftp://{{uriFtpPath}}"/>
<to uri="file://data/staging"/>
</route>
我正在研究一个骆驼原型,它在同一骆驼环境中使用两个起点。 第一个路由使用用于“配置”应用程序的消息。消息通过配置服务bean加载到配置存储库中: 第二个路由实现了收件人列表eip模式,将不同类型的输入消息传递给许多收件人,这些收件人从同一个配置存储库中按顺序读取: 现在出现的问题是如何同步它们,因此如果第一个路由正在处理新数据,则第二个路由“等待”。 我是Apache Camel的新手,对如何处理
我试图弄清楚骆驼的节流概念。我已经看到了骆驼的航线政策,但这适用于许多飞行中的交换。 我的路线如下: 现在我的用例是,我想在这些路由之间传输比如说2000条消息,我知道可以通过来完成。但是,我不得不决定如何在下一个2000条消息被路由时控制它。我只想在接收者队列变为空时路由下2000条消息。 例如,消息从队列路由到。假设2K消息已成功路由,现在我想挂起我的路由,这样它就不会传输更多的消息,直到队列
与之间的差异: > 它们可能有不同的特点: 这里讨论的似乎是另一个毫无意义的流拆分器特性策略(并行计算似乎更好):深入理解Java8和Java9中的拆分器特性 在本例中,从禁用拆分功能的顺序流创建了一个拆分器(返回null)。当以后需要转换回一个流时,该流不会从并行处理中受益。一种耻辱。 最大的问题是:作为解决办法,在调用之前总是将流转换为并行流会有什么主要影响?
我有一个Camel/SpringBoot应用程序,它从GraphQLendpoint检索数据,将数据存储在内存数据库(2个表)中,通过运行SQL查询提取CSV文件,然后将文件上传到FTP服务器。由于将提取约350k条记录,我使用SQLs outputType=StreamList、splitter和stream:file。整个路线如下所示: 提取数据时不会出现任何问题,并使用记录创建CSV文件。但
我最近注意到Camel现在有自己的Kafka组件,所以我决定给它一个旋转。 我决定尝试一个很好的简单文件->kafka主题如下...
我在Camel中有一个路由,当异常发生时,我想重试,但是我想设置一个属性,以便该路由在第二次尝试时可以做一些稍微不同的事情,以阻止重试中再次发生错误。这里有一个路线说明了我目前正在尝试的想法。 显然这不是真正的路线;整个主体只是在某些情况下模拟我的组件错误。我希望看到以下消息被记录: 但我真正看到的是: 我尝试将添加到异常处理程序中,但这只会抑制错误消息。我没有看到第二条开始或完成日志消息。 为什