当前位置: 首页 > 知识库问答 >
问题:

Vertx NetServer控件读取流

雷逸仙
2023-03-14

我正试图模拟TCP服务器,以便在必须使用的现有基础设施的基础上使用Vertx进行测试。

我模拟的服务器工作完全异步,并且根据缓冲区中指示请求长度的前置标头知道传入缓冲区的长度。

我需要读取连接到模拟TCP服务器的每个客户端套接字上传入请求的前6个字符。从这个预标题中,我读取了请求的实际长度(例如,对于xx3018,我知道请求的完整长度是3018)。

使用普通java的工作模拟服务器示例(快速实现,不会阻止其他开发:))

public void run(String... args) throws Exception {
    log.info("Starting TCP Server");

    ServerSocket serverSocket = new ServerSocket(1750);

    while (true) {
        try {
            Socket socket = serverSocket.accept();

            CompletableFuture.runAsync(() -> {
                Exception e = null;
                while (e == null) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        OutputStream outputStream = socket.getOutputStream();

                        byte[] preHeader = new byte[6];
                        inputStream.read(preHeader);

                        String preHeaderValue = new String(preHeader);
                        log.info("Pre header: {}", preHeaderValue);

                        int length = Integer.valueOf(preHeaderValue.substring(2));
                        log.info("Request full length: {}", length);
                        byte[] request = new byte[length - 6];

                        inputStream.read(request);

                        String requestValue = new String(request);

                        log.info("Request: {}", requestValue);

                        String response = this.requestResponseProvider.getResponse(preHeaderValue + requestValue);
                        log.info("Response: {}", response);
                        outputStream.write(response.getBytes());
                    } catch (Exception ex) {
                        log.error("Encountered a problem: {}", e.getMessage());
                        e = ex;
                    }
                }
            });
        } catch (Exception e) {
            log.error("Encountered a problem: {}", e.getMessage());
        }
    }
}

我似乎找不到一种方法来控制输入流,就像我用普通java控制它一样。

共有1个答案

太叔鹏云
2023-03-14

在很长一段时间没有考虑这个问题之后,我决定稍微考虑一下。

我记得在另一个项目中使用了以下模块:https://github.com/vert-x3/vertx-tcp-eventbus-bridge

我还记得在tcp桥的内部协议中,它将有效负载的长度附加到通过tcp桥发送的缓冲区中,我查看了源代码以了解它如何处理块(又名帧)

我发现:https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java这正是我想要实现的:)

我稍微修改了一下,转换为静态编程语言,这样我就可以控制标题大小和提取有效负载长度的方式。

下面是一个使用Vert控制读取流的粗略、快速和肮脏的示例。x NetServer:

suspend fun main() {
  val vertx = Vertx.vertx()
  initServer(vertx)
  initClient(vertx)
}

suspend fun initServer(vertx: Vertx) {
  val server = vertx.createNetServer(netServerOptionsOf(port = 8888, host = "localhost"))

  server
    .connectHandler { socket ->
      val parser = FrameParser(
        headerSize = 4,
        headerHandler = {
          it.getInt(0)
        },
        handler = {
          println(it.toString())
          println("---")
        }
      )
      socket.handler(parser)

      socket.exceptionHandler {
        it.printStackTrace()
        socket.close()
      }
    }
    .listenAwait()
}

suspend fun initClient(vertx: Vertx) {
  val client = vertx.createNetClient()
  val socket = client.connectAwait(port = 8888, host = "localhost")

  val message = "START|${"foobarfoobar".repeat(100)}|END"
  val length = message.length
  repeat(5) {
    repeat(100) {
      vertx.setPeriodic(10) {
        socket.write(
          Buffer.buffer()
            .appendInt(length)
            .appendString(message)
        )
      }
    }
    delay(1000)
  }
}

/**
 * Based on: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
 */
class FrameParser(
  private val headerSize: Int,
  private val headerHandler: (Buffer) -> Int,
  private val handler: (Buffer) -> Unit
) : Handler<Buffer?> {

  private var _buffer: Buffer? = null
  private var _offset = 0

  override fun handle(buffer: Buffer?) {
    append(buffer)
    var offset: Int
    while (true) {
      // set a rewind point. if a failure occurs,
      // wait for the next handle()/append() and try again
      offset = _offset

      // how many bytes are in the buffer
      val remainingBytes = bytesRemaining()

      // at least expected header size
      if (remainingBytes < headerSize) {
        break
      }

      // what is the length of the message
      val length: Int = headerHandler(_buffer!!.getBuffer(_offset, _offset + headerSize))
      _offset += headerSize
      if (remainingBytes - headerSize >= length) {
        // we have a complete message
        handler(_buffer!!.getBuffer(_offset, _offset + length))
        _offset += length
      } else {
        // not enough data: rewind, and wait
        // for the next packet to appear
        _offset = offset
        break
      }
    }
  }

  private fun append(newBuffer: Buffer?) {
    if (newBuffer == null) {
      return
    }

    // first run
    if (_buffer == null) {
      _buffer = newBuffer
      return
    }

    // out of data
    if (_offset >= _buffer!!.length()) {
      _buffer = newBuffer
      _offset = 0
      return
    }

    // very large packet
    if (_offset > 0) {
      _buffer = _buffer!!.getBuffer(_offset, _buffer!!.length())
    }
    _buffer!!.appendBuffer(newBuffer)
    _offset = 0
  }

  private fun bytesRemaining(): Int {
    return if (_buffer!!.length() - _offset < 0) {
      0
    } else {
      _buffer!!.length() - _offset
    }
  }
}
 类似资料:
  • 本文向大家介绍C#使用Aspose.Cells控件读取Excel,包括了C#使用Aspose.Cells控件读取Excel的使用技巧和注意事项,需要的朋友参考一下 Aspose是一个很强大的控件,可以用来操作word,excel,ppt等文件,用这个控件来导入、导出数据非常方便。其中Aspose.Cells就是用来操作Excel的,功能有很多。我所用的是最基本的功能,读取Excel的数据并导入到D

  • 我试图通过使用命令“java postfix 错误: 请在java.util.scanner.throwfor(未知源) 在java.util.scanner.next(未知源) 在java.util.scanner.nextInt(未知源) 在java.util.scanner.nextInt(未知源) 在java.util.scanner.nextInt(未知源) 在java.util.sca

  • 工作表列表 全量读取 游标读取 跳过指定行 忽略空白单元格 忽略空白行 忽略跳过动作常量 设置全局读取类型 单元格回调模式读取 数据类型读取 数据类型常量

  • 读取文件已支持 windows 系统,版本号大于等于 1.3.4.1; 扩展版本大于等于 1.2.7; PECL 安装时将会提示是否开启读取功能,请键入 yes; 编译 编译时需添加 --enable-reader ./configure --enable-reader 示例 $config = ['path' => './tests']; $excel = new \Vtiful\K

  • 我刚刚开始学习Groovy,正在GroovyConsole中进行实验。 有没有办法读取用户输入?我尝试了下面的代码,但我得到了一个错误。 这是我收到的错误: 有什么我需要进口的吗? 任何帮助都会很好。 谢谢

  • 读写文件是很多程序的基本任务,下面我们看看Go里面的文件读取。 package main import ( "bufio" "fmt" "io" "io/ioutil" "os" ) // 读取文件的函数调用大多数都需要检查错误, // 使用下面这个错误检查方法可以方便一点 func check(e error) { if e != nil {