当前位置: 首页 > 知识库问答 >
问题:

使用Apache Camel同步FTP remote和FS(或HDFS)

顾高扬
2023-03-14

有时,我想手动运行一个工具,将FTP远程与我的FS(至少,最终-HDFS)远程同步。每个文件都应该下载,更新的文件应该追加。

我使用Apache Camel FTP2示例:https://github.com/Apache/Camel/tree/master/examples/camel-example-FTP启动该项目。在其他来源的帮助下,这是一个简单的解决方案。

object Main{

  def main(args: Array[String]): Unit = {
    val context = new DefaultCamelContext
    context.addRoutes(FtpRoute())

    context.start
    Thread.sleep(100000)
    context.stop
  }    
}

case class FtpRoute() extends RouteBuilder {

  def configure(): Unit = { // configure properties component
    val pc = getContext.getComponent("properties", classOf[PropertiesComponent])
    pc.setLocation("classpath:ftp.properties")

    val ftpSource = getContext.resolvePropertyPlaceholders(
      s"ftp://{{ftp.serv}}{{ftp.path}}?username={{ftp.user}}&password={{ftp.pass}}&passiveMode=true")

    from(ftpSource)
      .to("file:/tmp/ftp")
      .log("Downloaded file ${file:name} complete.")
  }
}

我可以说这很管用,但是...绝对不是我想要的。

    null

多谢!

共有1个答案

东方俊明
2023-03-14

ftp组件正在轮询使用者。

这样的组件每延迟一次就轮询一个URI。

每个轮询,组件从第一个到最后接收所有新文件。这就是为什么你会看到文件不断的重新下载。下载完所有文件后,组件将停止,并将等待计时器或计划发出的新事件。

    null
disconnect=true&
connectTimeout=300000&
ftpClient.dataTimeout=300000&
timeout=300000&soTimeout=300000&
delay=3600000&reconnectDelay=10000&
backoffErrorThreshold=1&
maximumReconnectAttempts=3&
backoffMultiplier=2&
passiveMode=true&
noop=true&
idempotent=true&
idempotentRepository=#jdbcDocFileIdempotentRepository&
download=true&
recursive=true&
stepwise=true&
antInclude=**/currMonth/*.zip&antExclude=**/prevMonth/*.zip&
idempotentKey=${file:onlyname}-${file:size}
<bean id="jdbcDocFileIdempotentRepository"
      class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
    <argument ref="embeddedDataSource"/>
    <argument value="DocFileRepo"/>
    <property name="tableExistsString" value="SELECT 1 FROM MESSAGE_REPOSITORY WHERE 1 = 0"/>
    <property name="createString" value="CREATE TABLE MESSAGE_REPOSITORY (processorName TEXT, messageId TEXT, createdAt TIMESTAMP, result TEXT)"/>
    <property name="queryString" value="SELECT COUNT(*) FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/>
    <property name="insertString" value="INSERT INTO MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)"/>
    <property name="deleteString" value="DELETE FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/>
</bean>

数据源:

<bean id="embeddedDataSource" class="org.sqlite.javax.SQLiteConnectionPoolDataSource">
   <property name="url" value="jdbc:sqlite:${embedded_db_config_path}" />
   <property name="encoding" value="UTF-8"/>
   <property name="journalMode" value="WAL"/>
</bean>

已更新

根据Camel文档:

 类似资料:
  • 问题内容: 我已经开始学习Java的并发和线程。我知道同步的基础知识(即它的作用)。从概念上讲,我知道它提供了对Java中具有多个线程的共享资源的互斥访问。但是,当面对下面的示例时,让我感到困惑的是同步它是否是一个好主意。我知道代码的关键部分应该同步,并且不应过度使用此关键字,否则会影响性能。 问题答案: 假设每个线程通过 不同的数组, 则不需要同步,因为其余变量是局部的。 相反,如果您触发所有调

  • 我意识到这是一个基本问题,但我没能在别处找到答案。 是

  • TLDR版本:我的应用程序如何知道设备a上的CoreData对象、设备B上的CoreData对象和CloudKit中的CKRecord都是相同的记录? 详细版本: 我正在开发一个应用程序,它在本地使用CoreData和CloudKit来实现设备间的同步。我不明白添加多个设备后,CoreData关系和CloudKit引用应该如何协同工作。 2)当您创建CloudKit记录时,您可以为它分配一个记录名

  • 我正在尝试向异步路由发送消息,但它不起作用。我刚刚在github上创建了一个项目来模拟这个问题

  • 根据某些参数,我希望在运行时决定GET请求应该同步还是异步处理。在这两种情况下,资源endpoint的URL()必须相同。这可能吗? 当然,正如您在上面的示例中所看到的,可以通过调用以异步方式伪造同步版本。但是,我会避免创建异步响应的开销。

  • 我的项目即将启动。在发布之后,我有一个很大的计划,数据库结构将发生变化——现有表和新表中的新列,以及与现有模型和新模型的新关联。 我还没有接触到序列化中的迁移,因为我只有测试数据,我不介意每次数据库更改时都删除这些数据。 为此,目前,如果我更改了模型定义,我会在我的应用程序启动时运行。这将删除所有表并从头开始创建它们。我可以省略选项,让它只创建新表。但如果现有的改变了,这是没有用的。 那么,一旦我