Akka and Actors in Scala

陆宝
2023-12-01

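I. The Actor model
A higher-level abstraction over concurrent programming
An asynchronous, non-blocking, highly concurrent, event-driven model
Lightweight event processing (1 GB of memory can hold on the order of millions of actors)
An Actor can be thought of as one component of a service: it receives messages, processes them, and replies. Actors exchange messages through ActorRef. Each actor has a mailbox, and a message sent through an ActorRef is delivered to that mailbox.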

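ActorSystem: creates and manages actors. An application typically uses a single ActorSystem instance, whereas there can be many Actor instances.
Actor: processes the messages in its mailbox.
ActorRef: a reference to an actor; messages are sent to the actor through it.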
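The relationship between the three roles above can be sketched without Akka at all. The following is a toy, single-threaded model in plain Scala, not Akka's implementation: ToySystem, ToyRef, ToyActor and Recorder are hypothetical names invented for this illustration. It only shows that a sender holds a reference, the reference puts messages into a mailbox, and the actor handles them one at a time.

```scala
import scala.collection.mutable

// Toy model only: NOT Akka's implementation, just the mailbox idea.
// Every name below is hypothetical.
trait ToyActor {
  def receive(message: Any): Unit
}

// Plays the role of ActorRef: callers never touch the actor itself,
// they only enqueue messages into its mailbox.
final class ToyRef(mailbox: mutable.Queue[Any]) {
  def !(message: Any): Unit = mailbox.enqueue(message)
}

// Plays the role of ActorSystem: one instance creates and drives many actors.
final class ToySystem {
  private val cells = mutable.ArrayBuffer.empty[(ToyActor, mutable.Queue[Any])]

  def actorOf(actor: ToyActor): ToyRef = {
    val mailbox = mutable.Queue.empty[Any]
    cells += ((actor, mailbox))
    new ToyRef(mailbox)
  }

  // Deliver every queued message, one at a time per actor, in arrival order.
  def run(): Unit =
    cells.foreach { case (actor, mailbox) =>
      while (mailbox.nonEmpty) actor.receive(mailbox.dequeue())
    }
}

// An actor that simply records what it received.
final class Recorder extends ToyActor {
  val seen = mutable.ArrayBuffer.empty[String]
  def receive(message: Any): Unit = message match {
    case s: String => seen += s
    case other     => seen += s"unhandled: $other"
  }
}
```

In this sketch, sending with ! returns immediately and nothing is processed until run() drains the mailboxes, which mirrors the fire-and-forget nature of message sends. Real Akka differs in important ways: it processes mailboxes on its own dispatcher threads rather than on demand.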

II. A simple example (a client and a server exchanging messages)

Server-side actor

package actor

import akka.actor.Actor

// Server-side actor: logs incoming messages and answers ClientMessage requests.
class MasterActor extends Actor {
  override def receive: Receive = {
    case "play" =>
      // Plain string sent by the client from preStart; no reply is sent.
      println("receive message play")
    case a: ClientMessage =>
      println("receive message " + a.toString)
      // `!` sends asynchronously (fire-and-forget); it does not return a result.
      sender() ! new ServerMessage("put num :" + Math.random().toString)
  }
}

Client-side actor

package actor

import akka.actor.{Actor, ActorSelection}

class ClientActor extends Actor {
  // Handle to the remote master actor; assigned in preStart.
  var master: ActorSelection = null

  override def preStart(): Unit = {
    val hostname = "localhost"
    val serveractorsystem = "SystemMaster"
    val serveractor = "MyActorMaster"
    val port = "8888"
    // preStart runs before this actor handles its first message:
    // look up the master actor by its remote path.
    master = context.actorSelection(s"akka.tcp://${serveractorsystem}@${hostname}:${port}/user/${serveractor}")

    val message = "play"
    // Send a first message to the master through the selection.
    master ! message
  }

  override def receive: Receive = {
    case "play basketball" =>
      println("i get message from master")
    case msg: String =>
      // Wrap any other string and forward it to the master.
      master ! new ClientMessage(msg)
    case msg: ServerMessage =>
      // "收到信息" means "message received"; kept as-is to match the client log.
      println("收到信息:===" + msg.toString)
  }
}
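Both actors refer to ClientMessage and ServerMessage, but the listings do not define them. The following is a minimal sketch of what they might look like, not the author's actual code. It assumes each message wraps a single string and that toString returns that string, which is consistent with the log lines "receive message hello im client" and "put num :..." shown in the logs. In the project they would live in package actor, matching the class name actor.ServerMessage in the serializer warning.

```scala
// Hypothetical definitions: the original post does not show these classes.
// Case classes are serializable, which the default Java serializer
// used for remote delivery requires.
final case class ClientMessage(msg: String) {
  // Assumption: print only the payload, as the server log suggests.
  override def toString: String = msg
}

final case class ServerMessage(msg: String) {
  // Assumption: print only the payload, as the client log suggests.
  override def toString: String = msg
}
```

Because these are case classes, both new ClientMessage(msg) as written in the actors and the shorter ClientMessage(msg) compile.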

Server launcher:

package actor

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object Test {
  def main(args: Array[String]): Unit = {
    println("==========")
    val strConfig =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = localhost
        |akka.remote.netty.tcp.port = 8888
      """.stripMargin
    val config: Config = ConfigFactory.parseString(strConfig)
    val myMaster: ActorSystem = ActorSystem("SystemMaster", config)
    myMaster.actorOf(Props(new MasterActor()), "MyActorMaster")
  }
}

Client launcher:

package actor

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object TestClient {
  def main(args: Array[String]): Unit = {
    println("==========")
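    // 1. Build the ActorSystem. No port is configured here, so remoting
    //    falls back to its default port (2552 in the client log).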
    val strConfig:String=
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = localhost
      """.stripMargin
    val config: Config = ConfigFactory.parseString(strConfig)
    val workerActorSystem: ActorSystem = ActorSystem("workerActorSystem",config)
    val actorRef = workerActorSystem.actorOf(Props(new ClientActor()),"workerActor")
    actorRef ! "hello im client"
  }
}

Server log:

==========
[INFO] [01/18/2020 16:25:57.223] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/18/2020 16:25:57.947] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SystemMaster@localhost:8888]
[INFO] [01/18/2020 16:25:57.949] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://SystemMaster@localhost:8888]
receive message play
receive message hello im client
[WARN] [SECURITY][01/18/2020 16:26:07.678] [SystemMaster-akka.remote.default-remote-dispatcher-6] [akka.serialization.Serialization(akka://SystemMaster)] Using the default Java serializer for class [actor.ServerMessage] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'

Client log:

==========
[INFO] [01/18/2020 16:26:06.688] [main] [akka.remote.Remoting] Starting remoting
[INFO] [01/18/2020 16:26:07.404] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://workerActorSystem@localhost:2552]
[INFO] [01/18/2020 16:26:07.406] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://workerActorSystem@localhost:2552]
[WARN] [SECURITY][01/18/2020 16:26:07.637] [workerActorSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://workerActorSystem)] Using the default Java serializer for class [actor.ClientMessage] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
收到信息:===put num :0.7189882290052645

Summary:
1. Initialize an ActorSystem.
2. Use the ActorSystem to create an Actor. This returns an ActorRef, through which messages are sent to the actor; the actor's methods are not called directly.
3. On the client, context.actorSelection(s"akka.tcp://${serveractorsystem}@${hostname}:${port}/user/${serveractor}") returns an ActorSelection for the master actor, and the client sends its messages to the master through it.
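The remote path used in step 3 has a fixed shape, which a small helper makes easier to see. ActorPaths and remoteActorPath below are hypothetical names introduced for illustration and are not part of Akka; the sketch only assembles the same string the client builds inline.

```scala
// Hypothetical helper (not part of Akka): assembles a classic-remoting actor path.
object ActorPaths {
  def remoteActorPath(system: String, host: String, port: Int, actorName: String): String =
    // protocol :// actor system name @ host : port / user guardian / actor name
    s"akka.tcp://$system@$host:$port/user/$actorName"
}

// The values used by the example in this post.
val masterPath = ActorPaths.remoteActorPath("SystemMaster", "localhost", 8888, "MyActorMaster")
println(masterPath)
```

Each segment has to match the server side: the system name is the one passed to ActorSystem(...), the host and port are the ones in the server's remoting config, /user is the parent of actors created with actorOf on the system, and the last segment is the name given to actorOf.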
