系统集群化后,需要使用2.6.7+版本的脑裂功能模块。
注意:Akka2.6已经不支持scala2.11了。
val AkkaVersion = "2.6.14"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster" % AkkaVersion,
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
"com.typesafe.akka" %% "akka-actor" % AkkaVersion,
"com.typesafe.akka" %% "akka-remote" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-metrics" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % "1.0.8",
"io.netty" % "netty" % "3.10.6.Final",
"com.twitter" %% "chill-akka" % "0.9.3"
)
下文中将提到为何增加netty和chill-akka两个包。
project.sbt配置的addSbtPlugin(“com.typesafe.play” % “sbt-plugin” % “2.6.19”),更高版本遇到过问题,没记录原因。
akka remote原先配置:
akka.remote {
netty.tcp {
hostname = ${HOST}
port = 2552
port = ${?PORT_2552}
bind-hostname = 0.0.0.0
bind-port = 2552
}
}
升级后,akka remote默认是artery了,如果还需要继续使用netty,就需要增加netty包,并作如下配置:
akka.remote.artery.enabled = false
akka.remote.classic {
enabled-transports = ["akka.remote.classic.netty.tcp"]
netty.tcp {
hostname = ${HOST}
port = 2552
port = ${?PORT_2552}
bind-hostname = 0.0.0.0
bind-port = 2552
}
}
akka 2.7将弃用netty,官方建议升级,相关文档。
序列化升级,默认的java序列化方式会有警告信息(性能差,而且不安全),我们替换成性能高的kryo,所以增加了chill-akka包。
akka {
actor {
enable-additional-serialization-bindings = on
allow-java-serialization = off
serializers {
kryo = "com.twitter.chill.akka.AkkaSerializer"
}
serialization-bindings {
"java.io.Serializable" = kryo
}
}
}
新版本不使用老的ActorMaterialzier构建方法
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(context.system)
.withSupervisionStrategy(decider)
.withDispatcher("stream-dispatcher"))
引入actorSystem就可以
implicit val actorSystem = context.system
stream使用容错机制和线程配置可以通过如下配置:
lazy val deciderAttribute = ActorAttributes.supervisionStrategy(decider)
lazy val dispatcherAttribute = ActorAttributes.dispatcher("stream-dispatcher")
val (future, killSwitch) = StreamFactory.buildGraph(s.copy(jobId = jobId))
.withAttributes(deciderAttribute and dispatcherAttribute).run()
使用ActorAttributes构建配置,使用withAttributes方法设置配置。根据需要stage也是可以设置独立的attributes。
https://doc.akka.io/docs/akka/current/project/migration-guide-2.5.x-2.6.x.html