当前位置: 首页 > 知识库问答 >
问题:

在Netty中通过ServerBootstrap ChannelPipeline发送消息时不支持的操作异常

羊舌迪
2023-03-14

我正在使用 Netty 5.0。

我有一个补充的客户端引导程序,我以安全聊天客户端为例.java来自netty github的例子。Wenn 我从客户端引导程序向服务器发送消息,它工作得很好。当我尝试将消息从服务器引导程序发送到客户端时(首先通过客户端成功启动连接/通道后),我得到一个 java.lang.不受支持的操作异常,没有任何进一步的信息。从服务器向客户端发送消息是通过上面的代码完成的。

服务器引导是否仅用于接收?

服务器引导程序不应该像上面显示的那样将消息写回客户端吗?我的意思是,消息可以通过ChannelHandler从套接字进入ChannelPipeline,但只有ChannelHandler应该将响应写回ChannelPipeline并从套接字中输出。所以在ServerBootstrap中,用户不应该能够从Pipeline外部通过ChannelPipeline发送消息。(希望这有意义)

还是我只是错过了什么?

我的代码如下:

    // Ports.
    int serverPort = 8080;

    EventLoopGroup bossGroup    = new NioEventLoopGroup();
    EventLoopGroup workerGroup  = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             }
         })
         .option(ChannelOption.SO_BACKLOG, 128)
         .childOption(ChannelOption.SO_KEEPALIVE, true);

        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(serverPort).sync();
        Channel ch = f.channel();

        System.out.println("Server: Running!");

      // Read commands from the stdin.
      ChannelFuture lastWriteFuture = null;
      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
      while(true)
      {
          String line = in.readLine();
          if (line == null) break;

          ByteBuf getOut = buffer(64);
          getOut.writeBytes(line.getBytes());

          // Sends the received line to the server.
          lastWriteFuture = ch.writeAndFlush(getOut);

          lastWriteFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture cf) throws Exception {
                    if(cf.isSuccess()) {
                        System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                    } else {
                        System.out.println("CFListener: failure! FAILure! FAILURE!");
                        System.out.println(cf.cause());
                    }
                }
            });

      }

               // Wait until all messages are flushed before closing the channel.
      if (lastWriteFuture != null) {
          lastWriteFuture.sync();
      }


        // Wait until the server socket is closed.
        // In this example, this does not happen, but you can do that to gracefully
        // shut down your server.
        f.channel().closeFuture().sync();
    } catch (InterruptedException | UnsupportedOperationException e) {
        e.printStackTrace();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

我开始使用下面的例子:https://github . com/netty/netty/tree/4.1/example/src/main/Java/io/netty/example/secure chat

我的问题是,调用ch.writeAndFlush时出现以下异常:

java.lang.UnsupportedOperationException
at io.netty.channel.socket.nio.NioServerSocketChannel.filterOutboundMessage(NioServerSocketChannel.java:184)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:784)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1278)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeWriteNow(ChannelHandlerInvokerUtil.java:158)
at io.netty.channel.DefaultChannelHandlerInvoker$WriteTask.run(DefaultChannelHandlerInvoker.java:440)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:328)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
at io.netty.util.internal.chmv8.ForkJoinPool.scan(ForkJoinPool.java:1706)
at io.netty.util.internal.chmv8.ForkJoinPool.runWorker(ForkJoinPool.java:1661)
at io.netty.util.internal.chmv8.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:126)

共有2个答案

汤昊
2023-03-14

我认为Netty服务器没有解码器,编码器。如果要发送字符串数据,

serverBootstrap.group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline channelPipeline = channel.pipeline();
        channelPipeline.addLast("String Encoder", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("String Decoder", new StringDecoder(CharsetUtil.UTF_8));
    }
});

添加您的服务器的初始化程序

王扬
2023-03-14

您不能写入ServerChannel,只能连接到正常通道。因此,您对writeAndFlush的调用失败。

要向每个客户机发送消息,您应该将每个客户机的通道存储在ChannelGroup中,并对其调用writeAndFlush()。

实现这一点的一个快速方法是向ServerBootstrap添加另一个处理程序,将传入的连接放入ChannelGroup中,该处理程序的快速实现如下:

// In your main:
ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

// In your ChannelInitializer<SocketChannel>
ch.pipeline().addLast("grouper", new GlobalSendHandler());

// New class:
public class MyHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
         allChannels.add(ctx.channel());
         super.channelActive(ctx);
     }
 }

然后,我们可以调用以下函数向每个连接发送消息,这将返回一个ChannelGroupFuture,而不是一个普通的ChannelFuture:

allChannels.writeAndFlush(getOut);

通过上面的修正,您的全部代码将如下所示:

// Ports.
int serverPort = 8080;

ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

EventLoopGroup bossGroup    = new NioEventLoopGroup();
EventLoopGroup workerGroup  = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast("MyMessageHandler", new MyMessageHandler());
             ch.pipeline().addLast("grouper", new GlobalSendHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);

    // Bind and start to accept incoming connections.
    ChannelFuture f = b.bind(serverPort).sync();
    Channel ch = f.channel();

    System.out.println("Server: Running!");

  // Read commands from the stdin.
  ChannelGroupFuture lastWriteFuture = null;
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  while(true)
  {
      String line = in.readLine();
      if (line == null) break;

      ByteBuf getOut = buffer(64);
      getOut.writeBytes(line.getBytes());

      // Sends the received line to the server.
      lastWriteFuture = allChannels.writeAndFlush(getOut);

      lastWriteFuture.addListener(new ChannelGroupFutureListener() {
            @Override
            public void operationComplete(ChannelGroupFuture cf) throws Exception {
                if(cf.isSuccess()) {
                    System.out.println("CFListener: SUCCESS! YEAH! HELL! YEAH!");
                } else {
                    System.out.println("CFListener: failure! FAILure! FAILURE!");
                    System.out.println(cf.cause());
                }
            }
        });

  }

           // Wait until all messages are flushed before closing the channel.
  if (lastWriteFuture != null) {
      lastWriteFuture.sync();
  }


    // Wait until the server socket is closed.
    // In this example, this does not happen, but you can do that to gracefully
    // shut down your server.
    f.channel().closeFuture().sync();
} catch (InterruptedException | UnsupportedOperationException e) {
    e.printStackTrace();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

 类似资料:
  • 相反,将引发“UnsupportedOperationException”。看起来ContainerRequest没有从修改的请求中提取UserPrincipal。 修改是通过 问题是如何将主体信息从HttpServerProbe传输到ContainerRequestFilter。request具有安全信息(在本例中是SSL客户机证书信息),而com.sun.jersey.spi.containe

  • 我正在开发一个NativeScript Android应用程序,其中我希望用户能够在按下一个按钮后打开WhatsApp联系人(只知道电话号码)。我目前使用Nativescript-open-app打开WhatsApp。是否也可以打开对话? 要打开WhatsApp,我使用以下代码(也许可以修改“com.WhatsApp”?):

  • 我想通过javascript代码发送gcm消息。为此,我们需要发布一个json对象。 gcm文档:http://developer.android.com/google/gcm/adv.html.中给出了url和json对象格式 出于测试目的,我编写了一个完美运行的Java代码。但是爪哇脚本代码不起作用。如果有人有一些示例工作代码(gcm的爪哇脚本),请发布。 脚本代码 :

  • 我不知道;我不太明白在哪里可以抛出这个异常。 例如,我正在实现

  • 下面是在strList5的subList.clear()调用中引发异常的代码: 我清除子列表的方式有问题吗?或者这是8Java的臭虫?我的Java8在Mac上的版本是:java版本"1.8.0_45"Java(TM)SE运行时环境(构建1.8.0_45-b14)JavaHotSpot(TM)64位服务器VM(构建25.45-b02,混合模式)

  • 我有一个连接到lambda的队列(fifo)。我想在lambda中向标准队列发送一条消息。但没有发送/接收任何消息。然而,如果我尝试从非SQS连接的lambda(通过AppSync)发送它,它会工作。 我查过: lambda有权发送SQS消息(您可以在那里看到) 由于我已成功地从另一个lambda(非SQS)向标准队列发送消息,因此正确配置了标准队列 SQS URL是否正确 控制台中不会显示任何错