本文总结讲解httpasyncclient使用过程中遇到的各种异常(java.net.ConnectException、java.net.SocketTimeoutException、java.util.concurrent.TimeoutException: Connection lease request time out),原因、分析、解决方方案及思考。
httpasyncclient 在负载高场景下经常遇到以下报错:接下来我们进项以下综合分析解决
java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411)
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391)
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:355)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:391)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.discardConnection(AbstractClientExchangeHandler.java:276)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:428)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run(Thread.java:852)收起
首先解释一下 Connection lease request time out,字面意思为连接契约请求超时,即为getConnectionRequestTimeout 获取连接超时。
异常栈:
java.net.ConnectException: Timeout connecting to [/120.77.77.52:80]
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:632)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:898)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:198)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:213)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:158)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.lang.Thread.run(Thread.java:748)
分析:
连接超时,建立http连接超时。通常发生原因与服务器抖动,服务方系统性能关联,解决方案亦为排查以上两个问题。
异常栈:
java.net.SocketTimeoutException: XXX milliseconds timeout on connection http-outgoing-133 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run(Thread.java:748)
分析:
通信超时。通常发生原因与服务器抖动,服务方系统性能关联,解决方案亦为排查以上两个问题。当然也有例外对于java应用本机的问题也容易导致这个异常,后续我会专门写一篇文章来介绍怎样排查,欢迎大家关注。
异常栈:
java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411)
at org.apache.http.nio.pool.AbstractNIOConnPool.lease(AbstractNIOConnPool.java:280)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.requestConnection(PoolingNHttpClientConnectionManager.java:295)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.requestConnection(AbstractClientExchangeHandler.java:377)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.start(DefaultClientExchangeHandlerImpl.java:129)
at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:141)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:75)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:108)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:92)
at com.amap.aos.http.client.core.support.DefaultHttpClient.asyncExecute(DefaultHttpClient.java:650)
at com.amap.aos.http.client.core.support.DefaultHttpClient.post(DefaultHttpClient.java:615)
at com.amap.aos.http.client.core.support.DefaultHttpClient.post(DefaultHttpClient.java:638)
分析:
Connection lease request time out是一个比较复杂且难解决的异常,本文是由该异常的排查解决及实验产生的,如背景中介绍Connection lease request time out,字面意思为连接契约请求超时,即为getConnectionRequestTimeout 获取连接超时。lease request time 的计时从请求开始请求连接开始,因此只要连接池满,排队时间也计入lease request time。所以这个异常通常是本地http连接负载高时发生。那么解决这个问题就与在http连接高时进行阻塞式单机限流,提供背压反馈关联。
后续我也会专门下一篇文章介绍我的解决方案,欢迎大家关注。
获取连接过程源码解析:
CloseableHttpAsyncClient
/**
* Base implementation of {@link HttpAsyncClient} that also implements {@link Closeable}.
*
* @since 4.0
*/
@Contract(threading = ThreadingBehavior.SAFE)
public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Closeable {
...
@Override
public Future<HttpResponse> execute(
final HttpHost target, final HttpRequest request, final HttpContext context,
final FutureCallback<HttpResponse> callback) {
return execute(
HttpAsyncMethods.create(target, request),
HttpAsyncMethods.createConsumer(),
context, callback);
}
@Override
public Future<HttpResponse> execute(
final HttpHost target, final HttpRequest request,
final FutureCallback<HttpResponse> callback) {
return execute(target, request, HttpClientContext.create(), callback);
}
@Override
public Future<HttpResponse> execute(
final HttpUriRequest request,
final FutureCallback<HttpResponse> callback) {
return execute(request, HttpClientContext.create(), callback);
}
//执行入口
@Override
public Future<HttpResponse> execute(
final HttpUriRequest request,
final HttpContext context,
final FutureCallback<HttpResponse> callback) {
final HttpHost target;
try {
//组装host
target = determineTarget(request);
} catch (final ClientProtocolException ex) {
final BasicFuture<HttpResponse> future = new BasicFuture<HttpResponse>(callback);
future.failed(ex);
return future;
}
// 执行
return execute(target, request, context, callback);
}
...
}
InternalHttpAsyncClient
class InternalHttpAsyncClient extends CloseableHttpAsyncClientBase {
@Override
public <T> Future<T> execute(
final HttpAsyncRequestProducer requestProducer,
final HttpAsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
final BasicFuture<T> future = new BasicFuture<T>(callback);
final HttpClientContext localcontext = HttpClientContext.adapt(
context != null ? context : new BasicHttpContext());
setupContext(localcontext);
@SuppressWarnings("resource")
final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
this.log,
requestProducer,
responseConsumer,
localcontext,
future,
this.connmgr,
this.connReuseStrategy,
this.keepaliveStrategy,
this.exec);
try {
// 准备连接 获取连接
handler.start();
} catch (final Exception ex) {
handler.failed(ex);
}
return new FutureWrapper<T>(future, handler);
}
}
DefaultClientExchangeHandlerImpl
/**
* Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler}.
* <p>
* Instances of this class are expected to be accessed by one thread at a time only.
* The {@link #cancel()} method can be called concurrently by multiple threads.
*/
class DefaultClientExchangeHandlerImpl<T> extends AbstractClientExchangeHandler {
...
public void start() throws HttpException, IOException {
final HttpHost target = this.requestProducer.getTarget();
final HttpRequest original = this.requestProducer.generateRequest();
if (original instanceof HttpExecutionAware) {
((HttpExecutionAware) original).setCancellable(this);
}
this.exec.prepare(target, original, this.state, this);
// 请求连接
requestConnection();
}
...
final void requestConnection() {
final HttpRoute route = this.routeRef.get();
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + this.id + "] Request connection for " + route);
}
discardConnection();
this.validDurationRef.set(null);
this.routeTrackerRef.set(null);
this.routeEstablished.set(false);
final Object userToken = this.localContext.getUserToken();
final RequestConfig config = this.localContext.getRequestConfig();
// 请求连接
this.connectionFutureRef.set(this.connmgr.requestConnection(
route,
userToken,
config.getConnectTimeout(),
config.getConnectionRequestTimeout(),
TimeUnit.MILLISECONDS,
new FutureCallback<NHttpClientConnection>() {
@Override
public void completed(final NHttpClientConnection managedConn) {
connectionAllocated(managedConn);
}
@Override
public void failed(final Exception ex) {
connectionRequestFailed(ex);
}
@Override
public void cancelled() {
connectionRequestCancelled();
}
}));
}
}
PoolingNHttpClientConnectionManager
/**
* {@code PoolingNHttpClientConnectionManager} maintains a pool of
* {@link NHttpClientConnection}s and is able to service connection requests
* from multiple execution threads. Connections are pooled on a per route
* basis. A request for a route which already the manager has persistent
* connections for available in the pool will be services by leasing
* a connection from the pool rather than creating a brand new connection.
* <p>
* {@code PoolingNHttpClientConnectionManager} maintains a maximum limit
* of connection on a per route basis and in total. Per default this
* implementation will create no more than than 2 concurrent connections
* per given route and no more 20 connections in total. For many real-world
* applications these limits may prove too constraining, especially if they
* use HTTP as a transport protocol for their services. Connection limits,
* however, can be adjusted using {@link ConnPoolControl} methods.
*
* @since 4.0
*/
@Contract(threading = ThreadingBehavior.SAFE)
public class PoolingNHttpClientConnectionManager
implements NHttpClientConnectionManager, ConnPoolControl<HttpRoute> {
@Override
public Future<NHttpClientConnection> requestConnection(
final HttpRoute route,
final Object state,
final long connectTimeout,
final long leaseTimeout,
final TimeUnit tunit,
final FutureCallback<NHttpClientConnection> callback) {
Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route));
}
final BasicFuture<NHttpClientConnection> resultFuture = new BasicFuture<NHttpClientConnection>(callback);
final HttpHost host;
if (route.getProxyHost() != null) {
host = route.getProxyHost();
} else {
host = route.getTargetHost();
}
final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
host.getSchemeName());
if (sf == null) {
resultFuture.failed(new UnsupportedSchemeException(host.getSchemeName() +
" protocol is not supported"));
return resultFuture;
}
// 获取连接
final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
new FutureCallback<CPoolEntry>() {
@Override
public void completed(final CPoolEntry entry) {
Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
if (log.isDebugEnabled()) {
log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
}
final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
if (!resultFuture.completed(managedConn)) {
pool.release(entry, true);
}
}
@Override
public void failed(final Exception ex) {
if (log.isDebugEnabled()) {
log.debug("Connection request failed", ex);
}
resultFuture.failed(ex);
}
@Override
public void cancelled() {
log.debug("Connection request cancelled");
resultFuture.cancel(true);
}
});
return new Future<NHttpClientConnection>() {
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
try {
leaseFuture.cancel(mayInterruptIfRunning);
} finally {
return resultFuture.cancel(mayInterruptIfRunning);
}
}
@Override
public boolean isCancelled() {
return resultFuture.isCancelled();
}
@Override
public boolean isDone() {
return resultFuture.isDone();
}
@Override
public NHttpClientConnection get() throws InterruptedException, ExecutionException {
return resultFuture.get();
}
@Override
public NHttpClientConnection get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return resultFuture.get(timeout, unit);
}
};
}
}
AbstractNIOConnPool
/**
* Abstract non-blocking connection pool.
*
* @param <T> route
* @param <C> connection object
* @param <E> pool entry
*
* @since 4.2
*/
@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
implements ConnPool<T, E>, ConnPoolControl<T> {
/**
* @since 4.3
*/
public Future<E> lease(
final T route, final Object state,
final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit,
final FutureCallback<E> callback) {
Args.notNull(route, "Route");
Args.notNull(timeUnit, "Time unit");
Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
final BasicFuture<E> future = new BasicFuture<E>(callback);
final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1,
leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,
future);
this.lock.lock();
try {
// 请求连接
final boolean completed = processPendingRequest(leaseRequest);
if (!leaseRequest.isDone() && !completed) {
// 请求排队
this.leasingRequests.add(leaseRequest);
}
if (leaseRequest.isDone()) {
this.completedRequests.add(leaseRequest);
}
} finally {
this.lock.unlock();
}
fireCallbacks();
return new Future<E>() {
@Override
public E get() throws InterruptedException, ExecutionException {
return future.get();
}
@Override
public E get(
final long timeout,
final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
try {
leaseRequest.cancel();
} finally {
return future.cancel(mayInterruptIfRunning);
}
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
};
}
private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
final T route = request.getRoute();
final Object state = request.getState();
final long deadline = request.getDeadline();
final long now = System.currentTimeMillis();
// 判断获取连接是否超时
if (now > deadline) {
request.failed(new TimeoutException("Connection lease request time out"));
return false;
}
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
// 连接池有空暇连接直接返回
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
entry.close();
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
request.completed(entry);
onReuse(entry);
onLease(entry);
return true;
}
// New connection is needed
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.pending.size() + this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity == 0) {
return false;
}
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
final SocketAddress localAddress;
final SocketAddress remoteAddress;
try {
remoteAddress = this.addressResolver.resolveRemoteAddress(route);
localAddress = this.addressResolver.resolveLocalAddress(route);
} catch (final IOException ex) {
request.failed(ex);
return false;
}
// 建立连接
final SessionRequest sessionRequest = this.ioReactor.connect(
remoteAddress, localAddress, route, this.sessionRequestCallback);
request.attachSessionRequest(sessionRequest);
final long connectTimeout = request.getConnectTimeout();
if (connectTimeout >= 0) {
sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
}
this.pending.add(sessionRequest);
pool.addPending(sessionRequest, request.getFuture());
return true;
}
// 返回获取连接失败
return false;
}
}