当前位置: 首页 > 知识库问答 >
问题:

使用Akka的Scala-TCP数据包帧

武骁
2023-03-14

在Akka中有没有什么方法可以像在Erlang中一样用{packet,4}来实现包帧?数据包如下所示:

4 bytes length in big endian | body...
00 00 00 05 H E L L O 0 0 0 5 W O R L D
00 00 00 05 H E L L
import java.net.InetSocketAddress

import akka.actor.{Props, Actor}
import akka.io.Tcp.Bind
import akka.io.{IO, Tcp}

class Server extends Actor{
    import context.system
    import Tcp._
    IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234))

    def receive ={
        case bound @ Bound(localAddr) =>
            println("Server is bound to "+localAddr.toString())
        case failed @ CommandFailed(_ : Bind) =>
            context stop self
        case connected @ Connected(remote, local) =>
            val handler = context.actorOf(Props[ClientHandler])
            val connection = sender()
            println(remote.toString + "connected to "+local.toString())

            connection ! Register(handler)
    }
}

共有1个答案

宋晋
2023-03-14

据我所知,在Akka或Scala中没有用于此的库函数。Akka在字节字符串中进行读写交易,所以我把一个特性放在一起,它将完全满足您的要求。您将发送给您的演员的字节字符串传递给它。然后,它根据报头中的数据包长度分解流。它是无状态的,因此它返回一个元组,其中包含提取的数据包列表和TCP流中任何未使用的数据。您可以将新的TCP数据连接到以这个字节字符串返回的流的未使用部分,如下例所示。

trait Buffering {

  val MAX_PACKET_LEN: Short = 10000

  /**
   * Extracts complete packets of the specified length, preserving remainder
   * data. If there is no complete packet, then we return an empty list. If
   * there are multiple packets available, all packets are extracted, Any remaining data
   * is returned to the caller for later submission
   * @param data A list of the packets extracted from the raw data in order of receipt
   * @return A list of ByteStrings containing extracted packets as well as any remaining buffer data not consumed
   */
  def getPacket(data: ByteString): (List[ByteString], ByteString) = {

    val headerSize = 2

    @tailrec
    def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = {
      if (current.length < headerSize) {
        (packets.reverse, current)
      } else {
        val len = current.iterator.getShort
        if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len")
        if (current.length < len + headerSize) {
          (packets.reverse, current)
        } else {
          val rem = current drop headerSize // Pop off header
          val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data
          // Pull of the packet and recurse to see if there is another packet available
          multiPacket(front :: packets, back)
        }
      }
    }
    multiPacket(List[ByteString](), data)
  }

actor的用法如下:

def receive = buffer(CompactByteString())

def buffer(buf: ByteString): Receive = {
  // Messages inbound from the network
  case Received(data) =>
    val (pkt, remainder) = getPacket(buf ++ data)
    // Do something with your packet
    context become buffer(remainder) 
  case Other Stuff => // Etc
}
 类似资料:
  • 我试图使用Akka和Scala编写一个TCP服务器,它将实例化参与者,并在客户端分别连接和断开连接时停止参与者。我有一个TCP绑定执行器, 上面实例化上的TCP侦听器,并将处理程序参与者注册到每个连接。 我没有在附近配置的非Windows机器上进行测试,因为我认为这与我在Windows上运行有关,因为在搜索之后,我发现了一个仍然打开的bug--https://github.com/akka/akk

  • > 构造函数接受InetSocketAddress和actorref。InetSocketAddress是有意义的(我假设这是目的地),但是actorref是什么?这是我第一次使用akka,但据我了解,ActorRef是另一个演员的引用。既然我的TCP客户端是一个参与者,并且我希望这个TCP参与者与TCP服务器通信,而不是与另一个参与者通信,为什么我要给它一个参与者引用? 伙伴对象中的道具功能是用

  • (更新) 看来我的问题表达得不够清楚。我的问题不是如何解析数据,如果它被分割成多个段。我的协议定义得很好(即START_COOKIE、LENGTH、DATA、CRC),并且我一收到数据就将其排入字节FIFO(上面代码段中的调用),因此我可以轻松地对其进行异步解析。 问题是,如果我看到的是第号包。1(100字节)(在Wireshark中+50ms)和数据包编号。2(100字节)在Wireshark中

  • 我有一个使用spark的项目,我想在其中使用Akka。该项目以前工作得很好,但当我将此添加到中时: 并尝试运行该项目,我得到以下错误: [错误](run-main-0)org.apache.spark.sparkException:作业由于阶段失败而中止:任务序列化失败:java.lang.ClassNotFoundException:Scala.Function0 [错误]java.lang.C

  • 问题内容: 为了了解TCP的工作原理,我尝试建立自己的TCP SYN / SYN-ACK / ACK(基于该教程:http://www.thice.nl/creating-ack-get-packets-with- scapy/ )。 问题是,每当我的计算机从服务器收到SYN-ACK时,它都会生成一个RST数据包,该数据包将停止连接过程。 我在OS X Lion和Ubuntu 10.10 Mave

  • 可扩展的实时事务处理,我们相信编写出正确的、具有容错性和可扩展性的并发程序太困难了。这多数是因为使用了错误的工具和错误的抽象级别。