当前位置: 首页 > 知识库问答 >
问题:

如何将反应器通量转换为InputStream

沙柏
2023-03-14

鉴于我有一个Flux

例如,使用WebClient,我可以使用这种方法实现这一点

WebClient。获取('example.com')。交换flatMap{it.bodytomino(InputStreamResource::class.java)}。映射{it.inputStream}

但是我不知道如何做同样的事情,当我有Flux


共有3个答案

杜绍元
2023-03-14

您可以转换Flux

Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
                   (baos, str) -> {
                       try {
                           baos.write(str.getBytes());
                       } catch (IOException e) {
                           // do nothing
                       }
                   })
          .map(baos -> new ByteArrayInputStream(baos.toByteArray()))
          .map(inputStream -> ... // call other library);

这需要一个冷的Flux

Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
          .map(inputStream -> ... // call other library);

解翰采
2023-03-14

Edwin的回答并没有对我起到任何作用,因为上游的错误被订阅者吞没了,并没有传播到InputStream的消费者。不过,受埃德温回答的启发,我找到了不同的解决方案。下面是一个消耗流量的示例

fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
    val inputStream = PipedInputStream()
    val outputStream = PipedOutputStream(inputStream)
    val isStreamEmitted = AtomicBoolean(false)
    
    return flux.handle<InputStream> { byteArray, sink ->
        try {
            outputStream.write(cipher.update(byteArray))
            // emit the input stream as soon as we get the first chunk of bytes
            // make sure we do it only once
            if (!isStreamEmitted.getAndSet(true)) {
                sink.next(inputStream)
            }
        } catch (e: Exception) {
            // catch all errors to pass them to the sink
            sink.error(e)
        }
    }.doOnComplete { 
        // here we have a last chance to throw an error  
        outputStream.write(cipher.doFinal())
    }.doOnTerminate {
        // error thrown here won't get propagated downstream
        // since this callback is triggered after flux's completion 
        outputStream.flush()
        outputStream.close()
    }
}

这里的技巧是使用handle操作符生成最多发射一个项目的通量。与Mono不同,通量不会在第一次发射后立即终止。虽然它不会再发射更多的项目,但它将保持“打开”状态,以发射第一次发射后发生的最终错误。

下面是消耗流量的示例

fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
    decryptAndGetInputStream(flux, cipher)
        // the following operator gets called at most once
        .flatMap { inputStream ->
            // wrap the blocking operation into mono
            // subscribed on another thread to avoid deadlocks
            Mono.fromCallable { 
                processInputStream(inputStream)
            }.subscribeOn(Schedulers.elastic())
        // to get mono out of flux we implement reduce operator
        // although it gets never called
        }.reduce { t, _ -> t }

这里的另一个优点是,在第一块数据可用之前,使用InputStream的线程不会阻塞。

苍宝
2023-03-14

可能有很多方法可以做到这一点。一种可能是使用PipedInputStream和PipedOutStream。

其工作方式是将输出流链接到输入流,这样就可以从链接的输入流中读取写入输出流的所有内容,通过这样做,在两者之间创建一个管道。

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);

不过,有一个警告,根据管道流的文档,写入过程和读取过程必须在单独的线程上进行,否则可能会导致死锁。

所以,回到我们的反应流场景,我们可以创建一个管道(如上所述),订阅Flux对象,并将从中获得的数据写入管道输出流。无论你在那里写什么,都可以在管道的另一边,在相应的输入流中读取。这个输入流可以与非反应性方法共享。

我们只需要特别小心,我们订阅了一个单独的线程流量。e、 g.订阅(Schedulers.elastic())

以下是此类订阅服务器的一个非常基本的实现:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}

使用这个订阅服务器,我可以定义一个非常简单的实用方法,它可以改变流量

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}

请注意,当流完成时,当发生错误或订阅被取消时,我非常小心地关闭输出流,否则我们会在读取端运行阻塞的风险,等待更多的输入到达。关闭输出流是在管道的另一边发出输入流结束的信号。

现在InputStream可以像任何常规流一样被使用,因此您可以将其传递给您的非反应性方法,例如:

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 

上面的代码产生:

Luke
Obi-Wan
Yoda

 类似资料:
  • 我正在进行spring webflux文件上传。我想从控制器上传文件到amazon S3 bucket上。在控制器中,我收到了以下物体 从文件部分。content()我可以 我的问题是如何转换这个通量

  • 使用spring 5,对于reactor,我们有以下需求。 什么方法可以转换单声道

  • 我有一个方法,可以尝试使用WebClient返回Mono 它可以返回我期望的结果。然后我尝试创建另一个方法来支持列表作为参数 但这一次返回一个奇怪的结果。 我是反应式编程新手,将流和单声道结合起来,然后转换为流量的正确方法是什么?

  • 转换为json格式。

  • 我可以使用现在已弃用的方法将WebClient响应转换为响应实体。 请建议实现相同结果的其他方法。下面是我的代码。

  • 问题内容: 我正在搜索轻量级API(最好是单个类)以转换 到xml,反之亦然,将XML转换回Map。 例: 结果: 然后回来: 我不想使用JAXB或JSON转换API。只需简单的情况,它就不必处理嵌套的地图或属性或其他任何内容。有什么建议么? 编辑:我创建了一个工作副本并粘贴示例。感谢fvu和Michal Bernhard。 下载最新的XStream框架,“仅核心”就足够了。 不需要转换器或其他任