相关疑难解决方法(0)

如何访问Spark DataFrame中VectorUDT列的元素?

我有一个数据帧dfVectorUDT指定的列features.如何获取列的元素,比如第一个元素?

我尝试过以下操作

from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
Run Code Online (Sandbox Code Playgroud)

但是我收到了一个net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)错误.如果我first_elem_udf = first_elem_udf(lambda row: row.toArray()[0])改为相同的错误.

我也试过,explode()但我得到一个错误,因为它需要一个数组或地图类型.

我认为这应该是一种常见的操作.

dataframe apache-spark apache-spark-sql pyspark apache-spark-ml

16
推荐指数
1
解决办法
5886
查看次数

pyspark 提取 ROC 曲线?

有没有办法从pyspark中的Spark ML获取ROC曲线上的点?在文档中,我看到了一个 Scala 的例子,但不是 python:https : //spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html

那正确吗?我当然可以想出实现它的方法,但我不得不想象如果有一个预先构建的函数它会更快。我正在处理 300 万个分数和几十个模型,所以速度很重要。

pyspark apache-spark-ml

12
推荐指数
2
解决办法
1万
查看次数

在 PySpark 中的多列上应用 MinMaxScaler

我想将MinMaxScalarPySpark 应用于 PySpark 数据框的多列df。到目前为止,我只知道如何将它应用于单个列,例如x.

from pyspark.ml.feature import MinMaxScaler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)
Run Code Online (Sandbox Code Playgroud)

如果我有 100 列怎么办?有没有办法对 PySpark 中的许多列进行最小-最大缩放?

更新:

另外,如何应用MinMaxScalar整数或双精度值?它引发以下错误:

java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.
Run Code Online (Sandbox Code Playgroud)

python apache-spark-sql pyspark

11
推荐指数
2
解决办法
7071
查看次数

如何在 Spark 中使用 Dataframes 的关联性?

Spark 2.2.0添加了对数据帧的关联支持。有关这方面的更多信息可以在拉取请求中找到。

MLlib 基于 DataFrame 的 API 中的新算法:

SPARK-19636:基于 DataFrame 的 API 中的关联 (Scala/Java/Python)

然而,完全不清楚如何使用此更改或与以前的版本相比发生了什么变化。

我期待这样的事情:

df_num = spark.read.parquet('/dataframe')
df_cat.printSchema()
df_cat.show()
df_num.corr(col1='features', col2='fail_mode_meas')
Run Code Online (Sandbox Code Playgroud)
df_num = spark.read.parquet('/dataframe')
df_cat.printSchema()
df_cat.show()
df_num.corr(col1='features', col2='fail_mode_meas')
Run Code Online (Sandbox Code Playgroud)

有人可以解释如何利用 Spark 2.2.0 的新功能来实现数据帧中的关联吗?

python correlation apache-spark apache-spark-sql pyspark

8
推荐指数
1
解决办法
2万
查看次数

Pyspark Dataframe One-Hot编码

我正在使用分类数据在Spark DataFrame上进行数据准备.我需要对分类数据进行One-Hot-Encoding,我在spark 1.6上尝试了这个

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
Run Code Online (Sandbox Code Playgroud)

这段代码产生了这种格式的单热编码数据.

+---+-------------+
| id|  categoryVec|
+---+-------------+
|  0|(3,[0],[1.0])|
|  1|(3,[2],[1.0])|
|  2|(3,[1],[1.0])|
|  3|(3,[0],[1.0])|
|  4|(3,[0],[1.0])|
|  5|(3,[1],[1.0])|
+---+-------------+
Run Code Online (Sandbox Code Playgroud)

通常,我对One-Hot编码技术的期望是每个类别的每列和0,1个相应的值.如何从中获取这类数据?

apache-spark apache-spark-sql pyspark apache-spark-mllib one-hot-encoding

6
推荐指数
0
解决办法
1845
查看次数

RuntimeError: Unsupported type in conversion to Arrow: VectorUDT

我想将一个大的 spark 数据框转换为超过 1000000 行的 Pandas。我尝试使用以下代码将 spark 数据帧转换为 Pandas 数据帧:

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
result.toPandas()
Run Code Online (Sandbox Code Playgroud)

但是,我得到了错误:

TypeError                                 Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/sql/dataframe.py in toPandas(self)
   1949                 import pyarrow
-> 1950                 to_arrow_schema(self.schema)
   1951                 tables = self._collectAsArrow()

/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in to_arrow_schema(schema)
   1650     fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable)
-> 1651               for field in schema]
   1652     return pa.schema(fields)

/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in <listcomp>(.0)
   1650     fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable)
-> 1651               for field in schema]
   1652     return pa.schema(fields)

/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in to_arrow_type(dt)
   1641     else:
-> 1642         raise TypeError("Unsupported type in …
Run Code Online (Sandbox Code Playgroud)

dataframe pandas apache-spark pyspark pyarrow

6
推荐指数
1
解决办法
4822
查看次数

Spark Scala:如何将Dataframe [vector]转换为DataFrame [f1:Double,...,fn:Double]]

我只是使用标准缩放器来规范我的ML应用程序的功能.选择缩放功能后,我想将其转换回双打数据帧,尽管我的矢量长度是任意的.我知道如何使用特定的3个功能

myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")
Run Code Online (Sandbox Code Playgroud)

但不是任意数量的功能.是否有捷径可寻?

例:

val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double] 
Run Code Online (Sandbox Code Playgroud)

编辑

我在创建数据帧时发现了如何解压缩到列名,但是仍然无法将向量转换为创建数据帧所需的序列:

finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-ml

4
推荐指数
1
解决办法
9698
查看次数

将 PySpark DenseVector 转换为数组

我正在尝试将 DenseVector 的 pyspark 数据帧列转换为数组,但我总是遇到错误。

data = [(Vectors.dense([8.0, 1.0, 3.0, 2.0, 5.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]

df = spark.createDataFrame(data,["features"])
Run Code Online (Sandbox Code Playgroud)

我试图定义一个 UDF 并使用 toArray()

to_array = udf(lambda x: x.toArray(), ArrayType(FloatType()))
df = df.withColumn('features', to_array('features'))
Run Code Online (Sandbox Code Playgroud)

但是,如果我执行 df.collect(),我会收到以下错误

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 4 times, 
most recent failure: Lost task 1.3 in stage 17.0 (TID 100, 10.139.64.6, executor 0): 
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict 
(for numpy.core.multiarray._reconstruct) …
Run Code Online (Sandbox Code Playgroud)

python pyspark

4
推荐指数
1
解决办法
5414
查看次数

How to standardize ONE column in Spark using StandardScaler?

I am trying to standardize (mean = 0, std = 1) one column ('age') in my data frame. Below is my code in Spark (Python):

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Make my 'age' column an assembler type:
age_assembler = VectorAssembler(inputCols= ['age'], outputCol = "age_feature")

# Create a scaler that takes 'age_feature' as an input column:
scaler = StandardScaler(inputCol="age_feature", outputCol="age_scaled",
                        withStd=True, withMean=True)

# Creating a mini-pipeline for those 2 steps:
age_pipeline = Pipeline(stages=[age_assembler, …
Run Code Online (Sandbox Code Playgroud)

python scale apache-spark pyspark

3
推荐指数
1
解决办法
3248
查看次数