我试图理解Slick-Hikari是如何工作的,我已经阅读了很多文档,但我有一个用例我的行为我不明白.
我正在使用Slick 3和Hikari,默认配置.我已经有一个同时连接〜1000个用户的生产应用程序.我的应用程序使用websockets,当我部署新版本时,所有客户端都重新连接.(我知道这不是处理部署的最佳方式,但我目前还没有集群.)当所有这些用户重新连接时,他们都开始进行查询以获得用户状态(狗堆效应).当它发生时Slick开始抛出很多错误,如:
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@4dbbd9d1 rejected from java.util.concurrent.ThreadPoolExecutor@a3b8495[Running, pool size = 20, active threads = 20, queued tasks = 1000, completed tasks = 23740]
Run Code Online (Sandbox Code Playgroud)
我认为它正在发生的是,待处理查询的光滑队列已满,因为它无法处理从数据库请求信息的所有客户端.但是,如果我看到Dropwizard提供给我的指标,我会看到以下内容:
16:45
我们接近部署.在旧实例终止之前,我们可以看到连接数从20增加到40.我认为这是正常的,因为部署过程是如何完成的.
但是,如果由于狗堆效应导致Slick的查询队列变满,为什么如果有20个连接可用,它不会使用超过3-5个连接?数据库表现非常好,所以我认为瓶颈在于Slick.
您对改进此部署过程有什么建议吗?我现在只有1000个用户,但我会在几周内获得更多用户.
我们正在尝试将Akka流与Alpakka Kafka一起使用以消耗服务中的事件流。为了处理事件处理错误,我们使用Kafka自动提交和多个队列。例如,如果我们有user_created
要从产品服务中使用的主题,那么我们还将创建user_created_for_products_failed
和user_created_for_products_dead_letter
。这两个额外的主题与特定的Kafka消费者群体相关。如果事件无法处理,它将进入失败的队列,我们尝试在五分钟内再次消耗事件;如果事件再次失败,则变为死信。
在部署时,我们要确保我们不会丢失事件。因此,我们试图在停止应用程序之前停止流。就像我说的,我们正在使用自动提交,但是所有这些正在“飞行”的事件尚未得到处理。流和应用程序停止后,我们可以部署新代码并重新启动应用程序。
阅读文档后,我们已经了解了该KillSwitch
功能。我们在这看到的问题是,该shutdown
方法返回Unit
,而不是Future[Unit]
像我们期望的那样。我们不确定使用它不会丢失事件,因为在测试中,它看起来太快而无法正常工作。
解决方法是,ActorSystem
为每个流创建一个,然后使用terminate
方法(返回Future[Terminate]
)。此解决方案的问题在于,我们认为创建ActorSystem
每个流的伸缩性不好,并且解决该问题terminate
需要花费很多时间(在测试中最多需要一分钟才能关闭)。
你遇到过这样的问题吗?有没有一种更快的方法(与相比ActorSystem.terminate
)来停止流并确保Source
已处理发出的所有事件?
在Vintage中切换命令模式和插入模式时,是否可以更改编辑器外观的某些部分?
默认情况下,当我切换模式时,sublime仅显示文本COMMAND MODE
和INSERT MODE
状态栏.这个太小了,不能再多了.我正在寻找像这个问题更明显的东西:vim:在插入模式下更改状态行颜色,但在Sublime中
我是Akka和Scala的新手,我来自一个并发的世界。可能我做错了很多事情,即使与问题无关,我也会感谢您的反馈。
我正在用Akka和Scala做一个简单的聊天应用程序。我从“键入功能”开始(不列颠哥伦比亚省的业务要求)...这是whatsapp或“约翰正在键入消息”的典型功能。
我已经使用两种参与者类型对它进行建模:对话者和对话,并且我想对对话参与者进行单元测试。我的对话演员看起来像这样:
object Conversation {
def props(conversationId: UUID, talkers: List[ActorRef])(out: ActorRef) = Props(new Conversation(conversationId, talkers))
case class Typing(talkerId: TalkerId)
}
class Conversation(conversationId: UUID, talkers: List[ActorRef]) extends Actor with ActorLogging {
def receive = LoggingReceive {
case Typing(talkerId) =>
// notify all talkers that a talker is typing
// @TODO don't notify user which is typing
talkers foreach {talker: ActorRef => talker ! InterlocutorTyping(talkerId)}
}
}
Run Code Online (Sandbox Code Playgroud)
我认为,到现在为止很简单。因此,在开始使用Scala和Akka进行编码之前,我已经进行了如下测试:
我真的不知道在Scala和Akka中这是否是正确的方法。我的测试(使用scalatest)如下所示:
"Conversation" should {
"Notify interlocutors when a talker …
Run Code Online (Sandbox Code Playgroud) 嗨,我正在尝试访问名称中包含斜杠"/"的js对象属性.
对象的作用如下:
{
my/key : "my value"
// more stuff here...
}
Run Code Online (Sandbox Code Playgroud)
我尝试以下构造:
myObject["my/key"]
Run Code Online (Sandbox Code Playgroud)
如果我在Chrome DevTools中尝试它,它可以正常工作,但是当我执行我的代码时,我在浏览器控制台上获得了一个漂亮的未定义(使用console.log())
有谁知道发生了什么?:S
我的系统中有两个演员.说话者和谈话.对话包括两个谈话者(现在).当发话方要加入对话,我应该检查是否存在会话(另一健谈创造了它),如果没有,创建它.我在我的Talker演员的方法中有这个代码:
def getOrCreateConversation(conversationId: UUID): ActorRef = {
// @TODO try to get conversation actor by conversationId
context.actorSelection("user/conversation/" + conversationId.toString)
// @TODO if it not exists... create it
context.actorOf(Conversation.props(conversationId), conversationId.toString)
}
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,当我用actorOf创建我的对话演员时,我作为第二个参数传递了conversationId.我这样做是为了方便搜索这个演员...这是正确的方法吗?
谢谢
编辑
感谢@Arne我终于做到了:
class ConversationRouter extends Actor with ActorLogging {
def receive = {
case ConversationEnv(conversationId, msg) =>
val conversation = findConversation(conversationId) match {
case None => createNewConversation(conversationId)
case Some(x) => x
}
conversation forward msg
}
def findConversation(conversationId: UUID): Option[ActorRef] = context.child(conversationId.toString)
def createNewConversation(conversationId: UUID): ActorRef = { …
Run Code Online (Sandbox Code Playgroud) 我正在寻找使用典型的主/从mysql架构配置Slick 3的最佳方法.基本上我想将写入发送到mysql服务器并读取到另一个并信任两个服务器是同步的.
我已经知道了[这] [1]但我认为是Slick 2.我已经开始使用Slick官方文档了,我对此主题没有任何看法.
现在,有一个数据库(没有主从)我在application.conf中有这个.
slick {
dbs {
default {
driver = "slick.driver.MySQLDriver$"
db {
driver = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://localhost/new_chat"
user = "new_chat"
password = "new_chat"
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我正在执行这样的查询:
PersistenceUtils.run(conversationMembers += conversationMember)
Run Code Online (Sandbox Code Playgroud)
PersistenceUtils:
def run[R](a: DBIOAction[R, NoStream, Nothing])
(implicit chatContext: ChatContext, ec: ExecutionContext): Future[R] = {
val result = chatContext.db.run(a)
result.onFailure({ case e => logger.error(s"error executing query: ${a.getDumpInfo.mainInfo}", e) })
result
}
Run Code Online (Sandbox Code Playgroud)
有一种方法可以在application.conf中进行配置吗?我不想在dev中使用master/slave,只能在staging和production中使用.我已经阅读了有关ReplicationDriver的内容,但我在Slick doc中看不到任何内容:S
有谁能给我一些线索?:P
谢谢
scala ×5
akka ×3
mysql ×2
slick ×2
akka-stream ×1
akka-testkit ×1
alpakka ×1
apache-kafka ×1
editor ×1
hikaricp ×1
javascript ×1
json ×1
scalatest ×1
slick-3.0 ×1
sublimetext ×1
sublimetext2 ×1
sublimetext3 ×1
unit-testing ×1
vim ×1