当前位置: 首页 > 知识库问答 >
问题:

使用Reactor-Netty-HttpClient,如何将客户端配置为使用Flux发布服务器向服务器发送多个项目?

崔宜修
2023-03-14

更具体的问题是为什么通量。fromIterable()不适用于ReactorNetty HttpClient?这个简单的例子很好用。所有10项均由Flux publisher发出:

public class ConcurrentLinkedQueueFluxTest {

    public static final void main(String[] args) {

        List<Integer> aList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
        aList.stream().forEach(i -> clq.add(i));

        Flux.fromIterable(clq)
                .subscribe(new ReactiveSubscriber<Integer>());
    }
}

使用ReactiveSubscriber:

class ReactiveSubscriber<T> extends BaseSubscriber<T> {

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        System.out.println("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(T response) {
        System.out.println("In hookOnNext: "+response.toString());
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        System.out.println(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        System.out.println("In hookOnComplete");
        subscription.cancel();
    }
}

如下图所示,如果我在HttpClient中使用类似的订阅服务器。send(Flux.fromIterable())仅发出1项,而不是队列中的所有项。因此,由于这种流量,有些配置不正确。创建通量的fromIterable()方法不适用于HttpClient。

对于实际的生产代码问题,我包括了队列定义、客户机方法、订阅者、服务器方法和一个日志,该日志显示从客户机到服务器只发送了5项中的1项。看起来,即使HttpClient send()方法有一个从队列加载的Flux对象,也只发送一个项目,尽管队列有5个项目。

要发送到服务器的项目位于ByteBuf类型的队列中:


    private ConcurrentLinkedQueue<ByteBuf> electionRequestQueue;

    public ElectionTransactionRequest() {
        electionRequestQueue = new ConcurrentLinkedQueue<ByteBuf>();
    }

客户端方法是:


     public void task() {

        log.debug("Queue size: "+electionRequestQueue.size());

        ElectionTransactionSubscriber etSubscriber = new ElectionTransactionSubscriber();

        HttpClient.create()
             .tcpConfiguration(tcpClient -> tcpClient.host("localhost"))
             .port(61005)
             .protocol(HttpProtocol.HTTP11)
             .post()
             .uri("/election/transaction")
             .send(Flux.fromIterable(electionRequestQueue))
             .responseContent()
             .aggregate()
             .asByteArray()
             .subscribe(etSubscriber);
     }

订阅者定义为:



class ElectionTransactionSubscriber extends BaseSubscriber<byte[]> {

    private static final Logger log = LoggerFactory.getLogger(ElectionTransactionSubscriber.class);

    private Subscription subscription;

    @Override
    public void hookOnSubscribe(Subscription s) {
        log.debug("In hookOnSubscribe");
        this.subscription = s;
        subscription.request(1);
    }

    @Override
    public void hookOnNext(byte[] response) {
        log.info("In hookOnNext");
        subscription.request(1);
    }

    @Override
    public void hookOnError(Throwable t) {
        log.error(t.getLocalizedMessage());
    }

    @Override
    public void hookOnComplete() {
        log.debug("In hookOnComplete");
        subscription.cancel();
    }
}

服务器端在方法中定义:


    public void start() {

        disposableServer =
            HttpServer.create()
                .host("localhost")
                .port(61005)
                .protocol(HttpProtocol.HTTP11)
                .route(routes ->
                    routes
                        .post("/election/transaction",
                            (request, response) -> response.send(request
                                                                .receive()
                                                                .aggregate()
                                                                .flatMap(aggregatedBody ->
                                                                    electionTransactionHandler.electionTransactionResponse(aggregatedBody)))))
                        .bindNow();
        disposableServer.onDispose().block();

    }

当客户端运行时,队列中有5个项目,但只有一个项目被发送到服务器,如日志中所示。在订阅者中,仅发送来自Flux发布者的1个项目后,就会调用方法hookOn完成()。


2020-12-01 12:03:56,442 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionRequest: Queue size: 5

2020-12-01 12:03:56,539 DEBUG [main] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnSubscribe

2020-12-01 12:03:56,746 INFO  [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnNext

2020-12-01 12:03:56,746 DEBUG [reactor-http-epoll-1] com.dd.vbc.business.services.client.requests.ElectionTransactionSubscriber: In hookOnComplete

共有1个答案

甘学潞
2023-03-14

缓冲可能是您最容易使用的。它会发出缓冲区(默认情况下是Collection,List)。因此,在您的情况下,订阅者会收到一个列表

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

=========================

这是直接从Reactor核心留档:

三种配料

当你有很多元素并且你想把它们分批处理时,你在Reactor中有三种广泛的解决方案:分组、窗口和缓冲。这三者在概念上是相近的,因为它们将通量重新分配到聚合中。分组和窗口创建通量

分组是将源通量拆分为多个批次的行为,每个批次匹配一个键。

关联的运算符是groupBy。

每个组都表示为GroupeFlux,它允许您通过调用其key()方法来检索密钥。

组的内容没有必要的连续性。一旦源元素生成一个新密钥,该密钥的组就会打开,与该密钥匹配的元素最终会在组中(可以同时打开多个组)。

这意味着群体:

Are always disjoint (a source element belongs to one and only one group).

Can contain elements from different places in the original sequence.

Are never empty.

以下示例根据值是偶数还是奇数对其进行分组:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .groupBy(i -> i % 2 == 0 ? "even" : "odd")
        .concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
                .map(String::valueOf) //map to string
                .startWith(g.key())) //start with the group's key
    )
    .expectNext("odd", "1", "3", "5", "11", "13")
    .expectNext("even", "2", "4", "6", "12")
    .verifyComplete();

警告分组最适合中低数量的组。组还必须被强制消耗(例如被平面图消耗),以便groupBy继续从上游获取数据并提供更多组。有时,这两个约束会成倍增加并导致挂起,例如当您具有高基数并且消耗组的平面图的并发过低时。使用Flux窗口

窗口化是根据大小、时间、边界定义谓词或边界定义发布者的标准将源通量拆分为窗口的行为。

相关联的运算符是windows、windowTimeout、window直到、windows同时和windows何时。

与根据传入键随机重叠的groupBy相反,窗口(大部分时间)是按顺序打开的。

不过,有些变体仍然可以重叠。例如,在window(int maxSize,int skip)中,maxSize参数是窗口关闭后的元素数,skip参数是源中打开新窗口后的元素数。所以如果maxSize

以下示例显示了重叠窗口:

StepVerifier.create(
    Flux.range(1, 10)
        .window(5, 3) //overlapping windows
        .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
        .expectNext(1, 2, 3, 4, 5)
        .expectNext(4, 5, 6, 7, 8)
        .expectNext(7, 8, 9, 10)
        .expectNext(10)
        .verifyComplete();

注:反向配置(最大尺寸

在基于谓词的窗口通过windows和windows的情况下,具有与谓词不匹配的后续源元素也可能导致空窗口,如以下示例所示:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .windowWhile(i -> i % 2 == 0)
        .concatMap(g -> g.defaultIfEmpty(-1))
    )
        .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
        .expectNext(2, 4, 6) // triggered by 11
        .expectNext(12) // triggered by 13
        // however, no empty completion window is emitted (would contain extra matching elements)
        .verifyComplete();

用焊剂缓冲

缓冲类似于窗口,但有以下扭曲:它不发射窗口(每个窗口都是通量),而是发射缓冲区(每个窗口都是集合) — 默认情况下,列表)。

缓冲的运算符与窗口的运算符类似:buffer、bufferTimeout、bufferUntil、bufferWhile和bufferWhen。

当相应的窗口操作符打开一个窗口时,缓冲操作符创建一个新集合并开始向其中添加元素。当窗口关闭时,缓冲操作符发出收集。

缓冲也可能导致源元素丢失或缓冲区重叠,如下例所示:

StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();

与窗口化不同,缓冲区直到和缓冲区虽然不发出空缓冲区,如以下示例所示:

StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .bufferWhile(i -> i % 2 == 0)
    )
    .expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
    .expectNext(Collections.singletonList(12)) // triggered by 13
    .verifyComplete();

https://github.com/reactor/reactor-core/blob/master/docs/asciidoc/advancedFeatures.adoc

 类似资料:
  • 我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出

  • 我将创建一个身份验证服务器,它本身与一组不同的Oauth2.0服务器交互。Netty似乎是在这里实现网络部分的一个很好的候选者。但在开始之前,我需要澄清一些关于netty的细节,因为我是新手。例行程序如下: > < li> 服务器接受来自客户端的HTTPS连接。 然后,不关闭第一个连接,它通过HTTPS与远程OAuth2.0服务器建立另一个连接并获取数据 毕竟,服务器将结果发送回客户端,客户端应该

  • 我有一个基本的Netty服务器(来自教程)(http://netty.io/wiki/user-guide-for-4.x.html),它会接收来自客户端的请求,但如何向客户端发送字符串呢? 例如,在普通的Minecraft服务器上,您在配置文件中指定“MOTD”,当客户端从服务器列表中发出信号时,它将显示该字符串。我需要做同样的事情,但是从我的服务器代码中。

  • 我已经编写了小型Java 7客户端和服务器应用程序。我有3个自签名的X.509 RSA证书的密钥库。当客户端通过SSL连接时,服务器只发送一个证书的SSL证书消息。我对SSL/TLS有点陌生。我还研究了JSSE代码sun。安全ssl。X509KeyManagerImpl,并找到以下注释: 评论很清楚,服务器将发送单个最佳匹配证书,但我似乎不理解原因。就像在我的例子中一样,我希望服务器发送所有3个证

  • 当涉及到TCP时,Netty确实有很好的文档记录,但我想尝试一个简单的UDP服务器-客户机示例,但没有找到任何好的代码。(主要是邮件列表和据称有错误代码的用户) 有人愿意提供一些简单的例子吗?谢谢!

  • 问题内容: 我将实现类似于Facebook通知和此网站的内容(StackOverflow的通知会通知我们是否有人为我们的问题写评论/答案等)。请注意,用户将使用我的应用程序作为网站而不是移动应用程序。 我遇到以下获取结果的答案,但我需要推送结果而不是获取结果。 根据建议,我在实体类中创建了一个简单方法,并向其中添加了@PostPersist,但此方法不起作用,因此基于此答案,我添加了persist