当前位置: 首页 > 知识库问答 >
问题:

使响应顺序与Netty中的请求顺序相匹配

曾沛
2023-03-14

我正在编写一个Netty应用程序(Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler,它用原始消息响应,尽管有时会在短时间延迟后响应:

public class EchoHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    var message = (String) msg;

    if (message.equals("delay")) {
      ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
    } else {
      ctx.writeAndFlush(message);
    }
  }
}

使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串的第二条组成,则响应的顺序将颠倒!我写了一个测试来说明这一点:

public class Tests {
  @Test
  public void test() throws ExecutionException, InterruptedException {
    var channel = new EmbeddedChannel(new EchoHandler());
    channel.writeInbound("delay", "second message");

    // Let some time pass and process any scheduled tasks
    Thread.sleep(500);
    channel.runPendingTasks();

    // The response to the last message comes first!
    var firstMessage = (String) channel.readOutbound();
    Assertions.assertEquals("second message", firstMessage);

    // The response to the first message comes second!
    var secondMessage = (String) channel.readOutbound();
    Assertions.assertEquals("delay", secondMessage);
  }
}

我正在寻找Netty中的内置方式,以确保传出响应的顺序与传入消息的顺序相匹配。使用库的虚构扩展,我将按如下方式重写处理程序:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
  var message = (String) msg;

  ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
  
  if (message.equals("delay")) {
    ctx.executor().schedule(() -> {
      ctx.writeAndFlush(message);
      ctx.resumeMessageProcessing(); // After flushing we can resume message processing
    }, 400, TimeUnit.MILLISECONDS);
  } else {
    ctx.writeAndFlush(message);
    ctx.resumeMessageProcessing(); // After flushing we can resume message processing
  }
}

Netty是否提供开箱即用的此类内容?这听起来像是一个常见的用例,我宁愿依靠经过实战检验的代码,也不愿编写自己的队列实现。

共有1个答案

汝和裕
2023-03-14

虽然不是内置的,但此问题的一个看似惯用的解决方案是自定义MessageToMessageCodec。基于使用String作为消息类型的示例,我编写了以下为您处理排队的编解码器:

public class QueuingCodec extends MessageToMessageCodec<String, String> {

  private final Queue<String> messageQueue = new ArrayDeque<>();
  private boolean processingMessage = false;

  @Override
  protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    // Pass the message on to output
    list.add(s);

    // Allow processing more messages
    processingMessage = false;

    // Send the next message in the queue if available
    if (!messageQueue.isEmpty()) {
      processingMessage = true;
      var message = messageQueue.poll();

      // Pass the message on to input
      ctx.executor().execute(() -> {
        ctx.fireChannelRead(message);
      });
    }
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception {

    if (processingMessage) {
      // Store message for later processing
      messageQueue.add(s);
    } else {
      // Pass data on to input
      list.add(s);

      // Prevent processing of new messages
      processingMessage = true;
    }
  }
}
 类似资料:
  • 我试图以一种干净的方式构建应用程序的体系结构。我想我可以在Netty中做到这一点,因为它是一个著名的java网络框架选项。 我有连接到Netty服务器的设备(通过GPRS的TCP)。假设它们都是永久连接的(保持存活),有几种情况我需要用这个架构来描述: 情况1:设备可以向Netty发送消息,并且Netty响应该消息 情况2:设备可以向Netty发送消息,并且Netty对该消息做出响应,但是Nett

  • 我有一个for循环,这是创建不同的网址,然后我推动的网址在Volley请求队列使用添加功能。我有一个问题,将响应侦听器将响应的请求添加到请求队列的顺序或它将是随机的基于服务器响应的请求?我使用VolleySingleton方法。

  • 我使用reform连接到一个API,该API在每个响应中包含一个惟一的令牌。这个令牌必须包含在下一个请求中。这意味着我需要在发出下一个请求之前等待请求的响应。 在改造中是否有内置机制来实现这一目标?如果不是,推荐的方法是什么?请注意,我使用的是异步改造方法和版本 1.9。拦截器用于读取令牌并自动将其添加到下一个请求中,这非常有效。当两个请求非常接近时,就会出现问题,因此第二个请求最终使用过时的令牌

  • 假设我在1..n个VertX(V)实例前面有一个负载均衡器(LB),每个VertX实例都连接到一个队列(Q),我有1..m个后端(BE)。 用户点击一个按钮发出post请求,甚至打开一个web套接字,负载均衡器将请求转发给VertX实例之一,VertX实例向队列发出请求,一个后端使用该消息并返回响应;如果正确的VertX实例使用了它,它可以查找响应处理程序并向用户写入响应;如果错误的VertX实例

  • 问题内容: 显然,请求参数的顺序未保留在Tomcat Servlet容器中(对于Google,似乎其他容器也存在相同的问题)。 这似乎是非常错误的。 如何从HttpServletRequest恢复请求参数的实际顺序? *对于那些认为请求参数顺序无关紧要的人来说, *编辑 BTW之间是有区别的: 和 因此,顺序对于重复的请求参数确实很重要(幸运的是Java确实处理了此权利)。 问题答案: 通常,您不

  • 问题内容: 我收到来自JSON字符串服务器的很大响应。我将其转换为JSON对象,然后获取密钥并对其进行迭代。 问题是,当我进行迭代时,它的顺序与服务器的响应顺序相同。 接下来,我通过添加所有键并对其进行排序来应用另一种方法,然后获得该方法的迭代器,但仍然不是我所需要的(作为响应)。 代码示例在这里: 问题答案: JSON对象的键顺序不应该有意义。如果要特定顺序,则应使用数组,而不是对象。 您的Ja