当前位置: 首页 > 知识库问答 >
问题:

来自控制器的WebFlux异步响应

冷越泽
2023-03-14

我的任务是简单地制作一个控制器,当它们准备好时立即给我结果(下面的简单示例)

我想得到字符串的确切数量(例如1000个字符串,以某种方式为1秒)(实际上我需要得到func的结果,但为了简化任务,只是字符串)

因此,当我在控制器中收到一些请求时,我希望它能以这种方式在它们准备好后尽快给出答案(无需缓冲结果):

1秒钟

“一些绳子”-

1秒钟

“另一个”-

1秒钟

“第三个”-

1000秒

“一些绳子”

......

"千弦"

这是我的代码:

@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
        System.out.println("get3 start");
        Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "flux data--" + i;
        }));
        System.out.println("get3 end");
        return result;

    }

实际上在我的控制台里

立即“get3 start”和“get3 end”,但响应仅在所有字符串准备就绪后进行

我对这个任务的实际服务是类似的(但我在这里合并了2个通量),我得到的通量是由间隔形成的,所以我希望它在结果出现时立即给我结果

public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto){
        String checkMsg = checkCalculationDto(calculationDto);
        if(checkMsg.equals("Success")){//valid
            Long quantity = Long.parseLong(calculationDto.getQuantity());

            Flux<AnswerCalculationDto> firstFunc =  Flux.interval(interval)//func 1
                    .onBackpressureDrop()
                    .takeWhile((i)-> i < quantity)
                    .map((i)->new AnswerCalculationDto(i,1,translateToJava(calculationDto.getFunc1(),i)))
                    ;
            Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) //func 2
                    .onBackpressureDrop()
                    .takeUntil((i)-> i > quantity-2)
                    .map((i)->new AnswerCalculationDto(i,2,translateToJava(calculationDto.getFunc2(),i)) )
                    ;
            return Flux.merge(firstFunc,secondFunc);
        }
        else {//invalid data from client
            return Flux.just(new AnswerCalculationDto("",checkMsg));
        }

    }

共有2个答案

颛孙昆
2023-03-14

我在寻找的是Http流,此外请注意,Safari以及邮递员和axios(js lib-我在我的前端部分使用它)不支持超文本传输协议流,所以你不能看到你的输出出现,只要一个结果准备就绪(只有1响应中的所有结果),尝试Chrome。

此外,如果您像我一样与前端部分作斗争,请尝试搜索SSE-server-send-event,例如:https://turkogluc.com/server-sent-events-with-spring-boot-and-reactjs/

希望这能有所帮助

孙永思
2023-03-14

使用WebFlux从服务器流式传输数据有几个选项:

  • 服务器发送的事件推送单个事件(媒体类型:text/event-stream
  • 用换行符分隔的流事件(媒体类型:Application/x-ndjson

下面是一个完整的示例,它公开了文本/事件流

@RestController
public class StreamingController {

    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }

    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }

    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }

    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}

要测试文本/事件流使用:

curl-v-H“接受:文本/事件流”http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
< 
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}

data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}

data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}

data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}

要测试应用程序/x-ndjson,请使用:

curl-v-H“接受:应用程序/x-ndjson”http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}

上述示例将以1秒的间隔生成1000条记录。您还可以使用以下方法生成无限流:

private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}
 类似资料:
  • 我试图用Spring boot 1.5.9.RELEASE来构建一个rest api,却被这个问题卡住了。对apiendpoint的post请求工作得很好,但是当到达get请求时,结果会重复。应用程序对get请求产生的响应是 关联的请求映射类代码 响应类 对如何解决这个问题有什么想法吗?预先感谢

  • 我正在尝试构建一个Spring WebFlux项目,并实现以下业务逻辑: 1-使用WebClient调用外部REST Api,并使用下面的模型解析Json结果。它工作正常 谢谢

  • 异步控制(Sync Controller) 概述 众所周知,在分布式计算系统中,由于多个计算节点计算进度不可能完全一致,会导致了在汇总结果时需要等待那些计算速度较慢的节点,即慢节点会拖慢整个计算任务的进度,浪费计算资源。 考虑到机器学习的特殊性,系统其实可以适当放宽同步限制,没有必要每一轮都等待所有的计算节点完成计算,部分跑得快的Worker,其实完全可以先把训练好的增量Push上去,然后进行下一

  • 我有一个要求编写Spring的网络流量endpoint(路由器功能)来发送邮件到邮件收件人列表。UI将选择邮件收件人列表并将列表发送到我将编写的API。我希望在收到请求后立即以这种方式引入endpoint,我应该向UI发送响应,说正在发送电子邮件。发送响应后,我应该异步继续邮件发送工作。我不能像我们在Spring MVC中使用的那样使用@async注释,因为它是反应式世界中的反模式。 既然我在使用

  • 问题内容: 我正在编写一个依赖于来自各种站点/服务的数据的应用程序,并且涉及基于这些不同来源的数据执行计算以产生最终产品。 我编写了一个示例类,下面带有两个函数,该函数从两个来源收集数据。我选择使功能不同,因为有时我们会根据源应用不同的身份验证方法,但是在本示例中,我只是将它们简化为最简单的形式。这两个功能都使用Alamofire触发并处理请求。 然后,我有一个初始化函数,该函数表示如果我们已经成

  • 我在我的控制器中使用javax验证,带有。当服务器接收到无效数据时,它会抛出错误,但我想处理该错误并返回自定义格式的错误。我无法在控制器建议中捕获异常。我使用的是Spring webFlux,因此无法使用bindingResult。我如何处理该异常?这是我的代码 控制器 资源 错误处理程序