Setting up an Akka cluster with the Play framework

dmc*_*314 2 scala guice akka playframework akka-cluster

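I'm currently trying to implement a clustered Play + Akka setup with an auto-discovery service. However, I seem to be running into issues with the Guice DI loader included with Play. An excerpt from their documentation states: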

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

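While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

Register a stop hook to shut the actor system down when Play shuts down

Pass in the correct classloader from the Play Environment, otherwise Akka won't be able to find your application's classes

Make sure that either you change the location that Play reads its akka configuration from using play.akka.config, or that you don't read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports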

I have done the configuration they recommend above, but I can't seem to get around Play still binding its internal ActorSystemProvider from the BuiltinModule:

class BuiltinModule extends Module {
    def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = {
        def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
            factories.flatMap(_(env, configuration))
        }

        Seq(
            bind[Environment] to env,
            bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
            bind[Configuration].toProvider[ConfigurationProvider],
            bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

            // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
            // to shut it down.
            bind[DefaultApplicationLifecycle].toSelf,
            bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

            bind[Application].to[DefaultApplication],
            bind[play.Application].to[play.DefaultApplication],

            bind[Router].toProvider[RoutesProvider],
            bind[play.routing.Router].to[JavaRouterAdapter],
            bind[ActorSystem].toProvider[ActorSystemProvider],
            bind[Materializer].toProvider[MaterializerProvider],
            bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
            bind[ExecutionContext].to[ExecutionContextExecutor],
            bind[Executor].to[ExecutionContextExecutor],
            bind[HttpExecutionContext].toSelf,

            bind[CryptoConfig].toProvider[CryptoConfigParser],
            bind[CookieSigner].toProvider[CookieSignerProvider],
            bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
            bind[AESCrypter].toProvider[AESCrypterProvider],
            bind[play.api.libs.Crypto].toSelf,
            bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
        ) ++ dynamicBindings(
            HttpErrorHandler.bindingsFromConfiguration,
            HttpFilters.bindingsFromConfiguration,
            HttpRequestHandler.bindingsFromConfiguration,
            ActionCreator.bindingsFromConfiguration
        )
    }
}

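I've tried creating my own GuiceApplicationBuilder to get around this, but that just moves the duplicate binding exception so that it now originates from the BuiltinModule.

Here is what I'm trying:

AkkaConfigModule: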

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
  * Created by dmcquill on 8/15/16.
  */
object AkkaConfigModule {
    @Singleton
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
        override def get() = {
            val classLoader = application.classloader
            NodeConfigurator.loadConfig(classLoader)
        }
    }
}

/**
  * Binds the application configuration to the [[Config]] interface.
  *
  * The config is bound as an eager singleton so that errors in the config are detected
  * as early as possible.
  */
class AkkaConfigModule extends AbstractModule with ScalaModule {

    override def configure() {
        bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
    }

}

ActorSystemModule:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
  * Created by dmcquill on 7/27/16.
  */
object ActorSystemModule {
    @Singleton
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
        override def get() = {
            val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

            // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
            GuiceAkkaExtension(system).initialize(injector)

            system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
            system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

            lifecycle.addStopHook { () =>
                system.terminate()
            }

            system
        }
    }
}

/**
  * A module providing an Akka ActorSystem.
  */
class ActorSystemModule extends AbstractModule with ScalaModule {
    import module.akka.ActorSystemModule.ActorSystemProvider

    override def configure() {
        bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
    }
}

Application loader:

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        initialBuilder
            .overrides(overrides(context): _*)
            .bindings(new AkkaConfigModule, new ActorSystemModule)
    }

}

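The main thing I need to accomplish is configuring the ActorSystem so that I can programmatically load the seed nodes for the Akka cluster.

Is the approach above the right one, or is there a better way to accomplish this? If it is the right approach, is there something I'm fundamentally misunderstanding about the DI setup for Play/Guice?

UPDATE

For this architecture, Play and Akka are located on the same node.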

dmc*_*314 5

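In the end I was trying to do something more complicated than necessary. Instead of the flow above, I simply extended the initial configuration programmatically, so that the necessary networking information can be resolved at startup.

The end result essentially consists of a few classes:

NodeConfigurator: This class contains the relevant utility methods used to retrieve properties from application.conf and then programmatically create a config to be used in conjunction with the Kubernetes discovery service.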

object NodeConfigurator {

    /**
      * This method given a class loader will return the configuration object for an ActorSystem
      * in a clustered environment
      *
      * @param classLoader the configured classloader of the application
      * @return Config
      */
    def loadConfig(classLoader: ClassLoader) = {
        val config = ConfigFactory.load(classLoader)

        val clusterName = config.getString(CLUSTER_NAME_PROP)
        val seedPort = config.getString(SEED_PORT_CONF_PROP)

        val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
            getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
        } else {
            config.getString(HOST_CONF_PROP)
        }

        ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
            .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
            .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
            .withFallback(config)
            .resolve()
    }

    /**
      * Get the local ip address which defaults to localhost if not
      * found on the eth0 adapter
      *
      * @return Option[String]
      */
    def getLocalHostAddress:  Option[String] = {
        import java.net.NetworkInterface

        import scala.collection.JavaConversions._

        NetworkInterface.getNetworkInterfaces
            .find(_.getName equals "eth0")
            .flatMap { interface =>
                interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
            }
    }

    /**
      * Retrieves a set of seed nodes that are currently running in our cluster
      *
      * @param config akka configuration object
      * @return Array[String]
      */
    def getSeedNodes(config: Config) = {
        if(config.hasPath(SEED_NODES_CONF_PROP)) {
            config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
        } else {
            Array.empty[String]
        }
    }

    /**
      * formats the seed node addresses in the proper format
      *
      * @param clusterName name of akka cluster
      * @param seedNodeAddresses listing of current seed nodes
      * @param seedNodePort configured seed node port
      * @param defaultSeedNodeAddress default seed node address
      * @return
      */
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
        if(seedNodeAddresses.isEmpty) {
            s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
        } else {
            seedNodeAddresses.map { address =>
                s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
            }.mkString("\n")
        }
    }

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
    val HOST_CONF_PROP = "fitnessAkka.host"
    val PORT_CONF_PROP = "fitnessAkka.port"
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}
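To make the seed-node handling easier to follow in isolation, here is a minimal sketch that uses only the Scala standard library. It is a simplified variant, not the code above: `SeedNodeSketch`, `parseSeedNodes`, `seedNodeUris` and `renderConfig` are hypothetical names, and unlike `getSeedNodes` it drops empty entries so that a blank property value falls back to the default host.

```scala
object SeedNodeSketch {
  // Splits a comma-separated property value, trimming blanks and dropping empty entries.
  def parseSeedNodes(raw: String): Seq[String] =
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // Builds full Akka seed-node URIs from bare host addresses.
  // Falls back to a single default host when no addresses are supplied.
  def seedNodeUris(clusterName: String, addresses: Seq[String], port: String, defaultHost: String): Seq[String] = {
    val hosts = if (addresses.isEmpty) Seq(defaultHost) else addresses
    hosts.map(host => s"akka.tcp://$clusterName@$host:$port")
  }

  // Renders the URIs as a single HOCON list assignment.
  def renderConfig(uris: Seq[String]): String =
    uris.map(uri => "\"" + uri + "\"").mkString("fitnessApp.akka.cluster.seed-nodes = [ ", ", ", " ]")
}
```

Rendering one list assignment instead of repeated `+=` lines is a design choice of this sketch; both forms should describe the same seed-node list once the config is resolved.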

CustomApplicationLoader: Simply uses Play's overridable application loader to take the config produced by NodeConfigurator and extend the initial configuration with it.

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        val classLoader = context.environment.classLoader
        val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

        initialBuilder
                .in(context.environment)
                .loadConfig(context.initialConfiguration ++ configuration)
                .overrides(overrides(context): _*)
    }

}
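The order of the operands in `context.initialConfiguration ++ configuration` matters. My understanding is that in Play 2.5 the right-hand configuration takes precedence on conflicting keys; treat that as an assumption and check it against your Play version. A plain `Map` behaves the same way under `++`, so it works as a rough analogy. The object name and the values below are made up for illustration.

```scala
object ConfigMergeSketch {
  // Settings as they might come from application.conf (values are made up).
  val initial: Map[String, String] = Map(
    "fitnessAkka.host" -> "eth0-address-or-localhost",
    "fitnessAkka.seed-port" -> "2551"
  )

  // Settings computed at startup (values are made up).
  val generated: Map[String, String] = Map(
    "fitnessAkka.host" -> "172.17.0.3"
  )

  // Right-biased merge: keys in `generated` replace those in `initial`,
  // while keys that only exist in `initial` are kept.
  val merged: Map[String, String] = initial ++ generated
}
```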

AkkaActorModule: Provides a dependency-injectable actor ref, used by an API endpoint to display the cluster members.

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = {
        bindActor[ClusterMonitor]("cluster-monitor")
    }
}

ClusterMonitor: An actor that simply listens to cluster events and additionally accepts a message to produce the current cluster state.

class ClusterMonitor @Inject() extends Actor with ActorLogging {
    import actor.cluster.ClusterMonitor.GetClusterState

    val cluster = Cluster(context.system)
    private var nodes = Set.empty[Address]

    override def preStart(): Unit = {
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    override def receive = {
        case MemberUp(member) => {
            nodes += member.address
            log.info(s"Cluster member up: ${member.address}")
        }
        case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
        case MemberRemoved(member, previousStatus) => {
            nodes -= member.address
            log.info(s"Cluster member removed: ${member.address}")
        }
        case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
        case GetClusterState => sender() ! nodes
        case _: MemberEvent =>
    }

}

object ClusterMonitor {
    // A parameterless message is more idiomatic as a case object; it is matched and sent by name.
    case object GetClusterState
}
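The bookkeeping the actor does can also be looked at without Akka. The following is a simplified model under my own assumptions: the event types are hypothetical stand-ins that carry only an address string, not Akka's `MemberEvent` classes, and `MembershipSketch` is not part of the project.

```scala
object MembershipSketch {
  // Simplified stand-ins for cluster events; only the address is modelled.
  sealed trait Event
  final case class Up(address: String) extends Event
  final case class Unreachable(address: String) extends Event
  final case class Exited(address: String) extends Event
  final case class Removed(address: String) extends Event

  // Mirrors the actor's receive: Up adds a node, Removed drops it,
  // every other event leaves the set unchanged.
  def step(nodes: Set[String], event: Event): Set[String] = event match {
    case Up(address)      => nodes + address
    case Removed(address) => nodes - address
    case _                => nodes
  }

  // Replays a sequence of events starting from an empty cluster view.
  def replay(events: Seq[Event]): Set[String] =
    events.foldLeft(Set.empty[String])(step)
}
```

One consequence worth noting: an unreachable or exited member stays in the set until a removal event arrives, so the node list can briefly include members that are on their way out.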

Application: Just a test controller that outputs the list of nodes that have joined the cluster.

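// Imports assumed for Play 2.5; adjust to your Play and Akka versions.
import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Named}

import actor.cluster.ClusterMonitor.GetClusterState
import akka.actor.{ActorRef, Address}
import akka.pattern.ask
import akka.util.Timeout
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.json.{Json, Writes}
import play.api.mvc.{Action, Controller}

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {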

    implicit val addressWrites = new Writes[Address] {
        def writes(address: Address) = Json.obj(
            "host" -> address.host,
            "port" -> address.port,
            "protocol" -> address.protocol,
            "system" -> address.system
        )
    }

    implicit val timeout = Timeout(5, TimeUnit.SECONDS)

    def listClusterNodes = Action.async {
        (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
            Ok(Json.toJson(addresses))
        }
    }

}

The controller above produces output similar to the following:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.4", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.5", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }
]