Posts by Mah*_*and

How can I apply a lag function on a streaming DataFrame?

I have a streaming DataFrame with three columns: time, col1 and col2. I need to apply a lag function to the second column. I tried the following query:

val w = org.apache.spark.sql.expressions.Window.orderBy("time")
df.select(col("time"), col("col1"), lag("col2", 1).over(w))

But it throws the following exception:

org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets

How can I achieve this?

Thanks in advance.
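Structured Streaming rejects non-time-based windows, so `lag` over a plain `orderBy` cannot run on a streaming DataFrame. One common workaround (my suggestion, not from the question) is to carry the previous value as explicit per-key state, e.g. with `flatMapGroupsWithState`; the per-group logic boils down to pairing each element with its predecessor, sketched here in plain Scala:

```scala
// Sketch of the per-group "lag(1)" logic: pair each value with the one
// before it; the first element has no predecessor. In Structured
// Streaming this "previous value" would live in GroupState inside
// flatMapGroupsWithState rather than in an in-memory Seq.
def lag1[A](xs: Seq[A]): Seq[(A, Option[A])] =
  xs.zip(None +: xs.map(Some(_)))
```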

scala window-functions apache-spark apache-spark-sql spark-structured-streaming

5 votes · 0 answers · 1394 views

How can I save a streaming aggregation in complete output mode to Parquet?

I have applied an aggregation on a streaming DataFrame using complete output mode. To save the DataFrame locally I implemented a foreach sink. I can save the DataFrame as text, but I need to save it in Parquet format.

val writerForText = new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def process(value: Row): Unit = {
      // Append each row as one comma-separated line (newline added so
      // successive rows don't run together).
      fileWriter.append(value.toSeq.mkString(",")).append('\n')
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }

    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
      fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
      true
    }
  }

val columnName = "col1"
frame.select(count(columnName), min(columnName), mean(columnName), max(columnName),
             first(columnName), last(columnName), sum(columnName))
  .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start()

How can I achieve this? Thanks in advance!
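The file-based Parquet sink only supports append mode, which is why a custom sink is needed here. Since Spark 2.4 the usual route (my suggestion, not from the question) is `foreachBatch`, which hands each micro-batch to the ordinary batch APIs, so the built-in Parquet writer can be reused even in complete mode. A sketch, assuming `frame` is the streaming DataFrame above and with an illustrative output path:

```scala
// Sketch: persist a complete-mode aggregation as Parquet via foreachBatch
// (Spark 2.4+). The path "src/test/resources/parquet" is illustrative.
frame.select(count("col1"), min("col1"), mean("col1"), max("col1"),
             first("col1"), last("col1"), sum("col1"))
  .writeStream
  .outputMode(OutputMode.Complete())
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Complete mode re-emits the whole result table each trigger,
    // so overwrite the previous snapshot.
    batch.write.mode("overwrite").parquet("src/test/resources/parquet")
  }
  .start()
```

This is a non-runnable sketch: it needs a live SparkSession and the usual `org.apache.spark.sql` imports.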

scala apache-spark parquet spark-structured-streaming

5 votes · 1 answer · 1046 views

com.datastax.driver.core.exceptions.BusyPoolException

Whenever I insert more than 1000 rows into a table in Cassandra and then fetch the data by id, it throws the following exception:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.BusyPoolException: [localhost/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)))
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:213)
    at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:49)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:277)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:340)
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1764)
    at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
    at com.google.common.util.concurrent.Futures$ImmediateFuture.addListener(Futures.java:153)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1776)
    at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1713)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:299)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:274)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:117)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:97)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    at com.outworkers.phantom.builder.query.CassandraOperations$class.scalaQueryStringToPromise(CassandraOperations.scala:67)
    at com.outworkers.phantom.builder.query.InsertQuery.scalaQueryStringToPromise(InsertQuery.scala:31)
    at com.outworkers.phantom.builder.query.CassandraOperations$class.scalaQueryStringExecuteToFuture(CassandraOperations.scala:31)
    at com.outworkers.phantom.builder.query.InsertQuery.scalaQueryStringExecuteToFuture(InsertQuery.scala:31)
    at com.outworkers.phantom.builder.query.ExecutableStatement$class.future(ExecutableQuery.scala:80)
    at com.outworkers.phantom.builder.query.InsertQuery.future(InsertQuery.scala:31)
    at nd.cluster.data.store.Points.upsert(Models.scala:114)

I solved the problem above using PoolingOptions:

val poolingOptions = new PoolingOptions()
    .setConnectionsPerHost(HostDistance.LOCAL, 1, 200)
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 256) …
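Raising the pool limits treats the symptom; the queue fills up because async inserts are issued faster than the pool can drain them. A complementary client-side fix (my suggestion, not part of the original answer) is to cap the number of in-flight writes, sketched here with a plain semaphore rather than any driver API:

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{blocking, Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Cap in-flight async tasks so a bounded request queue (256 per
// connection by default in the DataStax driver) is never exhausted.
// (Await/duration above are only needed by the usage example.)
class Throttled(maxInFlight: Int) {
  private val permits = new Semaphore(maxInFlight)

  def run[A](task: => A): Future[A] = Future {
    blocking(permits.acquire()) // back-pressure: wait for a free slot
    try task finally permits.release()
  }
}
```

Wrapping each `insert.future()` call in `throttled.run { ... }` keeps at most `maxInFlight` statements pending against the cluster.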

cassandra datastax

5 votes · 1 answer · 4089 views

How can I break the build when test coverage falls below a minimum threshold?

We want the Codeship build to fail when test coverage drops below the threshold, but it does not fail the build.

The coverage plugin:

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

I added the following two settings to build.sbt:

coverageMinimum := 80,
coverageFailOnMinimum := true

Even locally the build does not fail when coverage is below 80. The command I run is:

sbt clean coverage test coverageReport
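One thing worth checking (an assumption on my part, since the full build.sbt is not shown): in sbt, bare settings in build.sbt are standalone expressions, so the trailing comma quoted above only compiles inside a `.settings(...)` sequence. With either valid placement, `coverageFailOnMinimum := true` makes the `coverageReport` task itself fail, which breaks the `sbt clean coverage test coverageReport` invocation. A sketch of both forms:

```scala
// build.sbt — bare style: each setting is its own expression,
// no trailing commas between them
coverageMinimum := 80
coverageFailOnMinimum := true

// ...or, equivalently, inside a project's settings sequence,
// where commas ARE required:
lazy val root = (project in file("."))
  .settings(
    coverageMinimum := 80,
    coverageFailOnMinimum := true
  )
```

(Only one of the two forms should appear in a real build; they are shown together for comparison. Key names are those of sbt-scoverage 1.x, matching the 1.3.5 plugin quoted above.)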

scala sbt scalatest scoverage

5 votes · 1 answer · 538 views

Can objects be implemented in terms of higher-order functions?

Can we implement either concept in terms of the other?

Objects in terms of higher-order functions?
Higher-order functions in terms of objects?
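Both directions work, and Scala exhibits the duality directly: closures over shared mutable state behave like objects with methods, while Scala's `FunctionN` types are literally one-method interfaces. A minimal sketch (the names are mine, for illustration):

```scala
// Direction 1: an "object" (a counter with two methods) encoded as a
// pair of closures sharing captured mutable state.
def makeCounter(): (() => Unit, () => Int) = {
  var n = 0
  (() => n += 1, // "increment method"
   () => n)     // "read method"
}

// Direction 2: a higher-order-function value encoded as an object.
// Int => Int is just a one-method interface with apply.
object Doubler extends (Int => Int) {
  def apply(x: Int): Int = 2 * x
}
```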

scala

2 votes · 1 answer · 139 views