当前位置: 首页 > 知识库问答 >
问题:

无法使用udp套接字在netty中回复

司徒炎彬
2023-03-14

以前的代码是用c语言编写的,使用的是这样的代码:

sockHndl = socket(AF_INET,SOCK_DGRAM,0);
sockConf.sin_family = AF_INET;
sockConf.sin_addr.s_addr = INADDR_ANY;
sockConf.sin_port = htons(33333);

bind(sockHndl, (struct sockaddr*)&sockConf, sizeof(sockConf));

// in a thread
recvfrom(sockHndl, buffer, 1515, 0, (struct sockaddr*)&sockConf, &s);

// process any recevied data
....

// reply
sendto(sockHndl, buffer, strlen(buffer), 0, (struct sockaddr*)&sockConf, sizeof(sockConf));

这段代码可以工作,但我必须使用Netty在java中重新实现它。

在tcp中使用netty不是问题,但是用netty回复客户机对我来说是个问题,如果我想在通道上写,它失败了,出现了java.nio.channels.notyetConnectedException

也许这与netty上的这个问题有关,但我不知道如何在代码中再现c行为。

final Bootstrap b = new Bootstrap();

b.group(comIOEventLoop).channel(NioDatagramChannel.class).handler(new ChannelInitializer<NioDatagramChannel>()
{
    protected void initChannel(NioDatagramChannel ch) throws Exception
    {
        final ChannelPipeline p = ch.pipeline();
        p.addLast("SITCDecoder", new SITCDecoder());
        p.addLast("SITCEncoder", new SITCEncoder());
        p.addLast("SITCHandler", new SimpleChannelInboundHandler<ITCUdpRequest>()
        {
            protected void channelRead0(ChannelHandlerContext ctx, ITCUdpRequest msg) throws Exception
            {
                // here ctx.channel().remoteAddress() == null !
                switch(msg.getType())
                {
                    case GET_PARAMS_0:
                        send(ctx.channel(), new ITCUdpResponse(-80, -20));
                        break;
                }
            }

            public void send(final Channel channel, final ITCUdpResponse data)
            {
                if(data == null)
                {
                    LOGGER.error("data == null !!!");
                }

                channel.writeAndFlush(data).addListener(new GenericFutureListener<Future<Void>>()
                {
                    public void operationComplete(final Future<Void> future) throws Exception
                    {
                        if(future.isDone() && future.isSuccess())
                        {
                            LOGGER.debug("OK");
                        }
                        else
                        {
                            LOGGER.error("error " + future.isDone() + " - " + future.isSuccess());
                            if(!future.isSuccess())
                            {
                                future.cause().printStackTrace();
                            }
                        }
                    }
                });
            }
        });
    }
});

channel = b.bind(port).sync().channel();
channel.closeFuture().await();

如何回复客户?

多谢了。

    public class SITCDecoder extends SimpleChannelInboundHandler<DatagramPacket>
{
    private static final Logger LOGGER              = LoggerFactory.getLogger(SITCDecoder.class);

    private static final String MSG_DD_HEADER       = "SITCDDVAL:";
    private static final int    MSG_DD_HEADER_SIZE  = MSG_DD_HEADER.length();

    protected void channelRead0(final ChannelHandlerContext ctx, final DatagramPacket msg) throws Exception
    {
        final ByteBuf data = (ByteBuf) msg.content();

        if(data != null)
        {
            // we must be able to read at last <code>MSG_HEADER_SIZE</code> Bytes
            if(data.readableBytes() < MSG_DD_HEADER_SIZE)
            {
                LOGGER.error("Not enought data");
                return;
            }

            if(!data.readBytes(MSG_DD_HEADER_SIZE).toString(StandardCharsets.ISO_8859_1).equals(MSG_DD_HEADER))
            {
                LOGGER.error("Header not found");
            }

            final String payload = data.readBytes(data.readableBytes()).toString(StandardCharsets.ISO_8859_1);

            final RecyclableArrayList out = RecyclableArrayList.newInstance();
            out.add(new ITCUdpRequest(payload));

            final int size = out.size();

            for(int i = 0; i < size; i++)
            {
                ctx.fireChannelRead(out.get(i));
            }
            out.recycle();
        }
        else
        {
            ctx.fireChannelRead(msg);
        }
    }
}

编码器代码:

public class SITCEncoder extends MessageToByteEncoder<ITCUdpResponse>
{
    private static final String MSG_RP_HEADER   = "SITCRPVAL:";
    private static final char   MSG_RP_FOOTER   = '*';
    private static final char   VAL_PREFIX      = '[';
    private static final char   VAL_SUFFIX      = ']';

    protected void encode(final ChannelHandlerContext ctx, final ITCUdpResponse msg, final ByteBuf out) throws Exception
    {
        final StringBuilder str = new StringBuilder();
        str.append(MSG_RP_HEADER);

        for(final Object val : msg.getParams())
        {
            str.append(VAL_PREFIX).append(val).append(VAL_SUFFIX);
        }

        str.append(MSG_RP_FOOTER);

        out.ensureWritable(str.length());

        out.writeBytes(str.toString().getBytes());
    }
}

共有1个答案

司徒志强
2023-03-14

好的,在阅读了stackoverflow针对我的问题给出的建议之后,我已经从NIO*切换到OIO*(就像这里说的那样),并且一切都按照预期工作,没有做任何改变。

桑斯克。

PS:为什么Oio在Nio不工作的地方工作?

 类似资料:
  • 根据手册页: send()和write(2)之间的唯一区别是标志的存在。使用零标志参数,send()等同于write(2)。另外,下面的调用send(sockfd,buf,len,flags);相当于sendto(sockfd,buf,len,flags,NULL,0); 所以: 这是否意味着在UDP套接字上使用完全可以(如果我不需要)? 是否有方法在UDP套接字上使用(就像现在我在的参数中设置目

  • 我已经在Minikube上部署了一个UDP套接字服务器。套接字服务绑定到端口2152。下面是description pod命令中的IP片段。 我的客户机与minikube运行在同一个VM上,无法与服务器通信。我在客户端使用服务器地址IP 172.17.0.3,端口为2152。我还尝试使用minikube IP 192.168.49.2将UDP数据从客户端发送到服务器。 请帮忙,这里有什么问题。

  • 问题内容: 我正在寻找一种使用Go语言与UDP套接字进行客户端/服务器通信的好的解决方案。 我在Internet上找到的示例向我展示了如何将数据发送到服务器,但是没有讲授如何将数据发送回客户端。 为了演示,我的程序执行以下操作: 我的客户端程序在4444端口上创建一个套接字,如下所示: 我向服务器发送了字符串和本地地址,因此它可以打印字符串并发送OK消息。我为此使用gob: 我的数据结构如下所示:

  • 我想广播消息本地到许多应用程序。对于这一点,我认为UDP套接字是最好的IPC,纠正我,如果我是Worwn。 并倾听: 问题是我必须像这样通过IP192.168.1.255,但在实际场景中可能没有eth0接口,只有环回。那我怎么才能做到这一点呢?

  • 我正在尝试使用UDP在Android上创建一个unix域套接字服务器和客户端。我需要客户端向服务器发送一条消息(“hi”),然后从服务器上向客户端发送数据。我已经成功地在两边创建了套接字,并且我能够在服务器上从客户端接收到一条短消息。然而,服务器上的recvfrom(…)没有填充结构体套接字*src_addr,socklen_t*addrlen参数。使用这些src_addr和addrlen的后续s

  • 为了实现这一点,我使用了队列/线程池机制。最初,我创建一个固定数量线程的池,并有一个队列datastructure来存储客户机地址。这个队列在所有线程之间共享,因此我使用“互斥”来锁定/解锁这个队列。在主服务器线程中,我创建一个套接字,将其绑定到全局端口/地址,然后在“recvfrom”调用上阻止服务器。任何希望与服务器通信的客户端都会向侦听全局端口地址的主线程服务器发送“HI”消息。主服务器线程