首先介绍一下我的要求:
from("ftp://user@localhost/path?download=false&inProgressRepository=#inProgressRepo&idempotentRepository=#idemRepo&noop=true&readLock=changed&readLockMarkerFile=false&autoCreate=false&stepwise=false").to("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
from("seda:download?size=3&concurrentConsumers=3&blockWhenFull=true&purgeWhenStopping=true")
.process({
String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME_ONLY, String.class);
CamelContext context = exchange.getContext();
ConsumerTemplate downloadConsumer = context.createConsumerTemplate();
Producer unpackProducer = context.getRoute("unpack").getEndpoint().createProducer();
Map<String,Object> parms = new HashMap<>();
parms.put("fileName", fileName);
parms.put("runLoggingLevel", "INFO");
parms.put("consumer.bridgeErrorHandler", "true");
parms.put("idempotentRepository", "#idemRepo");
parms.put("noop", "true");
parms.put("readLock", "changed");
parms.put("readLockLoggingLevel", "INFO");
parms.put("readLockMarkerFile", "false");
parms.put("initialDelay", "0");
parms.put("autoCreate", "false");
parms.put("maximumReconnectAttempts", "0");
parms.put("streamDownload", "true");
parms.put("stepwise", "false");
parms.put("throwExceptionOnConnectFailed", "true");
parms.put("useList", "false");
downloadConsumer.start();
Exchange downloadExchange = downloadConsumer.receive(URISupport.normalizeUri(URISupport.appendParametersToURI("ftp://user@localhost/path", parms));
unpackProducer.process(downloadExchange);
if (downloadExchange.isFailed()) {
LOGGER.error("unpack failed", downloadExchange.getException());
exchange.setException(downloadExchange.getException());
}
downloadConsumer.doneUoW(downloadExchange);
downloadConsumer.stop();
}
from("direct:unpack").routeId("unpack")
.convertBodyTo(InputStream.class, null)
.split(new TarSplitter()).streaming()
.choice()
.when(header(FILE_NAME).regex(XML_FILTER))
.unmarshal().jacksonxml(POJO.class)
.endChoice()
.when(header(FILE_NAME).regex(XML2_FILTER))
.unmarshal().jacksonxml(POJO2.class)
.endChoice()
.end()
.end()
.to("file://...")
提前感谢您的帮助。
我的错误,需要添加binary=true。
我无法连接到具有SEDA队列的骆驼路线。在服务器端,我有以下配置: 我正在尝试从这样的独立客户端访问此路由: 但我的制作人无法连接到seda队列。无法按我的路线排队。无法在我的bean属性中添加camelContext。我正在获取“bean类的属性'camelContext'无效”。如果我将正文发送到SEDA队列,则消息将发送到那里,但不会发送到路由的下一个元素
我们有一个用例,在这个用例中,基于到达工作队列的工作项,我们需要使用消息元数据来决定数据流来自哪个Kafka主题。我们部署的工作节点可能少于100个,每个工作节点可以有可配置数量的线程从队列接收消息。因此,如果一个工人有“N”个线程,我们会打开Kafka流到“N”个不同的主题。(n通常小于10)。一旦worker完成了对消息的处理,我们还需要关闭流。工作人员可以接收下一个消息,一旦它的第一个消息,
与之间的差异: > 它们可能有不同的特点: 这里讨论的似乎是另一个毫无意义的流拆分器特性策略(并行计算似乎更好):深入理解Java8和Java9中的拆分器特性 在本例中,从禁用拆分功能的顺序流创建了一个拆分器(返回null)。当以后需要转换回一个流时,该流不会从并行处理中受益。一种耻辱。 最大的问题是:作为解决办法,在调用之前总是将流转换为并行流会有什么主要影响?
我正在开发一个使用Apache Camel和JMX活动的小应用程序。非常简单地说,我有一个使用SEDA组件的路由--只有一个消费者--简而言之,它创建自己的线程,并在路由繁忙时对传入的交换进行排队。 我想知道Camel中是否有一些现成的东西允许我这样做,或者我忽略了Hawtio或JConsole中的一些东西。 提前谢了。
我使用RabbitMQ网络创建了一个主题交换UITX并绑定到交换两个队列TX. Q1和TX. Q2,每个队列都相应地绑定了路由密钥rk1和rk2,并向交换产生了少量消息。 现在,我想使用Spring Cloud Stream创建一个消费者,它只从Q1获取消息。我尝试使用配置: 以及使用消息的方法的注释。 因此,我可以看到使用者创建了一个同名TX.Q1的队列(或绑定),但新队列/绑定的路由键是# 如
我有一种生产者-消费者设置,其中生产者(不同线程上的多个生产者)将数据排入redis队列,消费者(单个线程上的单个消费者)监视该队列。当队列长度达到时,例如 使用redis-py客户端,我使用以下代码提取前10000项,并删除它们: (用于lrange和ltrim的文件) 我的问题是,这里有数据丢失的机会吗?例如,在调用函数ltrim()和实际修剪队列之间的时间(在这种情况下,最新的日志将丢失,因