akka是一种基于Scala的网络编程库,实现了RPC框架。spark在1.6之前使用的是akka进行通信,1.6及以后是基于netty。现阶段的Flink是基于Akka+Netty。如果需要了解Spark或者Flink的网络通信,就必须得从akka开始。
主类是在akka.actor.Actor,Endpoint 有三个重要的声明周期方法:
object SenderActorObject extends Actor {
println("1231231")
// 当Actor初次被调用化时
override def preStart(): Unit = {
println("abcdef")
println("执行SenderActorObject PreStart()方法")
}
override def receive: Receive = {
case "start" =>
val receiveActor = this.context.actorSelection("/user/receiverActor")
// 向第二个actor发送消息
receiveActor ! SubmitTaskMessage("请完成#001任务!")
case SuccessSubmitTaskMessage(msg) =>
println("21312312")
println(s"接收到来自${sender.path}的消息: $msg")
}
}
object ReceiverActor extends Actor {
override def preStart(): Unit = {
print(1231)
println("执行ReceiverActor()方法")
}
// 执行receive方法前会先执行preStart方法
override def receive: Receive = {
case SubmitTaskMessage(msg) =>
println(s"接收到来自${sender.path}的消息: $msg")
// 又向第一个sender发送消息
sender ! SuccessSubmitTaskMessage("完成提交")
case _ => println("未匹配的消息类型")
}
}
object SimpleAkkaDemo {
def main(args: Array[String]): Unit = {
// 创建一个actor系统
val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
//创建一个actor
val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActorObject), "senderActor")
//创建一个actor
val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
println("abcdefghij")
receiverActor ! "asda"
println("afafa")
// 使用actor的引用向actor发送消息
senderActor ! "start"
}
}
说明:
- ActorSystem 是管理 Actor生命周期的组件, Actor是负责进行通信的组件
- 每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
- 每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
- Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor
- 每一个ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从ActorSystem中,获取一个 Actor,则通过以下的方式来进行 Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname
- 如果一个 Actor 要和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
- 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。
注意以下几点:
//与master建立连接
selection = context.actorSelection("akka.tcp://MasterActorSystem@localhost:1111/user/MasterActor")
//向master注册信息
selection ! Conf(workerId, 2040, 4)
也可以使用以下方式给发送者返回消息:
sender() ! Confed
/**
* initialDelay: FiniteDuration, 多久以后开始执行
* interval: FiniteDuration, 每隔多长时间执行一次
* receiver: ActorRef, 给谁发送这个消息
* message: Any 发送的消息是啥
*
* 每隔4秒钟给自己发送SendMessage
*/
import scala.concurrent.duration._
import context.dispatcher
context.system.scheduler.schedule(0 millis, 4000 millis, self, SendMessage)