在Akka中有没有什么方法可以像在Erlang中一样用{packet,4}来实现包帧?数据包如下所示:
4 bytes length in big endian | body...
00 00 00 05 H E L L O 0 0 0 5 W O R L D
00 00 00 05 H E L L
import java.net.InetSocketAddress
import akka.actor.{Props, Actor}
import akka.io.Tcp.Bind
import akka.io.{IO, Tcp}
class Server extends Actor{
import context.system
import Tcp._
IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234))
def receive ={
case bound @ Bound(localAddr) =>
println("Server is bound to "+localAddr.toString())
case failed @ CommandFailed(_ : Bind) =>
context stop self
case connected @ Connected(remote, local) =>
val handler = context.actorOf(Props[ClientHandler])
val connection = sender()
println(remote.toString + "connected to "+local.toString())
connection ! Register(handler)
}
}
据我所知,在Akka或Scala中没有用于此的库函数。Akka在字节字符串中进行读写交易,所以我把一个特性放在一起,它将完全满足您的要求。您将发送给您的演员的字节字符串传递给它。然后,它根据报头中的数据包长度分解流。它是无状态的,因此它返回一个元组,其中包含提取的数据包列表和TCP流中任何未使用的数据。您可以将新的TCP数据连接到以这个字节字符串返回的流的未使用部分,如下例所示。
trait Buffering {
val MAX_PACKET_LEN: Short = 10000
/**
* Extracts complete packets of the specified length, preserving remainder
* data. If there is no complete packet, then we return an empty list. If
* there are multiple packets available, all packets are extracted, Any remaining data
* is returned to the caller for later submission
* @param data A list of the packets extracted from the raw data in order of receipt
* @return A list of ByteStrings containing extracted packets as well as any remaining buffer data not consumed
*/
def getPacket(data: ByteString): (List[ByteString], ByteString) = {
val headerSize = 2
@tailrec
def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = {
if (current.length < headerSize) {
(packets.reverse, current)
} else {
val len = current.iterator.getShort
if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len")
if (current.length < len + headerSize) {
(packets.reverse, current)
} else {
val rem = current drop headerSize // Pop off header
val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data
// Pull of the packet and recurse to see if there is another packet available
multiPacket(front :: packets, back)
}
}
}
multiPacket(List[ByteString](), data)
}
actor的用法如下:
def receive = buffer(CompactByteString())
def buffer(buf: ByteString): Receive = {
// Messages inbound from the network
case Received(data) =>
val (pkt, remainder) = getPacket(buf ++ data)
// Do something with your packet
context become buffer(remainder)
case Other Stuff => // Etc
}
我试图使用Akka和Scala编写一个TCP服务器,它将实例化参与者,并在客户端分别连接和断开连接时停止参与者。我有一个TCP绑定执行器, 上面实例化上的TCP侦听器,并将处理程序参与者注册到每个连接。 我没有在附近配置的非Windows机器上进行测试,因为我认为这与我在Windows上运行有关,因为在搜索之后,我发现了一个仍然打开的bug--https://github.com/akka/akk
> 构造函数接受InetSocketAddress和actorref。InetSocketAddress是有意义的(我假设这是目的地),但是actorref是什么?这是我第一次使用akka,但据我了解,ActorRef是另一个演员的引用。既然我的TCP客户端是一个参与者,并且我希望这个TCP参与者与TCP服务器通信,而不是与另一个参与者通信,为什么我要给它一个参与者引用? 伙伴对象中的道具功能是用
(更新) 看来我的问题表达得不够清楚。我的问题不是如何解析数据,如果它被分割成多个段。我的协议定义得很好(即START_COOKIE、LENGTH、DATA、CRC),并且我一收到数据就将其排入字节FIFO(上面代码段中的调用),因此我可以轻松地对其进行异步解析。 问题是,如果我看到的是第号包。1(100字节)(在Wireshark中+50ms)和数据包编号。2(100字节)在Wireshark中
我有一个使用spark的项目,我想在其中使用Akka。该项目以前工作得很好,但当我将此添加到中时: 并尝试运行该项目,我得到以下错误: [错误](run-main-0)org.apache.spark.sparkException:作业由于阶段失败而中止:任务序列化失败:java.lang.ClassNotFoundException:Scala.Function0 [错误]java.lang.C
问题内容: 为了了解TCP的工作原理,我尝试建立自己的TCP SYN / SYN-ACK / ACK(基于该教程:http://www.thice.nl/creating-ack-get-packets-with- scapy/ )。 问题是,每当我的计算机从服务器收到SYN-ACK时,它都会生成一个RST数据包,该数据包将停止连接过程。 我在OS X Lion和Ubuntu 10.10 Mave
可扩展的实时事务处理,我们相信编写出正确的、具有容错性和可扩展性的并发程序太困难了。这多数是因为使用了错误的工具和错误的抽象级别。