Akka2.5.x升级到2.6.x

子车轶
2023-12-01

Akka2.5.x升级到2.6.x

背景

系统集群化后,需要使用2.6.7+版本的脑裂功能模块。

注意:Akka2.6已经不支持scala2.11了。

build.sbt配置

val AkkaVersion = "2.6.14"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster" % AkkaVersion,
  "com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
  "com.typesafe.akka" %% "akka-remote" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster-metrics" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion,
  "com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.8",
  "io.netty" % "netty" % "3.10.6.Final",
  "com.twitter"   %% "chill-akka" % "0.9.3"
)

下文中将提到为何增加netty和chill-akka两个包。
project.sbt配置的addSbtPlugin(“com.typesafe.play” % “sbt-plugin” % “2.6.19”),更高版本遇到过问题,没记录原因。

application.conf配置

akka remote原先配置:

akka.remote {
    netty.tcp {
      hostname = ${HOST}

      port = 2552
      port = ${?PORT_2552}


      bind-hostname = 0.0.0.0
      bind-port = 2552
    }
  }

升级后,akka remote默认是artery了,如果还需要继续使用netty,就需要增加netty包,并作如下配置:

akka.remote.artery.enabled = false
akka.remote.classic {
    enabled-transports = ["akka.remote.classic.netty.tcp"]

    netty.tcp {
      hostname = ${HOST}

      port = 2552
      port = ${?PORT_2552}


      bind-hostname = 0.0.0.0
      bind-port = 2552
    }
  }

akka 2.7将弃用netty,官方建议升级,相关文档

序列化升级,默认的java序列化方式会有警告信息(性能差,而且不安全),我们替换成性能高的kryo,所以增加了chill-akka包。

akka {
  actor {
    enable-additional-serialization-bindings = on
    allow-java-serialization = off
    serializers {
      kryo = "com.twitter.chill.akka.AkkaSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = kryo
    }
  }
}

代码升级

新版本不使用老的ActorMaterialzier构建方法

implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(context.system)
    .withSupervisionStrategy(decider)
    .withDispatcher("stream-dispatcher"))

引入actorSystem就可以

implicit  val actorSystem = context.system

stream使用容错机制和线程配置可以通过如下配置:

 lazy val deciderAttribute = ActorAttributes.supervisionStrategy(decider)
  lazy val dispatcherAttribute = ActorAttributes.dispatcher("stream-dispatcher")
  
  val (future, killSwitch) = StreamFactory.buildGraph(s.copy(jobId = jobId))
            .withAttributes(deciderAttribute and dispatcherAttribute).run()

使用ActorAttributes构建配置,使用withAttributes方法设置配置。根据需要stage也是可以设置独立的attributes。

参考

https://doc.akka.io/docs/akka/current/project/migration-guide-2.5.x-2.6.x.html

 类似资料: