from pyspark.ml.regression import RandomForestRegressionModel
rf = RandomForestRegressor(labelCol="label",featuresCol="features", numTrees=5, maxDepth=10, seed=42)
rf_model = rf.fit(train_df)
rf_model_path = "./hdfsData/" + "rfr_model"
rf_model.save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)
当我第一次尝试保存模型时,这些线条起作用了.但是当我想再次将模型保存到路径中时,它会出现以下错误:
Py4JJavaError:调用o1695.save时发生错误.:java.io.IOException:Path ./hdfsData/rfr_model已存在.请使用write.overwrite().save(path)来覆盖它.
然后我尝试了:
rf_model.write.overwrite().save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)
它给了:
AttributeError:'function'对象没有属性'overwrite'
看来该pyspark.mllib模块提供了覆盖功能,但没有提供pyspark.ml模块.如果我想用新模型覆盖旧模型,任何人都知道如何解决这个问题?谢谢.
machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib
我在不是很大的 DataFrame 上使用 toPandas() ,但出现以下异常:
18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
batch = _create_batch(series, self._timezone)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
arrs = [create_array(s, t) for s, t in series] …Run Code Online (Sandbox Code Playgroud) 我有一个递归的数据结构。Spark给出了这个错误:
Exception in thread "main" java.lang.UnsupportedOperationException: cannot have circular references in class, but got the circular reference of class BulletPoint
Run Code Online (Sandbox Code Playgroud)
作为示例,我做了以下代码:
case class BulletPoint(item: String, children: List[BulletPoint])
object TestApp extends App {
val sparkSession = SparkSession
.builder()
.appName("spark app")
.master(s"local")
.getOrCreate()
import sparkSession.implicits._
sparkSession.createDataset(List(BulletPoint("1", Nil), BulletPoint("2", Nil)))
}
Run Code Online (Sandbox Code Playgroud)
有人知道如何解决这个问题吗?
我有一个聚合的 DataFrame,其中有一列使用collect_set. 我现在需要再次聚合这个 DataFrame,并再次应用于collect_set该列的值。问题是我需要应用collect_Set集合的值 - 到目前为止,我看到的唯一方法是分解聚合的 DataFrame。有没有更好的办法?
例子:
初始数据帧:
country | continent | attributes
-------------------------------------
Canada | America | A
Belgium | Europe | Z
USA | America | A
Canada | America | B
France | Europe | Y
France | Europe | X
Run Code Online (Sandbox Code Playgroud)
聚合数据帧(我作为输入接收的那个) - 聚合country:
country | continent | attributes
-------------------------------------
Canada | America | A, B
Belgium | Europe | Z
USA | America | A
France | …Run Code Online (Sandbox Code Playgroud) 我正在尝试按DataFrame的一列进行分组,并在每个结果组中生成BigDecimal列的min和max值.结果总是产生非常小的(大约0)值.
(min/max针对Double列的类似调用会产生预期的非零值.)
举个简单的例子:
如果我创建以下DataFrame:
import org.apache.spark.sql.{functions => f}
case class Foo(group: String, bd_value: BigDecimal, d_value: Double)
val rdd = spark.sparkContext.parallelize(Seq(
Foo("A", BigDecimal("1.0"), 1.0),
Foo("B", BigDecimal("10.0"), 10.0),
Foo("B", BigDecimal("1.0"), 1.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0)
))
val df = rdd.toDF()
Run Code Online (Sandbox Code Playgroud)
选择maxDouble或BigDecimal列将返回预期结果:
df.select(f.max("d_value")).show()
// +------------+
// |max(d_value)|
// +------------+
// | 10.0|
// +------------+
df.select(f.max("bd_value")).show()
// +--------------------+
// | max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+
Run Code Online (Sandbox Code Playgroud)
但是如果我分组然后聚合,我得到Double列的合理结果,但BigDecimal列的值接近于零:
df.groupBy("group").agg(f.max("d_value")).show()
// +-----+------------+ …Run Code Online (Sandbox Code Playgroud) 我是Scala的新手.目前我正在尝试在每月下滑的12个月期间汇总火花中的订单数据.
下面是我的数据的简单示例,我尝试对其进行格式化,以便您可以轻松地对其进行测试
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
var sample = Seq(("C1","01/01/2016", 20), ("C1","02/01/2016", 5),
("C1","03/01/2016", 2), ("C1","04/01/2016", 3), ("C1","05/01/2017", 5),
("C1","08/01/2017", 5), ("C1","01/02/2017", 10), ("C1","01/02/2017", 10),
("C1","01/03/2017", 10)).toDF("id","order_date", "orders")
sample = sample.withColumn("order_date",
to_date(unix_timestamp($"order_date", "dd/MM/yyyy").cast("timestamp")))
sample.show
Run Code Online (Sandbox Code Playgroud)
+---+----------+------+
| id|order_date|orders|
+---+----------+------+
| C1|2016-01-01| 20|
| C1|2016-01-02| 5|
| C1|2016-01-03| 2|
| C1|2016-01-04| 3|
| C1|2017-01-05| 5|
| C1|2017-01-08| 5|
| C1|2017-02-01| 10|
| C1|2017-02-01| 10|
| C1|2017-03-01| 10|
+---+----------+------+
Run Code Online (Sandbox Code Playgroud)
强加给我的结果如下.
id period_start period_end rolling
C1 2015-01-01 2016-01-01 30
C1 2016-01-01 2017-01-01 …Run Code Online (Sandbox Code Playgroud) 我试图收集我的rdd时,我开始收到以下错误.它发生在我安装Java 10.1之后所以当然我把它取出并重新安装它,同样的错误.然后我安装了Java 9.04同样的错误.然后我撕掉了python 2.7.14,apache spark 2.3.0和Hadoop 2.7,同样的错误.有没有人有任何其他原因导致我不断收到错误?
>>> from operator import add
>>> from pyspark import SparkConf, SparkContext
>>> import string
>>> import sys
>>> import re
>>>
>>> sc = SparkContext(appName="NEW")
2018-04-21 22:28:45 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
>>> rdd = sc.parallelize(xrange(1,10))
>>> new =rdd.collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\rdd.py", line 824, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "C:\spark\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in …Run Code Online (Sandbox Code Playgroud) 在这个数据框中,我找到了每个组的总工资。在 Oracle 中我会使用这段代码
select job_id,sum(salary) as "Total" from hr.employees group by job_id;
Run Code Online (Sandbox Code Playgroud)
在 Spark SQL 中尝试了相同的操作,我面临两个问题
empData.groupBy($"job_id").sum("salary").alias("Total").show()
Run Code Online (Sandbox Code Playgroud)
我无法使用$(我认为是 Scala SQL 语法)。遇到编译问题
empData.groupBy($"job_id").sum($"salary").alias("Total").show()
Run Code Online (Sandbox Code Playgroud)任何想法?
如何在 Pyspark 中转置以下数据框?
这个想法是为了实现下面出现的结果。
import pandas as pd
d = {'id' : pd.Series([1, 1, 1, 2, 2, 2, 3, 3, 3], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
'place' : pd.Series(['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A', 'A'], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
'value' : pd.Series([10, 30, 20, 10, 30, 20, 10, 30, 20], index=['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']),
'attribute' : pd.Series(['size', 'height', 'weigth', 'size', 'height', 'weigth','size', 'height', 'weigth'], …Run Code Online (Sandbox Code Playgroud) 我正在尝试对 R 代码进行简单操作,并尝试生成一个相当于
substitute(function(x) x)
Run Code Online (Sandbox Code Playgroud)
我知道我可以围绕这些事情做一些事情
as.call(list(as.symbol("function"), as.pairlist(alist(x=)), as.symbol("x")))
Run Code Online (Sandbox Code Playgroud)
但我正在寻找一种as.pairlist(alist(x=))无需求助的方法alist,或者如果不可能的话,允许我生成等效的表达式,而无需对事物进行硬编码或解析字符串。
我正在修补诸如 之类的东西as.call(list(as.symbol("="), as.symbol("x"))),但这现在似乎是一个死胡同。
apache-spark ×9
pyspark ×4
scala ×3
aggregation ×1
apache-arrow ×1
bigdecimal ×1
java ×1
pyarrow ×1
r ×1
s-expression ×1