我正在编写一个应用程序,我需要能够判断插入和更新是否成功.我正在使用"INSERT ... IF NOT EXISTS"来获得轻量级事务行为并注意到从execute返回的结果集包含一行包含更新的数据和一个可以查询的"[applied]"列.那样太好了.但是我有一个返回空ResultSet的更新语句.似乎更新成功,但我想要一种程序化的方法来验证.
澄清:
我打开了一些由我的突变返回的结果集的记录.我发现"INSERT ... IF NOT EXIST"返回一个带有名为"[applied]"的布尔列的ResultSet.如果"[applied]"为false,则还返回已退出的行.
使用UPDATE,我总是看到一个空的ResultSet.
所以我有两个问题:
我在网上搜索了很长时间,找不到在Actor模型中克服的面向对象模型的具体缺点.请帮我一些指示和解释.
提前致谢.
假设我已经设置了一个任意复杂的Flow[HttpRequest, HttpResponse, Unit].
我已经可以使用所述流来处理传入的请求了
Http().bindAndHandle(flow, "0.0.0.0", 8080)
Run Code Online (Sandbox Code Playgroud)
现在我想添加日志记录,利用一些现有的指令,比如logRequestResult("my-service"){...}
有没有办法将这个指令与我的流程结合起来?我想我正在寻找另一个指令,一些类似的东西
def completeWithFlow(flow: Flow): Route
Run Code Online (Sandbox Code Playgroud)
这有可能吗?
注意:logRequestResult就是一个例子,我的问题适用于任何可能有用的指令.
为什么是Actor.receive偏函数?我总是可以使用带有匹配表达式的正则函数来代替它。
Akka流大大减少了我的样板代码,并包含许多有用的功能。但是,我需要能够限制项目的处理速度。问题在于,我正在向连接到资源的源链接的Hazelcast队列馈送,以便随着时间的推移(从单个在线站点)下载资源,但是进入队列的链接数量可能会非常大。理想情况下,一次最多运行50-60个请求。Akka Streams中是否有一项功能可以让我限制一次处理的项目数?
进一步的限制是在与某些网站进行交互时需要复杂的状态管理,代码处理和其他功能。Akka Http无法在此提供帮助。我的网络代码完全用Jsoup和Apache Http Components编写,偶尔会调用基于JavaFX的服务器来呈现脚本。
我当前尝试使用文档中描述的使用缓冲区控制输入速率的方法如下:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud) 如果我们具有max..min值的具体范围,很容易将其标准化为0..1浮点值,但是如果我们没有具体限制?是否可以构建通用函数以使输出在0和1之间?在我看来,我认为这是不可能的,但我不是数学专家.
我正在寻找JavaScript或PHP的实现,但是C/C++/Python/Delphi上的任何代码都可以提供示例(如果有的话)
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]
Run Code Online (Sandbox Code Playgroud)
是否maximumBurst意味着可以同时处理的元素数量?
我试图理解为什么下面的代码片段正在做它正在做的事情.我原本以为,因为Sink不能比Source生成内容更快地产生需求,所以我会得到丢弃的消息以响应一些提议(溢出策略设置为Drop Buffer)以及错误和队列关闭消息在自毁之后.
片段:
package playground
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
case object MessageToSink
object Playground extends App {
implicit val actorSystem = ActorSystem("Playground")
implicit val execCntxt = actorSystem.dispatcher
val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder])
actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink)
println(s"Playground has started... ${LocalDateTime.now()}")
}
class Actor2SinkFwder extends Actor with ActorLogging {
implicit val materializer = ActorMaterializer()
implicit val execCtxt = context.dispatcher
val flow = Source.queue[Int](bufferSize …Run Code Online (Sandbox Code Playgroud) 是否可以从源代码修改或创建配置文件.我正在创建一些带有远程处理的客户端/服务器架构.我想要实现的是能够启动客户端应用程序,例如:主机/端口,当没有配置文件时,创建一个完成命令行args.
akka {
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1" <--- here
port = 2553 <--- here
}
}
}
Run Code Online (Sandbox Code Playgroud)
配置并不是很复杂.我想从源端口改变端口(最终主机,现在它无论如何都是localhost用于测试),以便自动化它,所以我可以通过将它们传递给main函数来运行多个客户端.
我有一个对元素进行分组的源和一个发出批处理请求的接收器,我使用KillSwitch能够在任意时间点关闭图形。switch.shutdown()调用时源输出丢失的最新不完整批次记录的问题
val source = Source.tick(10.millis, 10.millis, "tick").grouped(500)
val (switch, _) = source.viaMat(KillSwitches.single)(Keep.right)
.toMat(sink)(Keep.both).run()
Thread.sleep(3000) // wait some arbitrary time
switch.shutdown()
Run Code Online (Sandbox Code Playgroud)
关机时有办法“清除”未完成的批次吗?