当前位置: 首页 > 知识库问答 >
问题:

Akka集群设置与播放框架

庞鸿骞
2023-03-14

我目前正在尝试使用自动发现服务实现集群play akka实现。然而,我似乎在游戏中包含的Guice DI加载程序上遇到了问题。他们的文件摘录如下:

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-和阿卡

虽然我们建议您使用内置的演员系统,因为它设置了所有内容,如正确的类加载器、生命周期钩子等,但没有什么能阻止您使用自己的演员系统。然而,重要的是要确保你做到以下几点:

注册一个停止钩子,当Play关闭时关闭演员系统,从Play环境中通过正确的类加载器,否则Akka将无法找到您的应用程序类

确保您更改Play读取的位置,即使用Play读取akka配置。阿克卡。配置,或者您不从默认akka配置读取akka配置,因为这会导致问题,例如当系统尝试绑定到相同的远程端口时

我已经做了上面的配置,他们建议,但是我似乎无法绕过播放仍然绑定它的内部ActorSystemProvider从BuiltInMoules:

class BuiltinModule extends Module {
def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = 

    {
        def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
          factories.flatMap(_(env, configuration))
        }

        Seq(
          bind[Environment] to env,
          bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
          bind[Configuration].toProvider[ConfigurationProvider],
          bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

          // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
          // to shut it down.
          bind[DefaultApplicationLifecycle].toSelf,
          bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

          bind[Application].to[DefaultApplication],
          bind[play.Application].to[play.DefaultApplication],

          bind[Router].toProvider[RoutesProvider],
          bind[play.routing.Router].to[JavaRouterAdapter],
          bind[ActorSystem].toProvider[ActorSystemProvider],
          bind[Materializer].toProvider[MaterializerProvider],
          bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
          bind[ExecutionContext].to[ExecutionContextExecutor],
          bind[Executor].to[ExecutionContextExecutor],
          bind[HttpExecutionContext].toSelf,

          bind[CryptoConfig].toProvider[CryptoConfigParser],
          bind[CookieSigner].toProvider[CookieSignerProvider],
          bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
          bind[AESCrypter].toProvider[AESCrypterProvider],
          bind[play.api.libs.Crypto].toSelf,
          bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
        ) ++ dynamicBindings(
            HttpErrorHandler.bindingsFromConfiguration,
            HttpFilters.bindingsFromConfiguration,
            HttpRequestHandler.bindingsFromConfiguration,
            ActionCreator.bindingsFromConfiguration
          )
      }
    }

我曾尝试创建自己的GuiceApplication ationBuilder来绕过它,但是,现在它只是将重复的绑定异常从BuiltInMoules中移动出来。

以下是我正在尝试的:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
  * Created by dmcquill on 8/15/16.
  */
object AkkaConfigModule {
    @Singleton
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
        override def get() = {
            val classLoader = application.classloader
            NodeConfigurator.loadConfig(classLoader)
        }
    }
}

/**
  * Binds the application configuration to the [[Config]] interface.
  *
  * The config is bound as an eager singleton so that errors in the config are detected
  * as early as possible.
  */
class AkkaConfigModule extends AbstractModule with ScalaModule {

    override def configure() {
        bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
    }

}

ActorSystem模块:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
  * Created by dmcquill on 7/27/16.
  */
object ActorSystemModule {
    @Singleton
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
        override def get() = {
            val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

            // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
            GuiceAkkaExtension(system).initialize(injector)

            system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
            system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

            lifecycle.addStopHook { () =>
                system.terminate()
            }

            system
        }
    }
}

/**
  * A module providing an Akka ActorSystem.
  */
class ActorSystemModule extends AbstractModule with ScalaModule {
    import module.akka.ActorSystemModule.ActorSystemProvider

    override def configure() {
        bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
    }
}

应用程序加载器:

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        initialBuilder
            .overrides(overrides(context): _*)
            .bindings(new AkkaConfigModule, new ActorSystemModule)
    }

}

我需要完成的主要事情是配置ActorSystem,以便我可以以编程方式加载Akka集群的种子节点。

上述方法是正确的方法还是有更好的方法来实现这一点?如果这是正确的方法,那么对于play/guice的DI设置,有什么我根本不了解的地方吗?

使现代化

对于这种架构,play akka位于相同的节点上。

共有2个答案

马航
2023-03-14

轻型弯管有一个很好的示例http://www.lightbend.com/activator/template/play-akka-cluster-sample您可以下载示例并重复使用它。

韦睿
2023-03-14

最后,我试图做一些比必要的更复杂的事情。我没有执行上述流程,而是以编程方式扩展初始配置,以便以编程方式检索必要的网络信息。

最终结果基本上由几个类组成:

NodeConfigurator:此类包含用于从应用程序检索属性的相关实用程序方法。conf,然后以编程方式创建一个配置,与kubernetes发现服务结合使用。

object NodeConfigurator {

    /**
      * This method given a class loader will return the configuration object for an ActorSystem
      * in a clustered environment
      *
      * @param classLoader the configured classloader of the application
      * @return Config
      */
    def loadConfig(classLoader: ClassLoader) = {
        val config = ConfigFactory.load(classLoader)

        val clusterName = config.getString(CLUSTER_NAME_PROP)
        val seedPort = config.getString(SEED_PORT_CONF_PROP)

        val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
            getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
        } else {
            config.getString(HOST_CONF_PROP)
        }

        ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
            .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
            .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
            .withFallback(config)
            .resolve()
    }

    /**
      * Get the local ip address which defaults to localhost if not
      * found on the eth0 adapter
      *
      * @return Option[String]
      */
    def getLocalHostAddress:  Option[String] = {
        import java.net.NetworkInterface

        import scala.collection.JavaConversions._

        NetworkInterface.getNetworkInterfaces
            .find(_.getName equals "eth0")
            .flatMap { interface =>
                interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
            }
    }

    /**
      * Retrieves a set of seed nodes that are currently running in our cluster
      *
      * @param config akka configuration object
      * @return Array[String]
      */
    def getSeedNodes(config: Config) = {
        if(config.hasPath(SEED_NODES_CONF_PROP)) {
            config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
        } else {
            Array.empty[String]
        }
    }

    /**
      * formats the seed node addresses in the proper format
      *
      * @param clusterName name of akka cluster
      * @param seedNodeAddresses listing of current seed nodes
      * @param seedNodePort configured seed node port
      * @param defaultSeedNodeAddress default seed node address
      * @return
      */
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
        if(seedNodeAddresses.isEmpty) {
            s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
        } else {
            seedNodeAddresses.map { address =>
                s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
            }.mkString("\n")
        }
    }

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
    val HOST_CONF_PROP = "fitnessAkka.host"
    val PORT_CONF_PROP = "fitnessAkka.port"
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}

CustomApplicationLoader:只需使用play的可重写应用程序加载程序从NodeConfigurator获取生成的配置,然后使用它扩展initialConfiguration。

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        val classLoader = context.environment.classLoader
        val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

        initialBuilder
                .in(context.environment)
                .loadConfig(context.initialConfiguration ++ configuration)
                .overrides(overrides(context): _*)
    }

}

AkkaActorModule:提供一个依赖项可注入的actor ref,与API一起使用以显示集群成员。

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = {
        bindActor[ClusterMonitor]("cluster-monitor")
    }
}

ClusterMonitor:这是一个参与者,它只监听集群事件并额外接收消息以生成当前集群状态。

class ClusterMonitor @Inject() extends Actor with ActorLogging {
    import actor.cluster.ClusterMonitor.GetClusterState

    val cluster = Cluster(context.system)
    private var nodes = Set.empty[Address]

    override def preStart(): Unit = {
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    override def receive = {
        case MemberUp(member) => {
            nodes += member.address
            log.info(s"Cluster member up: ${member.address}")
        }
        case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
        case MemberRemoved(member, previousStatus) => {
            nodes -= member.address
            log.info(s"Cluster member removed: ${member.address}")
        }
        case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
        case GetClusterState => sender() ! nodes
        case _: MemberEvent =>
    }

}

object ClusterMonitor {
    case class GetClusterState()
}

应用程序:只需一个测试控制器,即可输出已加入集群的节点列表

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {

    implicit val addressWrites = new Writes[Address] {
        def writes(address: Address) = Json.obj(
            "host" -> address.host,
            "port" -> address.port,
            "protocol" -> address.protocol,
            "system" -> address.system
        )
    }

    implicit val timeout = Timeout(5, TimeUnit.SECONDS)

    def listClusterNodes = Action.async {
        (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
            Ok(Json.toJson(addresses))
        }
    }

}

上述控制器的结果产生如下类似的输出:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.4", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.5", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }
]
 类似资料:
  • VR播放设置需账号开通VR播放权限,开通权限后账号可以通过全局设置和分类设置控制视频是否为VR(全景)播放。 全局设置 进入视频页面,在全局设置页面,点击VR播放按钮 全局设置VR播放为“ON”时,默认所有分类下的视频为VR播放;全局设置VR播放为“OFF”时,默认所有分类下的视频为非VR播放。 分类设置 若需对某一级分类进行VR播放开关设置,可以选择【分类管理】,在一级分类后 “操作”中的“设置

  • 本章节将介绍如何设置本地节点群集,如何使其成为私有的,以及如何使你在eth-netstat网络上的节点协同工作来监控应用程序。作为网络集成测试(与网络/blockchain同步/消息传播等相关的问题,DAPP开发人员测试多块和多用户场景)的后端,完全可供你的ethereum网络是非常有用的。 我们假设您可以通过安装指南构建geth 设置多个节点 为了在本地运行多个ethereum节点,您必须确保:

  • 进入具体播放器编辑页面,点击播放列表标签,设置播放列表样式。 设置播放列表的样式:选择视频封面缩略图、文本框和文字列表; 设置播放列表的位置:上、下、左、右,文字列表形式只有左右两种位置。

  • 进入具体播放器编辑页面,点击LOGO标签,设置播放器LOGO。 用户可以选择上传或者外链一张图片作为播放器LOGO。 · 可为LOGO添加链接地址,点击跳转到指定位置; · LOGO将被添加在视频播放器上,不会对您的视频源文件产生影响; · 系统支持您根据需要对LOGO显示位置(左上、左下、右上、右下)及透明度进行自由设定; · 可通过开关控制LOGO是否展示; · 系统不提供LOGO大小的修改,

  • 我试图学习Akka集群下面的教程提供了这里 我已经创建了应用程序和回购是在这里。 正如教程中提到的,我已经启动了FrontEndApp 即使我在2551和2552上启动后端应用程序,上述警告消息也会不断重复。 在2551上启动后端参与者的终端日志。 最后一个日志持续重复。 在2552上启动后端参与者的终端日志。 不确定是什么原因群集节点不能检测到彼此和参与者节点与后端。 我会错过任何设置吗?