当前位置: 首页 > 知识库问答 >
问题:

java.nio选择器和SocketChannel用于继续流式传输

方航
2023-03-14

我目前正在使用java。尼奥。频道选择器

问题-调用选择器的选择方法只返回一个值

问:我需要在每次读/写后调用SocketChannel上的关闭吗(我希望不是!)?如果没有,SocketChannel对任何读/写操作都不可用的原因是什么?

很抱歉,我不能发布代码,但我希望我已经清楚地解释了这个问题,有人可以提供帮助。我已经搜索了答案,我发现你不能在SocketChannel连接关闭后重新使用它,但我的通道不应该关闭,服务器从未收到EOF流结果。

我取得了一些进展,并弄清楚由于json解析错误,写入操作没有在服务器应用程序上发生。所以现在我在客户端应用程序代码上的SocketChannel在处理读取操作后为另一个写入操作做好准备。我想这是SocketChannel的TCP性质。但是,SocketChannel在服务器应用程序端的另一个读取操作中不可用。这是SocketChannel的正常行为吗?在读取操作后,我需要关闭客户端的连接并建立新的连接吗?

下面是我正在尝试做的代码示例:

package org.stream.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.RandomStringUtils;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.google.gson.stream.JsonToken;

public class ClientServerTest {

    private LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>();
    private ExecutorService executor = Executors.newFixedThreadPool(1);
    private HashMap<String, Integer> uuidToSize = new HashMap<String, Integer>();

    private class StreamWriteTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamWriteTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int results = 0;
            while (buffer.hasRemaining()) {
                try {
                    results = sc.write(buffer);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(data);
                    selector.wakeup();
                    return;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
            selector.wakeup();
        }

    }

    private class StreamReadTask implements Runnable {
        private ByteBuffer buffer;
        private SelectionKey key;
        private Selector selector;

        private StreamReadTask(ByteBuffer buffer, SelectionKey key, Selector selector) {
            this.buffer = buffer;
            this.key = key;
            this.selector = selector;
        }

        private boolean checkUUID(byte[] data) {
            return uuidToSize.containsKey(new String(data));
        }

        @Override
        public void run() {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            byte[] data = (byte[]) key.attachment();
            if (data != null) {
                buffer.put(data);
            }
            int count = 0;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                data = new byte[buffer.limit()];
                buffer.get(data);
                if (checkUUID(data)) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.attach(data);
                } else {
                    System.out.println("Clinet Read - uuid ~~~~ " + new String(data));
                    key.interestOps(SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }

            if (count == -1) {
                try {
                    sc.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            selector.wakeup();
        }

    }

    private class ClientWorker implements Runnable {

        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                SocketChannel sc = SocketChannel.open();
                sc.configureBlocking(false);
                sc.connect(new InetSocketAddress("127.0.0.1", 9001));
                sc.register(selector, SelectionKey.OP_CONNECT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isConnectable()) {
                            sc = (SocketChannel) key.channel();
                            if (!sc.finishConnect()) {
                                continue;
                            }
                            sc.register(selector, SelectionKey.OP_WRITE);
                        }

                        if (key.isReadable()) {
                            key.interestOps(0);
                            executor.execute(new StreamReadTask(buffer, key, selector));
                        }
                        if (key.isWritable()) {
                            key.interestOps(0);
                            if(key.attachment() == null){
                                key.attach(dataQueue.take());
                            }
                            executor.execute(new StreamWriteTask(buffer, key, selector));
                        }
                    }
                }
            } catch (IOException ex) {
                // Handle Exception
            }catch(InterruptedException ex){

            }

        }
    }

    private class ServerWorker implements Runnable {
        @Override
        public void run() {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ServerSocket socket = ssc.socket();
                socket.bind(new InetSocketAddress(9001));
                ssc.configureBlocking(false);
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer buffer = ByteBuffer.allocateDirect(65535);
                DataHandler handler = new DataHandler();

                while (selector.isOpen()) {
                    int count = selector.select(10);

                    if (count == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                    while (it.hasNext()) {
                        final SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid()) {
                            continue;
                        }

                        if (key.isAcceptable()) {
                            ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept();
                            sc.configureBlocking(false);
                            sc.register(selector, SelectionKey.OP_READ);
                        }
                        if (key.isReadable()) {
                            handler.readSocket(buffer, key);
                        }
                        if (key.isWritable()) {
                            handler.writeToSocket(buffer, key);
                        }
                    }
                }

            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private class DataHandler {

        private JsonObject parseData(StringBuilder builder) {
            if (!builder.toString().endsWith("}")) {
                return null;
            }

            JsonParser parser = new JsonParser();
            JsonObject obj = (JsonObject) parser.parse(builder.toString());
            return obj;
        }

        private void readSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            buffer.clear();
            int count = Integer.MAX_VALUE;
            int readAttempts = 0;
            try {
                while ((count = sc.read(buffer)) > 0) {
                    readAttempts++;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            if (count == 0) {
                buffer.flip();
                StringBuilder builder = key.attachment() instanceof StringBuilder ? (StringBuilder) key
                        .attachment() : new StringBuilder();
                Charset charset = Charset.forName("UTF-8");
                CharsetDecoder decoder = charset.newDecoder();
                decoder.onMalformedInput(CodingErrorAction.IGNORE);
                System.out.println(buffer);
                CharBuffer charBuffer = decoder.decode(buffer);
                String content = charBuffer.toString();
                charBuffer = null;
                builder.append(content);    
                System.out.println(content);
                JsonObject obj = parseData(builder);
                if (obj == null) {
                    key.attach(builder);
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    System.out.println("data ~~~~~~~ " + builder.toString());
                    JsonPrimitive uuid = obj.get("uuid").getAsJsonPrimitive();
                    key.attach(uuid.toString().getBytes());
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }

            if (count == -1) {
                key.attach(null);
                sc.close();
            }
        }

        private void writeToSocket(ByteBuffer buffer, SelectionKey key)
                throws IOException {
            SocketChannel sc = (SocketChannel) key.channel();
            byte[] data = (byte[]) key.attachment();
            buffer.clear();
            buffer.put(data);
            buffer.flip();
            int writeAttempts = 0;
            while (buffer.hasRemaining()) {
                int results = sc.write(buffer);
                writeAttempts++;
                System.out.println("Write Attempt #" + writeAttempts);
                if (results == 0) {
                    buffer.compact();
                    buffer.flip();
                    data = new byte[buffer.remaining()];
                    buffer.get(data);
                    key.attach(data);
                    key.interestOps(SelectionKey.OP_WRITE);
                    break;
                }
            }

            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }

    public ClientServerTest() {
        for (int index = 0; index < 1000; index++) {
            JsonObject obj = new JsonObject();
            String uuid = UUID.randomUUID().toString();
            uuidToSize.put(uuid, uuid.length());
            obj.addProperty("uuid", uuid);
            String data = RandomStringUtils.randomAlphanumeric(10000);
            obj.addProperty("event", data);
            dataQueue.add(obj.toString().getBytes());
        }

        Thread serverWorker = new Thread(new ServerWorker());
        serverWorker.start();

        Thread clientWorker = new Thread(new ClientWorker());
        clientWorker.start();

    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClientServerTest test = new ClientServerTest();
        for(;;){

        }
    }

}

共有1个答案

锺功
2023-03-14

>

  • 处理OP_CONNECT的正确方法是尝试finishConnect()一次,如果成功取消注册OP_CONNECT并注册OP_READOP_WRITE,可能是后者,因为您是客户端。在非阻塞模式下循环和Hibernate没有意义。如果finishConnect()返回false,OP_CONNECT将再次触发。

    你对的处理!钥匙isAcceptable()!钥匙isReadable(),和!钥匙isWriteable()完全没有任何意义。如果密钥可接受,请调用accept()。如果可读,请调用read()。如果它是可写的,请调用write()。就这么简单。

    您需要注意通道几乎总是可写的,除了它们的套接字发送缓冲区已满的短暂时期。所以只有在您有东西要写的时候才注册OP_WRITE,或者在您尝试写入并获得零返回之后更好;然后当OP_WRITE触发时,重试写入并取消注册OP_WRITE,除非您又获得了一个零。

    你用你的字节缓冲太省钱了。实际上,每个频道都需要一个。您可以将其保存为密钥附件,以便在需要时取回。否则,您将无法累积部分读取(这肯定会发生),也无法重试写入。

  •  类似资料:
    • 我有一个通用项目流。我想打印第一个项目的类名,即所有项目的。 如果我有一个Iterable,它应该是这样的: 我可以在流上执行此操作吗(

    • 我有一个列表,我想用流过滤,抓取前10个,并显示给用户。按下按钮后..我想从那个位置继续流。 但问题是,这里我不得不在第一个10+多个不匹配的过滤器上运行过滤器(再次)。

    • 选择要传输的对象 在默认情况下,“数据库对象”列表内的所有数据库对象均未选择。请勾选你要传输的数据库对象。 运行期间的全部<对象> (*) 所有的数据库对象将传输到目标数据库和/或模式,所有新添加的数据库对象也将会被传输而不必修改数据传输配置文件。 自定义 只传输已勾选的数据库对象。然而,如果在创建数据传输配置文件后,在源数据库和/或模式中添加任何新的数据库对象,新建的数据库对象将不会被传输,除非

    • 选择要传输的对象 在默认情况下,“数据库对象”列表内的所有数据库对象均未选择。请勾选你要传输的数据库对象。 运行期间的全部<对象> (*) 所有的数据库对象将传输到目标数据库和/或模式,所有新添加的数据库对象也将会被传输而不必修改数据传输配置文件。 自定义 只传输已勾选的数据库对象。然而,如果在创建数据传输配置文件后,在源数据库和/或模式中添加任何新的数据库对象,新建的数据库对象将不会被传输,除非

    • 选择要传输的对象 在默认情况下,“数据库对象”列表内的所有数据库对象均已选择。如果您不想传输某些数据库对象,请取消勾选它们。 运行期间的全部<对象> (*) 所有的数据库对象将传输到目标数据库和/或模式,所有新添加的数据库对象也将会被传输而不必修改数据传输配置文件。 自定义 只传输已勾选的数据库对象。然而,如果在创建数据传输配置文件后,在源数据库和/或模式中添加任何新的数据库对象,新建的数据库对象

    • 使用sass的时候,最后一个减少重复的主要特性就是选择器继承。基于Nicole Sullivan面向对象的css的理念,选择器继承是说一个选择器可以继承为另一个选择器定义的所有样式。这个通过@extend语法实现,如下代码: //通过选择器继承继承样式 .error { border: 1px solid red; background-color: #fdd; } .serio