首先,让我解释一下上下文:
我必须创建一个客户端,该客户端将发送许多HTTP请求以下载图像。这些请求必须是异步的,因为一旦完成图像,它将被添加到队列中,然后打印到屏幕上。由于图像可能很大且响应分块,因此我的处理程序必须将其聚合到缓冲区中。
因此,我遵循Netty示例代码(HTTP勺示例)。
目前,我有三个静态映射,用于为每个通道存储通道ID和缓冲区/块布尔值/我的最终对象。
private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();
在那之后,我创建了我的引导程序客户端并计数器来计数未决请求的数量。当响应映像完成时,最终队列和计数器将传递给我的处理程序。
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 30000);
final CountDownLatch latch = new CountDownLatch(downloadList.size()) {
@Override
public void countDown() {
super.countDown();
if (getCount() <= 0) {
try {
queue.put(END_OF_QUEUE);
bootstrap.releaseExternalResources();
} catch (InterruptedException ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
}
}
};
bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));
之后,我为要下载的每个图像创建一个通道,并在连接该通道时创建并发送请求。主机和端口之前已被提取。
for (final ImagePack pack : downloadList) {
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture cf) throws Exception {
final Channel channel = future.getChannel();
PACK_MAP.put(channel.getId(), pack);
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);
if (channel.isWritable()) {
channel.write(request);
}
}
});
}
现在,这是我的ChannelHandler,它是一个extend的内部类SimpleChannelUpstreamHandler
。连接通道后,将在in
BUFFER_MAP
和in
CHUNKS_MAP
中创建一个新条目。在BUFFER_MAP
包含从信道所使用的处理程序,以组合图像块的所有图像缓冲器和CHUNKS_MAP
包含响应分块布尔值。响应完成后,图像InputSteam
将添加到队列中,锁存器递减计数并关闭通道。
private class TileClientHandler extends SimpleChannelUpstreamHandler {
private CancellableQueue<Object> queue;
private CountDownLatch latch;
public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
this.queue = queue;
this.latch = latch;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
super.writeComplete(ctx, e);
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Integer channelID = ctx.getChannel().getId();
if (!CHUNKS_MAP.get(channelID)) {
final HttpResponse response = (HttpResponse) e.getMessage();
if (response.isChunked()) {
CHUNKS_MAP.put(channelID, true);
} else {
final ChannelBuffer content = response.getContent();
if (content.readable()) {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(content);
BUFFER_MAP.put(channelID, buf);
messageCompleted(e);
}
}
} else {
final HttpChunk chunk = (HttpChunk) e.getMessage();
if (chunk.isLast()) {
CHUNKS_MAP.put(channelID, false);
messageCompleted(e);
} else {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(chunk.getContent());
BUFFER_MAP.put(channelID, buf);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
latch.countDown();
e.getChannel().close();
}
private void messageCompleted(MessageEvent e) {
final Integer channelID = e.getChannel().getId();
if (queue.isCancelled()) {
return;
}
try {
final ImagePack p = PACK_MAP.get(channelID);
final ChannelBuffer b = BUFFER_MAP.get(channelID);
p.setBuffer(new ByteArrayInputStream(b.array()));
queue.put(p.getTile());
} catch (Exception ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
latch.countDown();
e.getChannel().close();
}
}
我的问题是,当我执行此代码时,我遇到了以下异常:
java.lang.IllegalArgumentException: invalid version format: 3!}@
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
java.lang.IllegalArgumentException: invalid version format:
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format:
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
有时还会出现一些NPE。
java.lang.NullPointerException
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
所有这些代码对于一个请求都可以正常工作,但是当许多请求发送时,一些奇怪的东西会附加在缓冲区上。
有什么想法我在这里想念的吗?谢谢。
在我的第一个版本中,我为每个请求的图像都复制了引导程序/处理程序,它工作正常,但优化程度不是很高。
问题是您在所有频道之间共享一个HttpClientCodec。引导程序中指定的默认管道将为所有通道克隆,因此每个通道将看到每个处理程序的相同实例。http编解码器是有状态的,因此您会看到不同响应混合在一起的效果。
最简单的解决方案是将ChannelPipelineFactory传递给引导程序。将为每个新通道调用此方法,并且您可以使用HttpClientCodec的新实例创建管道。如果这是要起作用的,那么没有什么可以阻止您对创建的每个管道使用相同的TileClientHandler实例。
我很好奇。假设您正在同时发出每个请求,那么在HttpClientCodec的上游添加HttpChunkAggregator并让Netty将所有块聚合到一个HttpResponse中会不会更容易。然后,您只是从那里获取重新组合的内容?
按照这里的Apache HttpAsyncClient示例,HTTPGET请求并不是一次性触发的,而是(大部分)同步触发的。 下图显示了请求的发送顺序(除了一个)。当增加请求数量时,这仍然是正确的。 我使用了另一个库(AsynHttpClient ),请求发送得更快,而且是随机的。 有什么办法可以改进这段代码,让它真正异步执行? 我添加了用于参考的代码。
我使用的是Netty 3.9.5,我有一个简单的客户机-服务器设置,我从http://en.wikipedia.org/wiki/Netty_(软件)#Netty\u TCP\u示例。我扩展了这个示例,将Java search plan对象从客户端发送到服务器。在这个网站上跟踪用户的帮助下,我已经能够让这个程序按预期运行。 现在,我想让我的读卡器/服务器程序同时接受多个客户端。我想我会使用下面列出
我正在构建一个tcp客户端来接收和发送消息。我按照Netty用户指南中的步骤编写了一个简单的tcp客户端,其中包含一个扩展的自定义处理程序。 在hander中,我存储了< code > ChannelHandlerContext : 然后我有一个发送方法,它使用发送消息: 我发现的另一个选项是在客户机类中使用
我需要让客户能够建立许多连接。我使用Netty 4.0。不幸的是,所有现有的示例都没有显示如何创建大量连接。 这是正确的决定吗?还是会更好?
我刚刚把我的Netty 3软件转换成5,我遇到了一个问题。只有一个客户,一切都很好。我可以使用两个解码器(它们可以正常切换)并允许我登录。 当我尝试连接另一个客户端时,无论第一个客户端是否仍然连接,都会出现问题。第一个客户端保持连接绝对正常,但第二个客户端根本无法连接(不通过第一个解码器)。 这是我的引导程序: 第一解码器: http://pastebin.com/9FWzXSVE 第二个解码器:
我有一个nxt请求帖子与表单url编码使用Feign客户端