当前位置: 首页 > 知识库问答 >
问题:

何时在服务器上产生事件,并将mediatype.text_event_stream传递给客户端上的订阅服务器

林哲茂
2023-03-14

我创建了一个示例客户机/服务器应用程序来熟悉Spring WebFlux/Reactor Netty。现在,当响应包含Flux并且媒体类型是“text/event-stream”时,我对客户端的行为有点困惑。我看到的是,服务器上产生的每个元素都被立即发送到客户机,但还没有交付给订户。在服务器端的生产者完成流量之后,第一次交付给订阅者。对我来说,这意味着所有元素首先在客户端的reactor-netty中收集,直到它得到一个完整/错误事件。

我的结论是正确的还是我会做错什么?如果是真的,这在不久的将来会改变吗?根据我目前观察到的行为,使用Spring Webflux的大多数好处都被否定了,因为与Spring Mvc一样,消费者必须等到创建并传输了整个元素集合之后才能开始处理元素。

我的服务器应用程序是:`

@SpringBootApplication
public class ServerApp {
    public static void main(String[] args) {
        new SpringApplicationBuilder().sources(ServerApp.class).run(args);
    }

    @RestController
    public static class TestController {
        @GetMapping(value = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<String> testFlux() {
            class AsyncSink implements Consumer<SynchronousSink<String>> {
                private List<String> allStrings = List.of(
                        "Hello Flux1!",
                        "Hello Flux2!",
                        "Hello Flux3!",
                        "Hello Flux4!",
                        "Hello Flux5!");
                private int index = 0;

                @Override
                public void accept(SynchronousSink<String> sink) {
                    if (index == allStrings.size()) {
                        sink.complete();
                    }
                    else {
                        sink.next(allStrings.get(index++));
                    }
                }

            }

            return Flux.generate(new AsyncSink());
        }
    }
}
@SpringBootApplication
public class ClientApp {
    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext aContext = new SpringApplicationBuilder().web(WebApplicationType.NONE).sources(ClientApp.class).run(args);

        Flux<String> aTestFlux = aContext.getBean(TestProxy.class).getFlux();
        aTestFlux.subscribe(new TestSubscriber());

        System.out.println("Press ENTER to exit.");
        System.in.read();
    }

    @Bean
    public WebClient webClient() {
        return WebClient.builder().baseUrl("http://localhost:8080").build();
    }

    @Component
    public static class TestProxy {
        @Autowired
        private WebClient webClient;

        public Flux<String> getFlux() {
            return webClient.get().uri("/test").accept(MediaType.TEXT_EVENT_STREAM).exchange().flatMapMany(theResponse -> theResponse.bodyToFlux(String.class));
        }
    }

    private static class TestSubscriber extends BaseSubscriber<String> {
        @Override
        public void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscribed");
            request(Long.MAX_VALUE);
        }

        @Override
        public void hookOnNext(String theValue) {
            System.out.println(" - " + theValue);
            request(1);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("   done");
        }

        @Override
        protected void hookOnCancel() {
            System.out.println("   cancelled");
        }

        @Override
        protected void hookOnError(Throwable theThrowable) {
            theThrowable.printStackTrace(System.err);
        }
    }
}
data:Hello Flux1!

data:Hello Flux2!

data:Hello Flux3!

data:Hello Flux4!

data:Hello Flux5!

共有1个答案

缑泓
2023-03-14

取自反应性文档并改写以满足您的需要。

我的猜测是,在您的示例中,您给了generate函数传递一个使用者,当完成时将发出这个使用者。

通过使用flux#generate(callable stateSupplier、bifunction 、s>generater) 方法,您提供了一个包含您想要发出的项的状态,然后在提供的 bifunction中逐个发出每个项。

 
  Flux<String> flux = Flux.generate(
    () -> List.of("1!", "2!", "3!", "4!", "5!"), 
    (state, sink) -> {
        if (index == allStrings.size()) {
            sink.complete();
        } else {
          sink.next(state.get(index++));
        } 
    });

 

 类似资料:
  • 问题内容: 我想将一些文件上传到HTTP服务器。基本上,我需要的是对服务器的某种POST请求,其中包含一些参数和文件。我看到了仅上传文件的示例,但没有找到如何也传递其他参数的示例。 什么是最简单,免费的解决方案?有人有我可以学习的文件上传示例吗?我已经搜寻了几个小时,但是(也许只是那几天)找不到我真正需要的东西。最好的解决方案是不涉及任何第三方类或库的东西。 问题答案: 通常,你会用来触发HTTP

  • 问题内容: 可能是一个非常基本的问题,但我似乎找不到简单的答案。 我有一个利用Angular的GET方法,该方法从特定的url()请求一个Promise 。 在此服务器上,我运行可以处理GET请求的快速脚本脚本。 server.js 我可以使用Angular GET方法与URL_OF_INTEREST通信,如下所示: 但是,字段数量,货币,来源和描述需要从Angular客户端应用程序理想地传递。

  • 本文向大家介绍java模拟客户端向服务器上传文件,包括了java模拟客户端向服务器上传文件的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了java客户端向服务器上传文件的具体代码,供大家参考,具体内容如下 先来了解一下客户端与服务器Tcp通信的基本步骤: 服务器端先启动,然后启动客户端向服务器端发送数据。 服务器端收到客户端发送的数据,服务器端会响应应客户端,向客户端发送响应结果。

  • 本文向大家介绍Java实现文件上传服务器和客户端,包括了Java实现文件上传服务器和客户端的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了Java实现文件上传服务器和客户端的具体代码,供大家参考,具体内容如下 文件上传服务器端: 文件上传客户端: 本文已被整理到了《Java上传操作技巧汇总》,欢迎大家学习阅读。  以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐

  • 我是web服务编程新手,我想使用netbeans 6在Grizzly服务器上使用Jersey创建一个restful web服务,然后创建一个客户端javascript,以便通过浏览器使用该web服务。因此,我开始了解更多关于restful web服务的知识,并在网上阅读了大量指南,然后通过阅读jersey用户指南http://jersey . Java . net/nonav/documentat