当前位置: 首页 > 知识库问答 >
问题:

使用vertx超文本传输协议客户端时可观察ZIP运算符挂起

阮轶
2023-03-14

我所做的:

我正在使用vertx rx超文本传输协议客户端来执行大量的HTTP请求。在这种情况下,我调用“方法A”,它返回一个ID列表。要接收所有ID,我需要多次调用方法A以获取下一批结果。(每次我指定一个不同的页码我想接收)

为了提高性能并尽可能并行地进行调用,我创建了一个(RxJava)可观察项目列表,每个项目代表单个页面请求的结果。当我完成创建此列表时,我调用Obserable.zip运算符并传递可观察列表。

问题:

在没有特殊设置的情况下使用vertx http客户端,一切都可以正常工作,但速度非常慢。e、 g.在5分钟内处理3000个http请求。

我试图通过如下设置vertx http客户端选项来提高性能

 HttpClientOptions options = new HttpClientOptions();

 options.setMaxPoolSize(50)
        .setKeepAlive(true)
        .setPipelining(true)
        .setTcpKeepAlive(true)
        .setPipeliningLimit(25)
        .setMaxWaitQueueSize(10000);

但是,当我这样做时,我会得到不稳定的结果:有时一切正常,我能够在不到20秒的时间内收到所有响应。但是,有时我都调用的外部服务器会关闭连接,日志显示以下错误:

io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
  • 我的代码中没有调用错误处理程序

下面是创建HttpClientRequest的代码

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
        Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
            try {
                HttpClientRequest request = httpClient.postAbs(url);
                addHeadersToRequest(headers, request);
                sendRequest(url, subscriber, request, body);
            }catch (Exception e) {
                try {
                    subscriber.onError(e);
                }catch (Exception ex) {
                    logger.error("error calling onError for subscriber",ex);
                }finally {
                    subscriber.onCompleted();
                }
            }
        });
        return bufferObservable;
    }

private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
        final long requestId = reqNumber.getAndIncrement();

        if (bodyData != null) {
            request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
        }

        request.putHeader("Accept-Encoding", "gzip,deflate");

        Observable<HttpRestResponse> retVal = request.toObservable()
                .doOnError(throwable -> {
                    logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
                }).doOnNext(response -> {
                    if (response != null) {
                        logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
                    }
                }).flatMap(httpClientResponse -> {
                    try {
                        if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
                            Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
                                    .reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));

                            return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
                        }
                    } catch (Exception e) {
                        logger.error("error in RestHttpClient", e);
                    }

                    return Observable.just(new HttpRestResponse(null, httpClientResponse));
                });

        retVal.subscribe(subscriber);

        if (bodyData != null) {
            request.end(bodyData); // write post data
        } else {
            request.end();
        }
    }

关于我们

共有2个答案

黄弘盛
2023-03-14

所以最后我想通了这一点。看来我传给了Observable.zip方法一个空列表...

这里的问题是onNext或onError没有在“zip”方法返回的可观察对象上被调用。在这种情况下,只调用onComplete,我没有为它创建处理程序。。。

非常感谢所有感兴趣并试图提供帮助的人。

雅尼夫

元俊雅
2023-03-14

如果您认为可以像这样连接异常逻辑

 try {
            HttpClientRequest request = httpClient.postAbs(url);
            addHeadersToRequest(headers, request);
            sendRequest(url, subscriber, request, body);
        }catch (Exception e) {
            try {
                subscriber.onError(e);
            }catch (Exception ex) {
                logger.error("error calling onError for subscriber",ex);
            }finally {
                subscriber.onCompleted();
            }
        }

您不会收到任何错误,因为您的所有处理现在基本上都驻留在Rx生态系统中,因此在您的try-catch块中不会向您报告任何内容。

这个时间点的错误会从你的

bufferObservable.onErrorReturn()

bufferObservable.subscribe(success, error)
 类似资料:
  • 我正在努力使用oauth2代理正确设置webflux WebClient。 似乎serveroauth 2 authorizedclientexchangefilterfunction使用了一个新的webclient实例,它不包含我的代理配置。 OAuth2配置 OAuth2AuthorizedClientResolver。类包含: 创建了一个新的WebClient,如下所示: 有没有人有一个如何

  • 我有一个通过HTTP服务获取数据并返回可观察对象的服务。 在第一次调用之后,我希望在服务内部缓存结果,一旦一个新组件尝试获取数据,它将从缓存的结果中获取数据。 有没有简单的解决方案?

  • 在owncloud相关网站和stackoverflow自身的以下链接中,所有相关信息都以不完整的形式呈现: 用户配置Api-Owncloud 我试着做一些非常简单的事情: > 我得到这样的输出: 开始处理凭据,首先它将存储在本地变量中 Hello Frank 您的密码是frankspassword failure 997未经授权 在owncloud中创建了一个新用户 我还尝试使用以下php脚本登录

  • 我想进行 HTTP 调用并将输出作为可观察的(这是简单的部分),然后立即进行另一个 HTTP 调用并忽略输出。 我不能使用switchMap操作符,因为第二个HTTP调用没有返回有用的东西。它只是返回“完成!”第一个调用返回我需要的复杂JSON。 我所做的,它的工作原理是订阅内部 HTTP 调用,我想知道是否有我可以使用的 RXJS 运算符: 有没有一个RxJS操作符,我可以使用它来代替再次订阅“

  • 我正在使用GWT和Spring controller来管理http流量。有些请求可能需要很长时间,但我希望在超过给定时间时终止请求。 我如何配置超时Spring。我也使用Apache Tomcat 7.0。我试图在tomcat上inrease最大线程,但有一段时间tomcat工作缓慢,因为请求线程不会死。

  • 我尝试使用以下方法从Api获取json数据 我将这个物体建模如下 数据如下: 当我尝试使用模型访问时,我收到错误“\u InternalLinkedHashMap”