I have a streaming DataFrame with three columns: time, col1, and col2. I need to apply a lag function on col2. I tried the following query:
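import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// Row-based window ordered by the time column
val w = Window.orderBy("time")
df.select(col("time"), col("col1"), lag("col2", 1).over(w))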
But it throws the following exception:
org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets
How can I achieve this?
Thanks in advance.
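The exception says Structured Streaming only supports time-based windows (grouping with `window(...)` on a timestamp column); a row-based `lag` over an unbounded stream would need every earlier row. One common workaround is to carry the previous value as explicit state from one micro-batch to the next, which in Spark would live inside `foreachBatch` (Spark 2.4+) or `flatMapGroupsWithState`. The Spark-free sketch below shows only that state-carrying logic. `Event`, `Lagged`, and `CrossBatchLag` are hypothetical names, and the sketch assumes events arrive in `time` order across batches (late events are not handled).

```scala
// Hypothetical row types mirroring the question's columns.
final case class Event(time: Long, col1: String, col2: Double)
final case class Lagged(time: Long, col1: String, prevCol2: Option[Double])

// Keeps the last col2 value seen so that lag also works across batch boundaries.
final class CrossBatchLag {
  private var last: Option[Double] = None // col2 of the most recent row seen so far

  def process(batch: Seq[Event]): Seq[Lagged] = {
    val sorted = batch.sortBy(_.time) // order rows by time within the batch
    // prevs(i) is col2 of the row before sorted(i); prevs.head is the carried-over state
    val prevs = sorted.scanLeft(last)((_, e) => Some(e.col2))
    last = prevs.last // remember the newest col2 for the next batch
    sorted.zip(prevs).map { case (e, p) => Lagged(e.time, e.col1, p) }
  }
}
```

Each call handles one micro-batch. The very first row gets `None`, matching the null that `lag` returns for the first row of a static DataFrame.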
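I have applied an aggregation on a streaming DataFrame using Complete output mode. To save the DataFrame locally, I implemented a foreach sink. I can save the DataFrame as text, but I need to save it in Parquet format.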
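import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val writerForText = new ForeachWriter[Row] {
  var fileWriter: FileWriter = _

  // Write each row as one comma-separated line
  override def process(value: Row): Unit = {
    fileWriter.append(value.toSeq.mkString(",")).append("\n")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (fileWriter != null) fileWriter.close()
  }

  // Open one output file per partition: src/test/resources/<partitionId>/temp
  override def open(partitionId: Long, version: Long): Boolean = {
    FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
    fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
    true
  }
}

val columnName = "col1"
frame.select(count(columnName), count(columnName), min(columnName), mean(columnName), max(columnName), first(columnName), last(columnName), sum(columnName))
  .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start()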
How can I achieve this? Thanks in advance!
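The built-in file sink (which includes Parquet) only supports Append output mode, so it cannot take Complete-mode aggregates directly. Assuming Spark 2.4 or later, one option is `foreachBatch`: each micro-batch arrives as an ordinary DataFrame that can be written with the batch `write.parquet` API. A minimal sketch; `ParquetSnapshotSink`, `outputDir`, and `checkpointDir` are hypothetical names, and `aggregated` stands for the `frame.select(...)` aggregate from the question.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object ParquetSnapshotSink {
  // Writes one micro-batch as Parquet. In Complete mode each batch holds the
  // whole aggregate table, so overwriting leaves only the latest snapshot.
  def writeBatch(outputDir: String)(batch: DataFrame, batchId: Long): Unit =
    batch.write.mode(SaveMode.Overwrite).parquet(outputDir)

  // `aggregated` is assumed to be the streaming aggregate, e.g. frame.select(count(...), ...).
  def start(aggregated: DataFrame, outputDir: String, checkpointDir: String): StreamingQuery = {
    // An explicitly typed function value sidesteps the Scala 2.12 overload
    // ambiguity between the Scala and Java variants of foreachBatch.
    val sink: (DataFrame, Long) => Unit = writeBatch(outputDir)
    aggregated.writeStream
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", checkpointDir)
      .foreachBatch(sink)
      .start()
  }
}
```

Overwriting one directory is not atomic, so a concurrent reader may briefly see it empty; writing each batch to its own subdirectory keyed by `batchId` is an alternative if that matters.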
Whenever I insert more than 1000 rows into a Cassandra table and then fetch data by id, the following exception is thrown:
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [localhost/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)))
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:213)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:49)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:277)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:340)
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:299)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:274)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:117)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:97)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
at com.outworkers.phantom.builder.query.CassandraOperations$class.scalaQueryStringToPromise(CassandraOperations.scala:67)
at com.outworkers.phantom.builder.query.InsertQuery.scalaQueryStringToPromise(InsertQuery.scala:31)
at com.outworkers.phantom.builder.query.CassandraOperations$class.scalaQueryStringExecuteToFuture(CassandraOperations.scala:31)
at com.outworkers.phantom.builder.query.InsertQuery.scalaQueryStringExecuteToFuture(InsertQuery.scala:31)
at com.outworkers.phantom.builder.query.ExecutableStatement$class.future(ExecutableQuery.scala:80)
at com.outworkers.phantom.builder.query.InsertQuery.future(InsertQuery.scala:31)
at nd.cluster.data.store.Points.upsert(Models.scala:114)
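The `BusyPoolException` means more requests were waiting than the pool's queue allows (256 here), which typically happens when thousands of asynchronous inserts are fired at once. Independently of pool sizing, one way to avoid it is to cap the number of requests in flight. A minimal, driver-agnostic sketch using `java.util.concurrent.Semaphore`; `ThrottledWrites`, `writeAll`, and `maxInFlight` are hypothetical names. In the question's code, `write` would presumably wrap the phantom insert (for example the `Points.upsert` call from the stack trace, with its result mapped to `Unit`); that wiring is an assumption.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object ThrottledWrites {
  /** Calls `write` for every item while keeping at most `maxInFlight` futures pending. */
  def writeAll[A](items: Seq[A], maxInFlight: Int)(write: A => Future[Unit])
                 (implicit ec: ExecutionContext): Future[Unit] = {
    require(maxInFlight > 0, "maxInFlight must be positive")
    val permits = new Semaphore(maxInFlight)
    val pending = items.map { item =>
      permits.acquire() // blocks the caller until a slot is free
      val f =
        try write(item)
        catch { case NonFatal(e) => Future.failed(e) } // a synchronous throw still frees the slot
      f.onComplete(_ => permits.release()) // free the slot once the request finishes
      f
    }
    Future.sequence(pending).map(_ => ())
  }
}
```

Because `acquire()` blocks the calling thread, `writeAll` should not be called from a thread of a single-threaded execution context that is also needed to complete the futures.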
I solved the above problem using PoolingOptions:
import com.datastax.driver.core.{HostDistance, PoolingOptions}

val poolingOptions = new PoolingOptions()
  .setConnectionsPerHost(HostDistance.LOCAL, 1, 200)
  .setMaxRequestsPerConnection(HostDistance.LOCAL, 256) …
We want the build on Codeship to fail if test coverage drops below a threshold, but it does not fail the build.
Coverage plugin:
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")
I added the following two settings to build.sbt:
coverageMinimum := 80,
coverageFailOnMinimum := true
Even locally, the build does not fail when test coverage is below 80%. The command I run is:
sbt clean coverage test coverageReport
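For comparison, a sketch of a single-module `build.sbt` with both settings inside one `.settings(...)` block (the trailing comma after `coverageMinimum := 80` suggests the original lines already sit inside such a block). The project name `root` and the single-module layout are assumptions; in a multi-module build the settings need to be on the projects whose coverage is being checked.

```scala
// build.sbt (sketch; assumes a single-module build and sbt-scoverage 1.x)
lazy val root = (project in file("."))
  .settings(
    coverageMinimum := 80,        // minimum coverage, in percent
    coverageFailOnMinimum := true // fail the build when coverage is below the minimum
  )
```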
Can one of these concepts be implemented in terms of the other?
Objects in terms of higher order functions?
Higher order functions in terms of objects?
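Both directions are possible in Scala. In this minimal sketch, `makeCounter` encodes an object (private state plus methods) as closures returned by a function, and `IntFn` with `twice` encodes a function as an object with a single `apply` method, which mirrors how Scala's own `Function1` trait works. All names here are illustrative.

```scala
object ObjectsAndFunctions {
  // Object in terms of higher-order functions: the state lives in a closure,
  // and the returned functions act as the object's "methods".
  def makeCounter(start: Int): (() => Int, () => Unit) = {
    var count = start
    (() => count, () => count += 1) // (getter, incrementer)
  }

  // Function in terms of objects: a function is an object with an apply method.
  trait IntFn { def apply(x: Int): Int }

  // A higher-order function over such objects: returns a new IntFn applying f twice.
  def twice(f: IntFn): IntFn = new IntFn {
    def apply(x: Int): Int = f(f(x))
  }
}
```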