当前位置: 首页 > 知识库问答 >
问题:

ftp使用者流下载seda队列在拆分器tariterator期间失败

屠君墨
2023-03-14

首先介绍一下我的要求:

  • 从多个动态定义的只读FTP/SFTP站点下载大型tar.gz文件。
  • 根据条目名的扩展名处理。tar.gz中的文件。
  • 使用Camel 2.19.3
from("ftp://user@localhost/path?download=false&inProgressRepository=#inProgressRepo&idempotentRepository=#idemRepo&noop=true&readLock=changed&readLockMarkerFile=false&autoCreate=false&stepwise=false").to("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
from("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
.process({
    String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
    CamelContext context = exchange.getContext();
    ConsumerTemplate downloadConsumer = context.createConsumerTemplate();
    Producer unpackProducer = context.getRoute("unpack").getEndpoint().createProducer();
    Map<String,Object> parms = new HashMap<>();
    parms.put("fileName", fileName);
    parms.put("runLoggingLevel", "INFO");
    parms.put("consumer.bridgeErrorHandler", "true");
    parms.put("idempotentRepository", "#idemRepo");
    parms.put("noop", "true");
    parms.put("readLock", "changed");
    parms.put("readLockLoggingLevel", "INFO");
    parms.put("readLockMarkerFile", "false");
    parms.put("initialDelay", "0");
    parms.put("autoCreate", "false");
    parms.put("maximumReconnectAttempts", "0");
    parms.put("streamDownload", "true");
    parms.put("stepwise", "false");
    parms.put("throwExceptionOnConnectFailed", "true");
    parms.put("useList", "false");
    downloadConsumer.start();
    Exchange downloadExchange = downloadConsumer.receive(URISupport.normalizeUri(URISupport.appendParametersToURI("ftp://user@localhost/path", parms));
    unpackProducer.process(downloadExchange);
    if (downloadExchange.isFailed()) {
      LOGGER.error("unpack failed", downloadExchange.getException());
      exchange.setException(downloadExchange.getException());
    }
    downloadConsumer.doneUoW(downloadExchange);
    downloadConsumer.stop();
}
from("direct:unpack").routeId("unpack")
.convertBodyTo(InputStream.class, null)
  .split(new TarSplitter()).streaming()
    .choice()
      .when(header(FILE_NAME).regex(XML_FILTER))
        .unmarshal().jacksonxml(POJO.class)
        .endChoice()
      .when(header(FILE_NAME).regex(XML2_FILTER))
        .unmarshal().jacksonxml(POJO2.class)
        .endChoice()
      .end()
    .end()
  .to("file://...")

提前感谢您的帮助。

共有1个答案

白光耀
2023-03-14

我的错误,需要添加binary=true。

 类似资料:
  • 我无法连接到具有SEDA队列的骆驼路线。在服务器端,我有以下配置: 我正在尝试从这样的独立客户端访问此路由: 但我的制作人无法连接到seda队列。无法按我的路线排队。无法在我的bean属性中添加camelContext。我正在获取“bean类的属性'camelContext'无效”。如果我将正文发送到SEDA队列,则消息将发送到那里,但不会发送到路由的下一个元素

  • 我们有一个用例,在这个用例中,基于到达工作队列的工作项,我们需要使用消息元数据来决定数据流来自哪个Kafka主题。我们部署的工作节点可能少于100个,每个工作节点可以有可配置数量的线程从队列接收消息。因此,如果一个工人有“N”个线程,我们会打开Kafka流到“N”个不同的主题。(n通常小于10)。一旦worker完成了对消息的处理,我们还需要关闭流。工作人员可以接收下一个消息,一旦它的第一个消息,

  • 与之间的差异: > 它们可能有不同的特点: 这里讨论的似乎是另一个毫无意义的流拆分器特性策略(并行计算似乎更好):深入理解Java8和Java9中的拆分器特性 在本例中,从禁用拆分功能的顺序流创建了一个拆分器(返回null)。当以后需要转换回一个流时,该流不会从并行处理中受益。一种耻辱。 最大的问题是:作为解决办法,在调用之前总是将流转换为并行流会有什么主要影响?

  • 我正在开发一个使用Apache Camel和JMX活动的小应用程序。非常简单地说,我有一个使用SEDA组件的路由--只有一个消费者--简而言之,它创建自己的线程,并在路由繁忙时对传入的交换进行排队。 我想知道Camel中是否有一些现成的东西允许我这样做,或者我忽略了Hawtio或JConsole中的一些东西。 提前谢了。

  • 我使用RabbitMQ网络创建了一个主题交换UITX并绑定到交换两个队列TX. Q1和TX. Q2,每个队列都相应地绑定了路由密钥rk1和rk2,并向交换产生了少量消息。 现在,我想使用Spring Cloud Stream创建一个消费者,它只从Q1获取消息。我尝试使用配置: 以及使用消息的方法的注释。 因此,我可以看到使用者创建了一个同名TX.Q1的队列(或绑定),但新队列/绑定的路由键是# 如

  • 我有一种生产者-消费者设置,其中生产者(不同线程上的多个生产者)将数据排入redis队列,消费者(单个线程上的单个消费者)监视该队列。当队列长度达到时,例如 使用redis-py客户端,我使用以下代码提取前10000项,并删除它们: (用于lrange和ltrim的文件) 我的问题是,这里有数据丢失的机会吗?例如,在调用函数ltrim()和实际修剪队列之间的时间(在这种情况下,最新的日志将丢失,因