我所做的:
我正在使用vertx rx超文本传输协议客户端来执行大量的HTTP请求。在这种情况下,我调用“方法A”,它返回一个ID列表。要接收所有ID,我需要多次调用方法A以获取下一批结果。(每次我指定一个不同的页码我想接收)
为了提高性能并尽可能并行地进行调用,我创建了一个(RxJava)可观察项目列表,每个项目代表单个页面请求的结果。当我完成创建此列表时,我调用Obserable.zip运算符并传递可观察列表。
问题:
在没有特殊设置的情况下使用vertx http客户端,一切都可以正常工作,但速度非常慢。e、 g.在5分钟内处理3000个http请求。
我试图通过如下设置vertx http客户端选项来提高性能:
HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
.setKeepAlive(true)
.setPipelining(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(25)
.setMaxWaitQueueSize(10000);
但是,当我这样做时,我会得到不稳定的结果:有时一切正常,我能够在不到20秒的时间内收到所有响应。但是,有时我都调用的外部服务器会关闭连接,日志显示以下错误:
io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
下面是创建HttpClientRequest的代码
public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
});
return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
final long requestId = reqNumber.getAndIncrement();
if (bodyData != null) {
request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
}
request.putHeader("Accept-Encoding", "gzip,deflate");
Observable<HttpRestResponse> retVal = request.toObservable()
.doOnError(throwable -> {
logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
}).doOnNext(response -> {
if (response != null) {
logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
}
}).flatMap(httpClientResponse -> {
try {
if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
.reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
}
} catch (Exception e) {
logger.error("error in RestHttpClient", e);
}
return Observable.just(new HttpRestResponse(null, httpClientResponse));
});
retVal.subscribe(subscriber);
if (bodyData != null) {
request.end(bodyData); // write post data
} else {
request.end();
}
}
关于我们
所以最后我想通了这一点。看来我传给了Observable.zip方法一个空列表...
这里的问题是onNext或onError没有在“zip”方法返回的可观察对象上被调用。在这种情况下,只调用onComplete,我没有为它创建处理程序。。。
非常感谢所有感兴趣并试图提供帮助的人。
雅尼夫
如果您认为可以像这样连接异常逻辑
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
您不会收到任何错误,因为您的所有处理现在基本上都驻留在Rx生态系统中,因此在您的try-catch块中不会向您报告任何内容。
这个时间点的错误会从你的
bufferObservable.onErrorReturn()
或
bufferObservable.subscribe(success, error)
我正在努力使用oauth2代理正确设置webflux WebClient。 似乎serveroauth 2 authorizedclientexchangefilterfunction使用了一个新的webclient实例,它不包含我的代理配置。 OAuth2配置 OAuth2AuthorizedClientResolver。类包含: 创建了一个新的WebClient,如下所示: 有没有人有一个如何
我有一个通过HTTP服务获取数据并返回可观察对象的服务。 在第一次调用之后,我希望在服务内部缓存结果,一旦一个新组件尝试获取数据,它将从缓存的结果中获取数据。 有没有简单的解决方案?
在owncloud相关网站和stackoverflow自身的以下链接中,所有相关信息都以不完整的形式呈现: 用户配置Api-Owncloud 我试着做一些非常简单的事情: > 我得到这样的输出: 开始处理凭据,首先它将存储在本地变量中 Hello Frank 您的密码是frankspassword failure 997未经授权 在owncloud中创建了一个新用户 我还尝试使用以下php脚本登录
我想进行 HTTP 调用并将输出作为可观察的(这是简单的部分),然后立即进行另一个 HTTP 调用并忽略输出。 我不能使用switchMap操作符,因为第二个HTTP调用没有返回有用的东西。它只是返回“完成!”第一个调用返回我需要的复杂JSON。 我所做的,它的工作原理是订阅内部 HTTP 调用,我想知道是否有我可以使用的 RXJS 运算符: 有没有一个RxJS操作符,我可以使用它来代替再次订阅“
我正在使用GWT和Spring controller来管理http流量。有些请求可能需要很长时间,但我希望在超过给定时间时终止请求。 我如何配置超时Spring。我也使用Apache Tomcat 7.0。我试图在tomcat上inrease最大线程,但有一段时间tomcat工作缓慢,因为请求线程不会死。
我有这个问题: 我需要从服务器下载一个zip文件,这个zip包含。xod和。巴布亚新几内亚。我正在使用AngularJS,JavaSpring,来克服跨域问题,我的htt。获取java spring控制器的角度调用,它将进行真正的get调用。我必须下载zip- 这是我的角度代码: 这是java spring代码: 这是控制台。日志: Object{data:"UEsDBBQACAAIAA5Ipkg