dmc*_*314 2 scala guice akka playframework akka-cluster
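I am currently trying to implement a clustered Play + Akka setup with an auto-discovery service. However, I seem to be running into problems with the Guice DI loader included with Play. An excerpt from their documentation states: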
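
https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

While we recommend you use the built-in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

- Register a stop hook to shut the actor system down when Play shuts down
- Pass in the correct classloader from the Play Environment, otherwise Akka won't be able to find your application's classes
- Ensure that either you change the location that Play reads its akka configuration from using play.akka.config, or that you don't read your akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports
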
I have done the configuration they recommend above, but I can't seem to get around Play still binding its internal ActorSystemProvider from BuiltinModule:

class BuiltinModule extends Module {
  def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = {
    def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
      factories.flatMap(_(env, configuration))
    }

    Seq(
      bind[Environment] to env,
      bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
      bind[Configuration].toProvider[ConfigurationProvider],
      bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

      // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
      // to shut it down.
      bind[DefaultApplicationLifecycle].toSelf,
      bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

      bind[Application].to[DefaultApplication],
      bind[play.Application].to[play.DefaultApplication],

      bind[Router].toProvider[RoutesProvider],
      bind[play.routing.Router].to[JavaRouterAdapter],
      bind[ActorSystem].toProvider[ActorSystemProvider],
      bind[Materializer].toProvider[MaterializerProvider],
      bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
      bind[ExecutionContext].to[ExecutionContextExecutor],
      bind[Executor].to[ExecutionContextExecutor],
      bind[HttpExecutionContext].toSelf,

      bind[CryptoConfig].toProvider[CryptoConfigParser],
      bind[CookieSigner].toProvider[CookieSignerProvider],
      bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
      bind[AESCrypter].toProvider[AESCrypterProvider],
      bind[play.api.libs.Crypto].toSelf,
      bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
    ) ++ dynamicBindings(
      HttpErrorHandler.bindingsFromConfiguration,
      HttpFilters.bindingsFromConfiguration,
      HttpRequestHandler.bindingsFromConfiguration,
      ActionCreator.bindingsFromConfiguration
    )
  }
}

I tried creating my own GuiceApplicationBuilder to get around this, but now the duplicate binding exception simply comes from BuiltinModule instead.

This is what I am trying:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
 * Created by dmcquill on 8/15/16.
 */
object AkkaConfigModule {
  @Singleton
  class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
    override def get() = {
      val classLoader = application.classloader
      NodeConfigurator.loadConfig(classLoader)
    }
  }
}

/**
 * Binds the application configuration to the [[Config]] interface.
 *
 * The config is bound as an eager singleton so that errors in the config are detected
 * as early as possible.
 */
class AkkaConfigModule extends AbstractModule with ScalaModule {

  override def configure() {
    bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
  }

}

ActorSystemModule:

package module.akka

import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
 * Created by dmcquill on 7/27/16.
 */
object ActorSystemModule {
  @Singleton
  class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
    override def get() = {
      val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

      // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
      GuiceAkkaExtension(system).initialize(injector)

      system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
      system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

      lifecycle.addStopHook { () =>
        system.terminate()
      }

      system
    }
  }
}

/**
 * A module providing an Akka ActorSystem.
 */
class ActorSystemModule extends AbstractModule with ScalaModule {
  import module.akka.ActorSystemModule.ActorSystemProvider

  override def configure() {
    bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
  }
}

Application loader:

class CustomApplicationLoader extends GuiceApplicationLoader {

  override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
    initialBuilder
      .overrides(overrides(context): _*)
      .bindings(new AkkaConfigModule, new ActorSystemModule)
  }

}

The main thing I need to accomplish is configuring the ActorSystem so that I can programmatically load the seed nodes for the Akka cluster.
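
Is the above approach the right one, or is there a better way to accomplish this? If it is the right approach, is there something I fundamentally misunderstand about the DI setup for Play/Guice?

Update

For this architecture, Play and Akka are located on the same node.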
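
In the end I was trying to do something more complicated than necessary. Instead of following the flow above, I simply extended the initial configuration programmatically, so that the necessary networking information could be retrieved in code.
The end result essentially consists of a few classes:
NodeConfigurator: this class contains the utility methods that read properties from application.conf and then programmatically build a configuration to be used together with a Kubernetes discovery service.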
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

object NodeConfigurator {
  /**
   * This method given a class loader will return the configuration object for an ActorSystem
   * in a clustered environment
   *
   * @param classLoader the configured classloader of the application
   * @return Config
   */
  def loadConfig(classLoader: ClassLoader) = {
    val config = ConfigFactory.load(classLoader)
    val clusterName = config.getString(CLUSTER_NAME_PROP)
    val seedPort = config.getString(SEED_PORT_CONF_PROP)
    // The sentinel value asks for the eth0 address, falling back to localhost
    val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
      getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
    } else {
      config.getString(HOST_CONF_PROP)
    }
    // Generated seed-node settings take precedence over the loaded configuration
    ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
      .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
      .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
      .withFallback(config)
      .resolve()
  }

  /**
   * Get the site-local IP address of the eth0 adapter, if there is one.
   * The caller falls back to localhost when this returns None.
   *
   * @return Option[String]
   */
  def getLocalHostAddress: Option[String] = {
    import java.net.NetworkInterface
    import scala.collection.JavaConversions._
    NetworkInterface.getNetworkInterfaces
      .find(_.getName equals "eth0")
      .flatMap { interface =>
        interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
      }
  }

  /**
   * Retrieves a set of seed nodes that are currently running in our cluster
   *
   * @param config akka configuration object
   * @return Array[String]
   */
  def getSeedNodes(config: Config) = {
    if (config.hasPath(SEED_NODES_CONF_PROP)) {
      config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
    } else {
      Array.empty[String]
    }
  }

  /**
   * formats the seed node addresses in the proper format
   *
   * @param clusterName name of akka cluster
   * @param seedNodeAddresses listing of current seed nodes
   * @param seedNodePort configured seed node port
   * @param defaultSeedNodeAddress default seed node address
   * @return
   */
  def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
    if (seedNodeAddresses.isEmpty) {
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
    } else {
      seedNodeAddresses.map { address =>
        s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
      }.mkString("\n")
    }
  }

  val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
  val HOST_CONF_PROP = "fitnessAkka.host"
  val PORT_CONF_PROP = "fitnessAkka.port"
  val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
  val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"
  private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}
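To make the two branches of formatSeedNodesConfig concrete, here is a minimal, dependency-free sketch that copies that function and feeds it sample values. The object name SeedNodeFormatDemo, the parseSeedNodes helper (a stand-in for getSeedNodes that takes the raw string rather than a Config), and the sample cluster name, port, and addresses are assumptions made for illustration only.

```scala
object SeedNodeFormatDemo {
  // Hypothetical stand-in for getSeedNodes: the raw comma-separated value is
  // passed in directly (None = property absent) instead of being read from a Config.
  def parseSeedNodes(raw: Option[String]): Array[String] =
    raw.map(_.split(",").map(_.trim)).getOrElse(Array.empty[String])

  // Copy of formatSeedNodesConfig from NodeConfigurator above.
  def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String],
                            seedNodePort: String, defaultSeedNodeAddress: String): String =
    if (seedNodeAddresses.isEmpty) {
      // No seed nodes configured: the node lists itself as the only seed.
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
    } else {
      // One HOCON "+=" line per configured seed node.
      seedNodeAddresses.map { address =>
        s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
      }.mkString("\n")
    }

  def main(args: Array[String]): Unit = {
    // Sample values only; they are not taken from a real application.conf.
    println(formatSeedNodesConfig("fitnessApp", parseSeedNodes(None), "2551", "127.0.0.1"))
    println(formatSeedNodesConfig("fitnessApp", parseSeedNodes(Some("172.17.0.3, 172.17.0.4")), "2551", "127.0.0.1"))
  }
}
```

With no seed nodes configured, the result is a single assignment that points the node at its own address; with two addresses it is two `+=` lines, one per seed node, which loadConfig then parses and layers over the loaded configuration.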
CustomApplicationLoader: simply uses Play's overridable application loader to take the configuration produced by NodeConfigurator and extend the initial configuration with it.
import play.api.{ApplicationLoader, Configuration}
import play.api.inject.guice.{GuiceApplicationBuilder, GuiceApplicationLoader}

class CustomApplicationLoader extends GuiceApplicationLoader {
  override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
    val classLoader = context.environment.classLoader
    // Cluster settings generated at startup by NodeConfigurator
    val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))
    initialBuilder
      .in(context.environment)
      .loadConfig(context.initialConfiguration ++ configuration)
      .overrides(overrides(context): _*)
  }
}
AkkaActorModule: provides a dependency-injectable actor ref, which the API uses to display the cluster members.
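import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
  def configure = {
    // Makes the actor injectable as @Named("cluster-monitor") ActorRef
    bindActor[ClusterMonitor]("cluster-monitor")
  }
}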
ClusterMonitor: an actor that simply listens for cluster events and additionally handles a message asking for the current cluster state.
import javax.inject.Inject

import akka.actor.{Actor, ActorLogging, Address}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterMonitor @Inject() extends Actor with ActorLogging {
  import actor.cluster.ClusterMonitor.GetClusterState

  val cluster = Cluster(context.system)
  // Addresses of the members currently considered part of the cluster
  private var nodes = Set.empty[Address]

  override def preStart(): Unit = {
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive = {
    case MemberUp(member) => {
      nodes += member.address
      log.info(s"Cluster member up: ${member.address}")
    }
    case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
    case MemberRemoved(member, previousStatus) => {
      nodes -= member.address
      log.info(s"Cluster member removed: ${member.address}")
    }
    case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
    case GetClusterState => sender() ! nodes
    case _: MemberEvent => // ignore all other member events
  }
}

object ClusterMonitor {
  // A parameterless message, so a case object rather than an empty case class
  case object GetClusterState
}
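The bookkeeping in ClusterMonitor.receive can be tried out without a running cluster. The sketch below is a plain-Scala model of the same transitions; the event types are hypothetical stand-ins that carry only an address string, not Akka's ClusterEvent classes, and the names ClusterStateDemo, step, and replay are assumptions made for this example.

```scala
object ClusterStateDemo {
  // Hypothetical stand-ins for Akka's cluster events; only the address is modelled.
  sealed trait Event
  final case class MemberUp(address: String) extends Event
  final case class UnreachableMember(address: String) extends Event
  final case class MemberExited(address: String) extends Event
  final case class MemberRemoved(address: String) extends Event

  // Same transitions as ClusterMonitor.receive: only MemberUp and
  // MemberRemoved change the set; the other events are merely logged there.
  def step(nodes: Set[String], event: Event): Set[String] = event match {
    case MemberUp(address)      => nodes + address
    case MemberRemoved(address) => nodes - address
    case _                      => nodes
  }

  // Replays a sequence of events starting from an empty node set.
  def replay(events: Seq[Event]): Set[String] =
    events.foldLeft(Set.empty[String])(step)

  def main(args: Array[String]): Unit = {
    val events = Seq(
      MemberUp("172.17.0.3:2551"),
      MemberUp("172.17.0.4:2551"),
      UnreachableMember("172.17.0.4:2551"),
      MemberExited("172.17.0.4:2551"),
      MemberRemoved("172.17.0.4:2551"),
      MemberUp("172.17.0.5:2551")
    )
    println(replay(events))
  }
}
```

One consequence the model makes visible: an unreachable or exiting member stays in the reported set until its MemberRemoved event arrives, so GetClusterState can briefly list a node that is already on its way out.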
Application: just a test controller that outputs the list of nodes that have joined the cluster.
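import java.util.concurrent.TimeUnit
import javax.inject.{Inject, Named}

import actor.cluster.ClusterMonitor.GetClusterState
import akka.actor.{ActorRef, Address}
import akka.pattern.ask
import akka.util.Timeout
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.json.{Json, Writes}
import play.api.mvc.{Action, Controller}

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {
  // JSON representation of an Akka address
  implicit val addressWrites = new Writes[Address] {
    def writes(address: Address) = Json.obj(
      "host" -> address.host,
      "port" -> address.port,
      "protocol" -> address.protocol,
      "system" -> address.system
    )
  }

  // Upper bound on how long the ask below waits for the actor's reply
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)

  def listClusterNodes = Action.async {
    (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
      Ok(Json.toJson(addresses))
    }
  }
}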
The controller above produces output similar to the following:
$ http GET 192.168.99.100:30760/cluster/nodes
HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3",
        "port": 2551,
        "protocol": "akka.tcp",
        "system": "fitnessApp"
    },
    {
        "host": "172.17.0.4",
        "port": 2551,
        "protocol": "akka.tcp",
        "system": "fitnessApp"
    },
    {
        "host": "172.17.0.5",
        "port": 2551,
        "protocol": "akka.tcp",
        "system": "fitnessApp"
    }
]