小编104*_*ica的帖子

如何在PySpark中覆盖Spark ML模型?

from pyspark.ml.regression import RandomForestRegressionModel

rf = RandomForestRegressor(labelCol="label",featuresCol="features", numTrees=5, maxDepth=10, seed=42)
rf_model = rf.fit(train_df)
rf_model_path = "./hdfsData/" + "rfr_model"
rf_model.save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)

当我第一次尝试保存模型时,这些线条起作用了.但是当我想再次将模型保存到路径中时,它会出现以下错误:

Py4JJavaError:调用o1695.save时发生错误.:java.io.IOException:Path ./hdfsData/rfr_model已存在.请使用write.overwrite().save(path)来覆盖它.

然后我尝试了:

rf_model.write.overwrite().save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)

它给了:

AttributeError:'function'对象没有属性'overwrite'

看来该pyspark.mllib模块提供了覆盖功能,但没有提供pyspark.ml模块.如果我想用新模型覆盖旧模型,任何人都知道如何解决这个问题?谢谢.

machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib

6
推荐指数
1
解决办法
2897
查看次数

在 pyspark 中的 DataFrame 上使用 toPandas() 时出现神秘的“pyarrow.lib.ArrowInvalid:浮点值被截断”错误

我在不是很大的 DataFrame 上使用 toPandas() ,但出现以下异常:

18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
      process()
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
      serializer.dump_stream(func(split_index, iterator), outfile)
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
      batch = _create_batch(series, self._timezone)
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
      arrs = [create_array(s, t) for s, t in series]
    File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
      arrs = [create_array(s, t) for s, t in series] …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark apache-arrow pyarrow

5
推荐指数
2
解决办法
9683
查看次数

使用递归案例类进行Spark

我有一个递归的数据结构。Spark给出了这个错误:

Exception in thread "main" java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class BulletPoint
Run Code Online (Sandbox Code Playgroud)

作为示例,我做了以下代码:

case class BulletPoint(item: String, children: List[BulletPoint])

object TestApp extends App {
  val sparkSession = SparkSession
    .builder()
    .appName("spark app")
    .master(s"local")
    .getOrCreate()

  import sparkSession.implicits._

  sparkSession.createDataset(List(BulletPoint("1", Nil), BulletPoint("2", Nil)))
}
Run Code Online (Sandbox Code Playgroud)

有人知道如何解决这个问题吗?

scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
89
查看次数

Spark SQL:在数组值上使用collect_set?

我有一个聚合的 DataFrame,其中有一列使用collect_set. 我现在需要再次聚合这个 DataFrame,并再次应用于collect_set该列的值。问题是我需要应用collect_Set集合的值 - 到目前为止,我看到的唯一方法是分解聚合的 DataFrame。有没有更好的办法?

例子:

初始数据帧:

country   | continent   | attributes
-------------------------------------
Canada    | America     | A
Belgium   | Europe      | Z
USA       | America     | A
Canada    | America     | B
France    | Europe      | Y
France    | Europe      | X
Run Code Online (Sandbox Code Playgroud)

聚合数据帧(我作为输入接收的那个) - 聚合country

country   | continent   | attributes
-------------------------------------
Canada    | America     | A, B
Belgium   | Europe      | Z
USA       | America     | A
France    | …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-sql

5
推荐指数
1
解决办法
4538
查看次数

为什么BigDecimal的Spark groupBy.agg(min/max)总是返回0?

我正在尝试按DataFrame的一列进行分组,并在每个结果组中生成BigDecimal列的minmax值.结果总是产生非常小的(大约0)值.

(min/max针对Double列的类似调用会产生预期的非零值.)

举个简单的例子:

如果我创建以下DataFrame:

import org.apache.spark.sql.{functions => f}

case class Foo(group: String, bd_value: BigDecimal, d_value: Double)

val rdd = spark.sparkContext.parallelize(Seq(
  Foo("A", BigDecimal("1.0"), 1.0),
  Foo("B", BigDecimal("10.0"), 10.0),
  Foo("B", BigDecimal("1.0"), 1.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0),
  Foo("C", BigDecimal("10.0"), 10.0)
))

val df = rdd.toDF()
Run Code Online (Sandbox Code Playgroud)

选择maxDouble或BigDecimal列将返回预期结果:

df.select(f.max("d_value")).show()

// +------------+
// |max(d_value)|
// +------------+
// |        10.0|
// +------------+

df.select(f.max("bd_value")).show()

// +--------------------+
// |       max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+
Run Code Online (Sandbox Code Playgroud)

但是如果我分组然后聚合,我得到Double列的合理结果,但BigDecimal列的值接近于零:

df.groupBy("group").agg(f.max("d_value")).show()

// +-----+------------+ …
Run Code Online (Sandbox Code Playgroud)

bigdecimal apache-spark apache-spark-sql

5
推荐指数
1
解决办法
488
查看次数

如何计算Spark中12个月内每个客户滑动1个月的订单总和

我是Scala的新手.目前我正在尝试在每月下滑的12个月期间汇总火花中的订单数据.

下面是我的数据的简单示例,我尝试对其进行格式化,以便您可以轻松地对其进行测试

import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


var sample = Seq(("C1","01/01/2016", 20), ("C1","02/01/2016", 5), 
 ("C1","03/01/2016", 2),  ("C1","04/01/2016", 3), ("C1","05/01/2017", 5),
 ("C1","08/01/2017", 5), ("C1","01/02/2017", 10), ("C1","01/02/2017", 10),  
 ("C1","01/03/2017", 10)).toDF("id","order_date", "orders")

sample = sample.withColumn("order_date",
to_date(unix_timestamp($"order_date", "dd/MM/yyyy").cast("timestamp")))

sample.show 
Run Code Online (Sandbox Code Playgroud)
 +---+----------+------+
 | id|order_date|orders|
 +---+----------+------+
 | C1|2016-01-01|    20|
 | C1|2016-01-02|     5|
 | C1|2016-01-03|     2|
 | C1|2016-01-04|     3|
 | C1|2017-01-05|     5|
 | C1|2017-01-08|     5|
 | C1|2017-02-01|    10|
 | C1|2017-02-01|    10|
 | C1|2017-03-01|    10|
 +---+----------+------+
Run Code Online (Sandbox Code Playgroud)

强加给我的结果如下.

id      period_start    period_end  rolling
C1      2015-01-01      2016-01-01  30
C1      2016-01-01      2017-01-01 …
Run Code Online (Sandbox Code Playgroud)

scala aggregation apache-spark apache-spark-sql

4
推荐指数
1
解决办法
899
查看次数

使用Java 10在org.apache.xbean.asm5.ClassReader.<init>(未知来源)的java.lang.IllegalArgumentException

我试图收集我的rdd时,我开始收到以下错误.它发生在我安装Java 10.1之后所以当然我把它取出并重新安装它,同样的错误.然后我安装了Java 9.04同样的错误.然后我撕掉了python 2.7.14,apache spark 2.3.0和Hadoop 2.7,同样的错误.有没有人有任何其他原因导致我不断收到错误?

>>> from operator import add
>>> from pyspark import SparkConf, SparkContext
>>> import string
>>> import sys
>>> import re
>>>
>>> sc = SparkContext(appName="NEW")
2018-04-21 22:28:45 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
>>> rdd = sc.parallelize(xrange(1,10))
>>> new =rdd.collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 824, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

4
推荐指数
1
解决办法
2571
查看次数

Spark sql group by 和 sum 更改列名?

在这个数据框中,我找到了每个组的总工资。在 Oracle 中我会使用这段代码

select job_id,sum(salary) as "Total" from hr.employees group by job_id;
Run Code Online (Sandbox Code Playgroud)

在 Spark SQL 中尝试了相同的操作,我面临两个问题

empData.groupBy($"job_id").sum("salary").alias("Total").show()
Run Code Online (Sandbox Code Playgroud)
  1. 别名总计未显示,而是显示“总和(工资)”列
  2. 我无法使用$(我认为是 Scala SQL 语法)。遇到编译问题

     empData.groupBy($"job_id").sum($"salary").alias("Total").show()
    
    Run Code Online (Sandbox Code Playgroud)

任何想法?

scala apache-spark

4
推荐指数
1
解决办法
5857
查看次数

在 Pyspark 中转置数据帧

如何在 Pyspark 中转置以下数据框?

这个想法是为了实现下面出现的结果。

import pandas as pd

d = {'id' : pd.Series([1, 1, 1, 2, 2, 2, 3, 3, 3], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
     'place' : pd.Series(['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A'], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
     'value' : pd.Series([10, 30, 20, 10, 30, 20, 10, 30, 20], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
     'attribute' : pd.Series(['size', 'height', 'weigth', 'size', 'height', 'weigth','size', 'height', 'weigth'], …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
7021
查看次数

如何在不使用 alist 的情况下生成没有默认值的配对列表?

我正在尝试对 R 代码进行简单操作,并尝试生成一个相当于

substitute(function(x) x)
Run Code Online (Sandbox Code Playgroud)

我知道我可以围绕这些事情做一些事情

as.call(list(as.symbol("function"), as.pairlist(alist(x=)), as.symbol("x")))

Run Code Online (Sandbox Code Playgroud)

但我正在寻找一种as.pairlist(alist(x=))无需求助的方法alist,或者如果不可能的话,允许我生成等效的表达式,而无需对事物进行硬编码或解析字符串。

我正在修补诸如 之类的东西as.call(list(as.symbol("="), as.symbol("x"))),但这现在似乎是一个死胡同。

r metaprogramming s-expression

4
推荐指数
1
解决办法
181
查看次数