当前位置: 首页 > 知识库问答 >
问题:

Java,SocketChannel选择器,将写入通道与阻塞队列相结合

祖波光
2023-03-14

我目前正在尝试从一个线程操作一个SocketChannel(我以前用两个线程和常规套接字实现了我想要做的事情,但每个客户端两个线程似乎有点过分)。我希望能够在有数据要读取时读取(选择器可以很好地工作)。我只想在阻塞队列(在我的示例中,我有帧队列)中有项目时写入。

        @Override
        public void run() {
            super.run();

            SelectionKey readKey = null;
            try {
                final int interests = SelectionKey.OP_READ;
                socketChannel.configureBlocking(false);
                readKey = socketChannel.register(selector, interests);
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    close();
                } catch (Exception e1) {
                    throw new RuntimeException("FAILURE");
                }
            }

            if (null != readKey) {
                while (running) {
                    try {
                        System.out.println("LOOP ENTRY");
                        selector.select();

                        if (readKey.isReadable()) {
                            System.out.println("IS READABLE");
                        }

                        if (readKey.isWritable() && (null != framesQueues.peek())) {
                            System.out.println("IS WRITEABLE");
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

现在,它做的是,它不停地疯狂地循环,这显然很糟糕。我正在寻找一种方法,当我的阻塞队列中有一个项目,或者有字节要读取时,我可以唤醒选择器。NIO中是否有允许这样做的工具?

如果没有,我可以实施什么?我注定要在每个客户端使用两个线程吗?这是一个爱好项目,但我正试图实现尽可能低的延迟,所以循环睡眠不是我想要的。

共有1个答案

程瑞
2023-03-14

我在玩nio sockets,我把一些东西放在了一起,希望它们足够容易理解。你需要做的就是telnet localhost 5050。我无法访问你的其他代码,所以我不知道你缺少了什么。不过,我假设您没有从选择器中清除所选的键,或者可能只有在完成编写后才将兴趣操作更改为(读取)。

public static void main(String... args) throws IOException {
    final Selector selector = Selector.open();

    //every 10 seconds this thread will go through all the connections and
    //send "(x times) (date) to every client
    new Thread() {
        public void run() {
            for (int i = 0; selector.isOpen(); i++) {
                for (SelectionKey key : selector.keys()) {
                    if (key.channel() instanceof SocketChannel) {
                        ((Queue<ByteBuffer>) key.attachment()).add(ByteBuffer.wrap((i + " - " + new Date() + "\n").getBytes()));
                        key.interestOps(OP_READ | OP_WRITE); //enable write flag
                    }
                }

                selector.wakeup(); //wakeup so it can get to work and begin writing
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                }
            }
        }
    }.start();


    //create server on port 5050
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.bind(new InetSocketAddress(5050));
    server.register(selector, OP_ACCEPT);

    //reusable buffer
    final ByteBuffer readBuffer = ByteBuffer.allocate(0x1000);

    while (selector.isOpen()) {
        int selected = selector.select();
        System.out.println("Selected " + selected + (selected == 1 ? " key." : " keys."));
        if (selected > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isReadable()) {
                    System.out.println("Readable: " + key.channel());
                    SocketChannel socket = ((SocketChannel) key.channel());
                    readBuffer.clear();
                    int read = socket.read(readBuffer);
                    if (read == -1) {
                        System.out.println("Socket Closed " + key.channel());
                        socket.close();
                        continue; //socket is closed. continue loop
                    }

                    //we will add what the client sent to the queue to echo it back
                    if (read > 0) {
                        readBuffer.flip();
                        ByteBuffer buffer = ByteBuffer.allocate(readBuffer.remaining());
                        ((Queue<Buffer>) key.attachment()).add(buffer.put(readBuffer).flip());
                        key.interestOps(OP_WRITE | OP_READ); //enable write flag
                    }
                }

                if (key.isValid() && key.isWritable()) {
                    System.out.println("Writable: " + key.channel());
                    SocketChannel socket = (SocketChannel) key.channel();

                    //retrieve attachment(ArrayBlockingQueue<ByteBuffer>)
                    Queue<Buffer> dataToWrite = (Queue<Buffer>) key.attachment();

                    //only remove from queue once we have completely written
                    //this is why we call peek first, and only remove once (buffer.remaining() == 0)
                    for (ByteBuffer buffer; (buffer = (ByteBuffer) dataToWrite.peek()) != null;) {
                        socket.write(buffer);
                        if (buffer.remaining() == 0) dataToWrite.remove();
                        else break; //can not write anymore. Wait for next write event
                    }

                    //once queue is empty we need to stop watching for write events
                    if (dataToWrite.isEmpty()) key.interestOps(OP_READ);
                }

                if (key.isValid() && key.isAcceptable()) {
                    System.out.println("Acceptable: " + key.channel());
                    SocketChannel socket = ((ServerSocketChannel) key.channel()).accept();
                    socket.configureBlocking(false);

                    //add a ArrayBlockingQueue<ByteBuffer> as an attachment for the socket
                    socket.register(selector, OP_READ, new ArrayBlockingQueue<ByteBuffer>(1000));
                }
            }
            selector.selectedKeys().clear(); //must clear all when finished or loop will continue selecting nothing
        }
    }
}
 类似资料:
  • 问题内容: Java NIO提供并可以将其设置为非阻塞模式(异步)。大多数操作返回的值对应于成功或该操作尚未完成。是什么目的,并随后,除了回调的功能? 问题答案: 可以设置为非阻塞模式(异步) 就在那里,您有误解。非阻塞模式 不同于 异步模式。 非阻塞操作要么传输数据,要么不传输数据。无论哪种情况,都没有阻塞,并且返回操作完成。此模式受 和支持 异步操作在您调用该方法时开始,并在后台继续,其结果可

  • 我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取

  • 问题内容: 我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分: 到目前为止,一切都很好。在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的

  • 2)Java的内置使用了两个锁:takeLock和putLock,并分别用在put()和take()中,我看到间隔队列是一个链表,不是线程安全的,那怎么行呢?

  • 问题内容: 给定此选择器: 它将匹配一个正文,该正文的类包含 page-node-add- 的子字符串,而类恰好是 page-node-edit 我想说匹配第一个或第二个(但不能同时匹配)。可能吗? 使用逗号的问题: 如果我有一个长选择器,例如: 我原本以为CSS3可以解决这个问题,但是我想到的是: 谢谢 问题答案: 您需要使用逗号将它们分开: 使用逗号的问题: …是除了逗号以外,您无法做其他任何

  • 默认情况下,通道发送和接收数据是阻塞的。然而我们可以使用select的一个default的选项来实现无阻塞发送或接收数据,甚至可以将多个select的case选项和default选项结合起来使用。 package main import "fmt" func main() { messages := make(chan string) signals := make(chan bo