Akka流大大减少了我的样板代码,并包含许多有用的功能。但是,我需要能够限制项目的处理速度。问题在于,我正在向连接到资源的源链接的Hazelcast队列馈送,以便随着时间的推移(从单个在线站点)下载资源,但是进入队列的链接数量可能会非常大。理想情况下,一次最多运行50-60个请求。Akka Streams中是否有一项功能可以让我限制一次处理的项目数?
进一步的限制是在与某些网站进行交互时需要复杂的状态管理,代码处理和其他功能。Akka Http无法在此提供帮助。我的网络代码完全用Jsoup和Apache Http Components编写,偶尔会调用基于JavaFX的服务器来呈现脚本。
我当前尝试使用文档中描述的使用缓冲区控制输入速率的方法如下:
val sourceGraph: Graph[SourceShape[(FlowConfig, Term)], NotUsed] = new HazelcastTermSource(conf.termQueue, conf)
val source = Source.fromGraph(sourceGraph)
val (killSwitch, last) = source
.buffer(conf.crawlStreamConf.maxCrawlConcurrency, OverflowStrategy.backpressure)
.viaMat(new DownloadFlow())(Keep.both)
.map(x => println(x))
.to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud) 在日食中,我似乎不断地与JCEF一起运行铬.我能够到达发现本机函数但仍无法完成初始化的程度.我设置了LD_PRELOAD变量.我正在运行MainFrame.java类和自定义Scala代码,并在每个代码中遇到相同的问题.有办法解决这个问题吗?
系统:
操作系统:Ubuntu 16.04
JCEF第3版
CEF第3版
Java Jdk 8
结构和配置:
一切都在二元分布结构下.我将jar作为库导入,将本机库路径添加到jcef jar并将其导入到我的项目中.
我使用环境变量设置运行配置:
我的所有库和*.pak文件都位于同一目录和libcef.so所在的子目录(二进制分发版)中,以及chrome沙箱和帮助程序.
代码和错误
以下代码后代码失败:
println("Generating Handlers")
CefApp.addAppHandler(Handlers.getHandlerAdapter)
private var settings = new CefSettings
settings.windowless_rendering_enabled = useOSR
println("Starting App")
private final val cefApp : CefApp = if(commandLineArgs != null && commandLineArgs.size > 0) CefApp.getInstance(ChromeCommandLineParser.parse(commandLineArgs)) else CefApp.getInstance(settings)
println("Creating Client")
private final val client : CefClient = cefApp.createClient()
Run Code Online (Sandbox Code Playgroud)
以下输出结果:
Starting
Generating Handlers
Starting App
Creating Client
initialize on Thread[AWT-EventQueue-0,6,main] with library path /home/XXXXX/jcef/src/binary_distrib/linux64/bin/lib/linux64
[0413/135633:ERROR:icu_util.cc(157)] Invalid file descriptor …
Run Code Online (Sandbox Code Playgroud) 我最近更新了一个使用 scala 的项目,并决定将我的依赖项(包括使用 Kettle 的 ETL 流程)从 Pentaho 5.4 迁移到 Maven。但是,当我使用 javascript 运行转换时,我不断收到错误消息,告诉我函数不存在。我的具体错误如下。如何使用 Maven 进行转换以正确运行 JS?
Pentaho 当前的 POM 依赖项
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>5.4.0.4-149</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-db</artifactId>
<version>4.4.3.5-C183</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-ui-swt</artifactId>
<version>5.4.0.4-149</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>5.4.0.4-149</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-dbdialog</artifactId>
<version>5.4.0.4-149</version>
</dependency>
<dependency>
<groupId>pentaho-library</groupId>
<artifactId>libformula</artifactId>
<version>5.4.0.1-130</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4-1202-jdbc42</version>
</dependency>
<dependency>
<groupId>pentaho-library</groupId>
<artifactId>libformula</artifactId>
<version>5.4.0.1-130</version>
</dependency>
<dependency>
<groupId>rhino</groupId>
<artifactId>js</artifactId>
<version>1.7R3</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>2.5.16</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>javax.mail</groupId>
<artifactId>javax.mail-api</artifactId>
<version>1.5.4</version>
</dependency> …
Run Code Online (Sandbox Code Playgroud) 我是斯卡拉的新手.它有助于减少代码并提供功能语言的元素来处理数据.但是,我无法找到并行执行lst期货的方法.我的列表是List [Future [String]]类型.如何让这个列表并行执行?
val futures=(data.map { x => this.breakString(x) }).toList
Run Code Online (Sandbox Code Playgroud)
未来定义为:
def breakString(inX:Object):Future[String]=Future {
//get new jsonObject
val x =inX.asInstanceOf[String]
val jmap=JacksMapper.readValue[Map[String,AnyRef]](x)
val dataArr:Array[String]=jmap.get(this.rowcolumn).asInstanceOf[String].split(token)
val map=dataArr.map { x => (positions.get(dataArr.indexOf(x).asInstanceOf[String]),x) }.toMap
map.put(hashKey, jmap.get(hashKey).asInstanceOf[String])
//write out positions
JacksMapper.writeValueAsString(map)
}
Run Code Online (Sandbox Code Playgroud) scala ×3
java ×2
akka ×1
akka-stream ×1
concurrency ×1
future ×1
hazelcast ×1
maven ×1
pentaho ×1
pom.xml ×1