小编Luc*_*ess的帖子

Pandas将时间序列数据重新采样到15分钟和45分钟 - 使用多索引或列

我有一些时间序列数据作为Pandas数据帧,从一小时后15分钟和过去45分钟(30分钟的时间间隔)开始观察,然后将频率改变为每分钟.我想对数据进行重新采样,使其在整个数据帧的每小时30分钟,15小时和45小时的常规频率.

我想到了实现这个目标的两种方法.
1.使用时间序列数据作为数据帧中的列,只需在15分钟和45分钟时过滤所有观测值的数据帧.
2.重新设置索引,使时间序列数据是多索引的一部分(索引的第0级是气象站,第1级是观察的时间)并使用熊猫日期时间序列功能如resample().

原始数据帧,天气,如下所示:

                  parsed_time           Pressure  Temp    Hum
Station   (index)   
Bow       1        2018-04-15 14:15:00   1012     20.0    87
          2        2018-04-15 14:45:00   1013     20.0    87
          3        2018-04-15 15:15:00   1012     21.0    87
          4        2018-04-15 15:45:00   1014     22.0    86
          5        2018-04-15 16:00:00   1015     22.0    86
          6        2018-04-15 16:01:00   1012     25.0    86
          7        2018-04-15 16:02:00   1012     25.0    86
Stratford 8        2018-04-15 14:15:00   1011     18.0    87
          9        2018-04-15 14:45:00   1011     18.0    87
          10       2018-04-15 15:15:00   1012     18.0    87
          11       2018-04-15 15:45:00   1014     19.0 …
Run Code Online (Sandbox Code Playgroud)

python time-series multi-index dataframe pandas

6
推荐指数
1
解决办法
4401
查看次数

对于基本数据框创建示例,我应该如何在Spark中编写单元测试?

我正在努力编写一个基本单元测试来创建数据框,使用Spark提供的示例文本文件,如下所示.

class dataLoadTest extends FunSuite with Matchers with BeforeAndAfterEach {

private val master = "local[*]"
private val appName = "data_load_testing"

private var spark: SparkSession = _

override def beforeEach() {
  spark = new SparkSession.Builder().appName(appName).getOrCreate()
}

import spark.implicits._

 case class Person(name: String, age: Int)

  val df = spark.sparkContext
      .textFile("/Applications/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0),attributes(1).trim.toInt))
      .toDF()

  test("Creating dataframe should produce data from of correct size") {
  assert(df.count() == 3)
  assert(df.take(1).equals(Array("Michael",29)))
}

override def afterEach(): Unit = {
  spark.stop()
}
Run Code Online (Sandbox Code Playgroud)

}

我知道代码本身是有效的(来自spark.implicits._ .... toDF()),因为我已经在Spark-Scala …

unit-testing scala intellij-idea apache-spark

4
推荐指数
1
解决办法
3570
查看次数

如何处理Spark和Scala中的异常

我正在尝试处理Spark中的常见异常,例如.map操作无法正确处理数据的所有元素或FileNotFound异常.我已阅读所有现有问题和以下两个帖子:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

我在行内尝试了一个Try语句,attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble
因此它会读取attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)

但这不会编译; 编译器.toDF()以后不会识别该语句.我也尝试了类似Java的Try {Catch {}}块但无法正确获取范围; df然后不归还.有谁知道如何正确地做到这一点?我甚至需要处理这些异常,因为Spark框架似乎已经处理了FileNotFound异常,而我没有添加一个异常.但是,如果输入文件的列数错误,我想生成模式中字段数的错误.

这是代码:

object DataLoadTest extends SparkSessionWrapper {
/** Helper function to create a DataFrame from a textfile, re-used in        subsequent tests */
def createDataFrame(fileName: String): DataFrame = {

import spark.implicits._

//try {
val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\\t"))
//mHealth user is the case class which defines the data schema
  .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble, …
Run Code Online (Sandbox Code Playgroud)

scala exception-handling apache-spark

4
推荐指数
1
解决办法
2万
查看次数

试图让Apache Spark与IntelliJ一起工作

我想让Apache Spark与IntelliJ一起工作.我在IntelliJ中创建了一个SBT项目并完成了以下工作:1.转到文件 - >项目结构 - >库2.单击中间部分的"+",单击Maven,单击Maven Repository中的Download Library,键入文本' spark-core'和org.apache.spark:spark-core_2.11:2.2.0,这是Spark的最新版本可用我将jar文件和源代码下载到项目文件夹3中的./lib中.Spark库现在显示在库列表中4.然后我右键单击org.apache.spark:spark-core_2.11:2.2.0并单击添加到项目并添加到模块现在当我单击左侧的模块时,然后我的主项目文件夹,然后右侧的Dependencies选项卡我可以看到外部库作为Maven库,但在单击Apply,重新构建项目并重新启动IntelliJ后,它将不会显示为外部库在项目中.因此我无法访问Spark API命令.我做错了什么?我查看了有关IntelliJ和其他一百个来源的所有文档,但找不到答案.

另外,我是否还需要在build.SBT文件中包含以下文本,以及将Apache Spark指定为外部库依赖项?我假设我需要在build.SBT文件中包含代码,或者手动将Spark添加为外部依赖项,但不能同时添加.我在build.SBT文件中包含了这段代码:

name := "Spark_example"

version := "1.0"

scalaVersion := "2.12.3"

val sparkVersion = "2.0.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:sbt.ResolveException:unresolved dependency:org.apache.spark#spark-core_2.12; 2.2.0:not found

请帮忙!谢谢

scala intellij-idea sbt apache-spark

2
推荐指数
1
解决办法
3526
查看次数

Spark 和 ScalaNLP 库 Breeze 可以一起使用吗?

我正在 Apache Spark 中开发基于 Scala 的极限学习机。我的模型必须是 Spark Estimator 并使用 Spark 框架才能适应机器学习管道。有谁知道 Breeze 是否可以与 Spark 一起使用?我的所有数据都在 Spark 数据帧中,可以想象我可以使用 Breeze 导入它,使用 Breeze DenseVectors 作为数据结构,然后转换为用于 Estimator 部分的 DataFrame。Breeze 的优点是它有一个pinv针对 Moore-Penrose 伪逆的函数,它是一个非方阵的逆。据我所知,Spark MLlib 中没有等效的功能。我不知道是否可以将 Breeze 张量转换为 Spark DataFrames,所以如果有人有这方面的经验,那将非常有用。谢谢!

matrix-inverse scala-breeze apache-spark

2
推荐指数
1
解决办法
980
查看次数

将特征的 Spark 向量转换为数组

我有一个 features 列,它使用 Spark 的 VectorAssembler 打包成一个向量向量,如下所示。data是输入数据帧(类型spark.sql.DataFrame)。

val featureCols = Array("feature_1","feature_2","feature_3")
val featureAssembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val dataWithFeatures = featureAssembler.transform(data)
Run Code Online (Sandbox Code Playgroud)

我现在用的是开发一个自定义分类ClassifierClassificationModel开发者API。ClassificationModel需要开发一个predictRaw()函数,该函数输出模型的预测标签向量。

def predictRaw(features: FeaturesType) : Vector
Run Code Online (Sandbox Code Playgroud)

这个函数是由 API 设置的,它接受一个参数、特征FeaturesType并输出一个 Vector(在我的例子中,我把它当作一个 SparkDenseVector作为DenseVector扩展Vectortrait)。

由于 VectorAssembler 的封装,features列是类型的Vector,每个元素本身就是一个向量,每个训练样本的原始特征。例如:

特征列 - Vector
[1.0, 2.0, 3.0] 类型 - element1,本身是一个向量
[3.5, 4.5, 5.5] - element2,本身是一个向量

我需要将这些功能提取到一个Array[Double]中以实现我的predictRaw()逻辑。理想情况下,我想要以下结果以保留基数:

`val result: Array[Double] …
Run Code Online (Sandbox Code Playgroud)

arrays scala vector apache-spark apache-spark-sql

2
推荐指数
1
解决办法
1万
查看次数

基于字符串条件过滤行,dplyr过滤器,包含

我想使用 dplyr contains() 和过滤器过滤数据帧。一定很简单吧?我见过的例子使用了基本的 R grepl ,它在某种程度上击败了对象。这是一个简单的数据框:

site_type <- c('Urban','Rural','Rural Background','Urban Background','Roadside','Kerbside')
df <- data.frame(row_id, site_type)
df <- as.tibble(df)
df
Run Code Online (Sandbox Code Playgroud)

现在我想按 site.type 包含字符串背景的所有行过滤数据框。如果我知道 site_type 的唯一值,我可以直接找到该字符串:

filtered_df <- filter(df, site_type == 'Urban Background')

但我想做一些类似的事情:

filtered_df <- filter(df, site_type(contains('background', match_case = False)))

有什么想法如何做到这一点吗?dplyr 助手contains只能用于列而不是行吗?

r contains filter dplyr

1
推荐指数
1
解决办法
2万
查看次数