鉴于我有一个Flux
例如,使用WebClient,我可以使用这种方法实现这一点
WebClient。获取('example.com')。交换flatMap{it.bodytomino(InputStreamResource::class.java)}。映射{it.inputStream}
但是我不知道如何做同样的事情,当我有
Flux
您可以转换Flux
Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
(baos, str) -> {
try {
baos.write(str.getBytes());
} catch (IOException e) {
// do nothing
}
})
.map(baos -> new ByteArrayInputStream(baos.toByteArray()))
.map(inputStream -> ... // call other library);
这需要一个冷的
Flux
Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
.map(inputStream -> ... // call other library);
Edwin的回答并没有对我起到任何作用,因为上游的错误被订阅者吞没了,并没有传播到InputStream
的消费者。不过,受埃德温回答的启发,我找到了不同的解决方案。下面是一个消耗流量的示例
fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
val inputStream = PipedInputStream()
val outputStream = PipedOutputStream(inputStream)
val isStreamEmitted = AtomicBoolean(false)
return flux.handle<InputStream> { byteArray, sink ->
try {
outputStream.write(cipher.update(byteArray))
// emit the input stream as soon as we get the first chunk of bytes
// make sure we do it only once
if (!isStreamEmitted.getAndSet(true)) {
sink.next(inputStream)
}
} catch (e: Exception) {
// catch all errors to pass them to the sink
sink.error(e)
}
}.doOnComplete {
// here we have a last chance to throw an error
outputStream.write(cipher.doFinal())
}.doOnTerminate {
// error thrown here won't get propagated downstream
// since this callback is triggered after flux's completion
outputStream.flush()
outputStream.close()
}
}
这里的技巧是使用
handle
操作符生成最多发射一个项目的通量。与
Mono
不同,通量不会在第一次发射后立即终止。虽然它不会再发射更多的项目,但它将保持“打开”状态,以发射第一次发射后发生的最终错误。
下面是消耗
流量的示例
fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
decryptAndGetInputStream(flux, cipher)
// the following operator gets called at most once
.flatMap { inputStream ->
// wrap the blocking operation into mono
// subscribed on another thread to avoid deadlocks
Mono.fromCallable {
processInputStream(inputStream)
}.subscribeOn(Schedulers.elastic())
// to get mono out of flux we implement reduce operator
// although it gets never called
}.reduce { t, _ -> t }
这里的另一个优点是,在第一块数据可用之前,使用InputStream的线程不会阻塞。
可能有很多方法可以做到这一点。一种可能是使用PipedInputStream和PipedOutStream。
其工作方式是将输出流链接到输入流,这样就可以从链接的输入流中读取写入输出流的所有内容,通过这样做,在两者之间创建一个管道。
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
不过,有一个警告,根据管道流的文档,写入过程和读取过程必须在单独的线程上进行,否则可能会导致死锁。
所以,回到我们的反应流场景,我们可以创建一个管道(如上所述),订阅Flux
对象,并将从中获得的数据写入管道输出流。无论你在那里写什么,都可以在管道的另一边,在相应的输入流中读取。这个输入流可以与非反应性方法共享。
我们只需要特别小心,我们订阅了一个单独的线程流量。e、 g.订阅(Schedulers.elastic())
。
以下是此类订阅服务器的一个非常基本的实现:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
使用这个订阅服务器,我可以定义一个非常简单的实用方法,它可以改变流量
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
请注意,当流完成时,当发生错误或订阅被取消时,我非常小心地关闭输出流,否则我们会在读取端运行阻塞的风险,等待更多的输入到达。关闭输出流是在管道的另一边发出输入流结束的信号。
现在InputStream可以像任何常规流一样被使用,因此您可以将其传递给您的非反应性方法,例如:
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
上面的代码产生:
Luke
Obi-Wan
Yoda
我正在进行spring webflux文件上传。我想从控制器上传文件到amazon S3 bucket上。在控制器中,我收到了以下物体 从文件部分。content()我可以 我的问题是如何转换这个通量
使用spring 5,对于reactor,我们有以下需求。 什么方法可以转换单声道
我有一个方法,可以尝试使用WebClient返回Mono 它可以返回我期望的结果。然后我尝试创建另一个方法来支持列表作为参数 但这一次返回一个奇怪的结果。 我是反应式编程新手,将流和单声道结合起来,然后转换为流量的正确方法是什么?
转换为json格式。
我可以使用现在已弃用的方法将WebClient响应转换为响应实体。 请建议实现相同结果的其他方法。下面是我的代码。
问题内容: 我正在搜索轻量级API(最好是单个类)以转换 到xml,反之亦然,将XML转换回Map。 例: 结果: 然后回来: 我不想使用JAXB或JSON转换API。只需简单的情况,它就不必处理嵌套的地图或属性或其他任何内容。有什么建议么? 编辑:我创建了一个工作副本并粘贴示例。感谢fvu和Michal Bernhard。 下载最新的XStream框架,“仅核心”就足够了。 不需要转换器或其他任