当前位置: 首页 > 知识库问答 >
问题:

第一行只读Bindy的Apache Camel反应流

危彬彬
2023-03-14

我尝试使用Apache Camel(版本2.25.3)反应流和Spring Boot来读取一个大型csv文件,并使用Bindy解封这些行。这是“工作”,因为应用程序运行并检测文件,但我只看到流中文件的第一行。它似乎与Bindy相关,因为如果我从等式中去掉解组,我就可以在流中返回csv文件的所有行。我已经简化了这个问题,在这里演示了SO。我正在使用Spring Webflux来公开结果发布者。

所以我的骆驼路线是这样的:

import lombok.RequiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@RequiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
    private final CamelReactiveStreamsService camelRs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);

        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
                .unmarshal(bindy)
                .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelRs.fromStream("lineItems", LineItem.class);

        return Flux.from(lineItems);
    }
}

宾迪类:

@ToString
@Getter
@CsvRecord(separator = ";", skipFirstLine = true, skipField =true)
public class LineItem {
    @DataField(pos = 2)
    private String description;
}
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}
curl localhost:8080/lineItems
from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
        .unmarshal(bindy)
        .to("reactive-streams:lineItems");
2021-01-04 10:13:26.798  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Total 2 routes, of which 2 are started

但随后我得到一个异常,说明“流没有活动订阅”:

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport      ] [         9]
[route1            ] [to1               ] [reactive-streams:rawLines                                                     ] [         5]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: The stream has no active subscriptions
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]

有没有人有关于我如何将Bindy与反应流结合使用的建议?谢了!

编辑

@Override
public void configure() {
    from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
            .to("reactive-streams:extractedFile");
}
public Flux<File> getFileFlux() {
    return Flux.from(camelRs.fromStream("extractedFile", File.class));
}
private Flux<LineItem> readLineItems() {
    return fileFlux
            .flatMap(message -> Flux.using(
                    () -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
                            .withSeparator(';')
                            .withType(LineItem.class)
                            .build()
                            .stream(),
                    Flux::fromStream,
                    BaseStream::close)
            );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}
@GetMapping(value = "/lineItems", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}

共有1个答案

贺季
2023-03-14

我猜文件使用者只是将整个文件传递到解组步骤。

因此,如果将文件使用者的结果解封为lineitem,则将整个文件内容“缩减”到第一行。

相反,如果删除反编组,您将获得整个文件内容。但很可能是文件使用者在传递之前将整个文件加载到内存中。

from("file:...")
    .split(body().tokenize(LINE_FEED)).streaming()
    .to("direct:processLine") 

像这样,拆分器将每一行发送到路由direct:processline进行进一步处理。

在这个场景中,我面临的问题是解析单个CSV行。大多数CSV库被设计为读取和解析整个文件,而不是单行。

然而,相当古老的OpenCSV库有一个CSVParser和一个Parseline(String csvLine)方法。所以我用它来解析一个“完全分离”的CSV行。

 类似资料:
  • 我正在使用camel-ftp从远程服务器获取文件。当文件被移动时,我需要读取第一行,这是一个头,并在路由的其余部分用它设置骆驼头。我需要为它构建一个自定义处理器吗?我想到了.split(),但它会解析文件中的每一行(这个文件很大)。

  • 我可以使用Apache POI只读取Excel文件的第一行吗?我不想读取整个文件,因为它有50,000行,读取最多需要10分钟(性能是一场灾难)。我通过文件上传获取字节。我的选项是字节数组或InputStream。现在我正在这样做:

  • 问题内容: 是否有比从Python中的文件一次读取第二行更好的方法呢? 我在2.5.4中。较新版本有何不同? 编辑:删除的答案指出:在py3k中,您需要执行next(f)而不是f.next()。更不用说打印更改 问题答案: import itertools las,需要Python 2.6或更高版本;2.5仅具有,如果有奇数行,它将截断最后一行。当然,提供与生成器相同的功能非常容易。 这是一个更一

  • 问题内容: 在我当前的程序中,一种方法要求用户输入产品描述作为输入。但是,当我以后尝试打印此信息时,仅显示节目的第一个单词。这可能是什么原因?我的方法如下: 因此,如果用户输入的是“带有橙味的汽水”,则只会产生“ 汽水”。 任何帮助将不胜感激! 问题答案: 替换为: