I have a dataframe df with a column features of type VectorUDT. How do I get an element of that column, e.g. the first element?
I have tried the following:
from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()
But I get a net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype) error. If I use first_elem_udf = udf(lambda row: row.toArray()[0]) instead, I get the same error.
I also tried explode(), but I get an error because it requires an array or map type.
I would think this should be a common operation.
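For what it's worth, that ClassDict error usually means the UDF returned a NumPy scalar instead of a plain Python value. A minimal sketch of the difference, using a plain NumPy array as a stand-in for row.values (the DoubleType return type in the comment is an assumption, not taken from the question):

```python
import numpy as np

# Stand-in for row.values inside the UDF: a NumPy array of doubles.
values = np.array([1.5, 2.0, 3.0])

raw = values[0]           # numpy.float64 -- Spark's Pyrolite pickler rejects this
fixed = float(values[0])  # plain Python float -- serializes cleanly

# The corresponding UDF body would be something like:
#   first_elem_udf = udf(lambda row: float(row.values[0]), DoubleType())
first_elem = lambda vals: float(vals[0])

print(type(raw).__module__, type(fixed) is float, first_elem(values))
```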
Is there a way to get the points on the ROC curve from Spark ML in pyspark? In the documentation I see a Scala example but not a Python one: https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html
Is that right? I can certainly think of ways to implement it, but I have to imagine it would be faster if a pre-built function existed. I'm working with 3 million scores and a few dozen models, so speed matters.
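For reference, the pyspark.ml LogisticRegression training summary exposes an roc dataframe (model.summary.roc, with FPR/TPR columns), which may already cover this case. The underlying computation is simple enough to sketch in plain Python; roc_points below is a hypothetical helper, not a Spark API, and score ties are ignored for brevity:

```python
def roc_points(scores_and_labels):
    """Sweep the threshold over descending scores, emitting (FPR, TPR) points."""
    pairs = sorted(scores_and_labels, key=lambda p: -p[0])
    pos = sum(1 for _, y in pairs if y == 1)
    neg = len(pairs) - pos
    tp = fp = 0
    points = [(0.0, 0.0)]
    for _, y in pairs:
        if y == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / neg, tp / pos))
    return points

print(roc_points([(0.9, 1), (0.8, 0), (0.7, 1), (0.1, 0)]))
```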
I want to apply MinMaxScaler to multiple columns of a PySpark dataframe df. So far I only know how to apply it to a single column, e.g. x:
import pandas as pd
from pyspark.ml.feature import MinMaxScaler
pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)
scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)
What if I have 100 columns? Is there a way to do min-max scaling over many columns in PySpark?
Update:
Also, how do I apply MinMaxScaler to integer or double values? It raises the following error:
java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.
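For context, MinMaxScaler operates on a Vector column, not on raw int/double columns, which is what that error is complaining about; the usual pattern is to run VectorAssembler first (or one assembler+scaler pair per column inside a Pipeline). The scaling itself is just (x - min) / (max - min) per column, sketched here in plain Python; min_max_scale is a hypothetical helper, not a Spark API:

```python
def min_max_scale(columns):
    """Rescale each named column of numbers to [0, 1] independently."""
    scaled = {}
    for name, vals in columns.items():
        lo, hi = min(vals), max(vals)
        rng = (hi - lo) or 1.0   # guard against constant columns
        scaled[name] = [(v - lo) / rng for v in vals]
    return scaled

print(min_max_scale({'x': [0, 1, 2], 'z': [100, 200, 1000]}))
```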
Spark 2.2.0 added correlation support for dataframes. More information about it can be found in the pull request:
New algorithms in MLlib's DataFrame-based API:
SPARK-19636: Correlation in the DataFrame-based API (Scala/Java/Python)
However, it is not at all clear how to use this change, or what has changed compared to previous versions.
I was expecting something like this:
df_num = spark.read.parquet('/dataframe')
df_cat.printSchema()
df_cat.show()
df_num.corr(col1='features', col2='fail_mode_meas')
Can someone explain how to use the new Spark 2.2.0 feature to compute correlations on dataframes?
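For reference, the change landed as pyspark.ml.stat.Correlation, which takes a DataFrame with a single Vector column (typically built with VectorAssembler) rather than two column names, e.g. Correlation.corr(df_vec, "features"). What it computes by default is plain Pearson correlation, sketched here for two numeric columns; pearson is a hypothetical helper, not a Spark API:

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

print(pearson([1, 2, 3], [2, 4, 6]), pearson([1, 2, 3], [3, 2, 1]))
```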
I'm doing data preparation on a Spark DataFrame with categorical data. I need to one-hot encode the categorical data, and I tried this on Spark 1.6:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()
This code produces one-hot encoded data in the following format:
+---+-------------+
| id| categoryVec|
+---+-------------+
| 0|(3,[0],[1.0])|
| 1|(3,[2],[1.0])|
| 2|(3,[1],[1.0])|
| 3|(3,[0],[1.0])|
| 4|(3,[0],[1.0])|
| 5|(3,[1],[1.0])|
+---+-------------+
Normally, what I expect from one-hot encoding is one column per category, with 0/1 as the corresponding values. How can I get the data into that form?
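For what it's worth, the output above already is a one-hot encoding: (3,[0],[1.0]) is Spark's SparseVector notation for a length-3 vector with a 1.0 at index 0, i.e. [1.0, 0.0, 0.0]. Expanding it into one 0/1 column per category is just filling in the zeros, sketched here in plain Python; sparse_to_dense is a hypothetical helper, not a Spark API:

```python
def sparse_to_dense(size, indices, values):
    """Expand SparseVector fields (size, indices, values) into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# (3,[0],[1.0]) -> [1.0, 0.0, 0.0]
print(sparse_to_dense(3, [0], [1.0]))
```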
I want to convert a large Spark dataframe, with more than 1,000,000 rows, to Pandas. I tried converting it to a Pandas dataframe with the following code:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
result.toPandas()
However, I got this error:
TypeError Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/sql/dataframe.py in toPandas(self)
1949 import pyarrow
-> 1950 to_arrow_schema(self.schema)
1951 tables = self._collectAsArrow()
/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in to_arrow_schema(schema)
1650 fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable)
-> 1651 for field in schema]
1652 return pa.schema(fields)
/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in <listcomp>(.0)
1650 fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable)
-> 1651 for field in schema]
1652 return pa.schema(fields)
/usr/local/lib/python3.6/dist-packages/pyspark/sql/types.py in to_arrow_type(dt)
1641 else:
-> 1642 raise TypeError("Unsupported type in …

I'm just using StandardScaler to normalize the features for my ML application. After selecting the scaled features, I want to convert them back to a dataframe of Doubles, though the length of my vectors is arbitrary. I know how to do it with a specific 3 features:
myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")
but not for an arbitrary number of features. Is there a shortcut?
Example:
val testDF = sc.parallelize(List(
  Vectors.dense(5D, 6D, 7D),
  Vectors.dense(8D, 9D, 10D),
  Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double]
Edit:
I found out how to unpack the column names when creating the dataframe, but I still can't convert the vector into the sequence needed to create the dataframe:
finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)
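A plain-Python sketch of the unpacking being asked for: generate column names for an arbitrary vector length and zip each vector with them (make_names and unpack are hypothetical helpers, not Spark APIs; in Spark this corresponds to turning each Vector into a sequence of Doubles and then calling toDF(names: _*)):

```python
def make_names(n, prefix="f"):
    """Column names f1..fn for a vector of length n."""
    return [f"{prefix}{i + 1}" for i in range(n)]

def unpack(vectors, names):
    """One dict per row, mapping column name -> vector element."""
    return [dict(zip(names, vec)) for vec in vectors]

rows = [[5.0, 6.0, 7.0], [8.0, 9.0, 10.0], [11.0, 12.0, 13.0]]
print(unpack(rows, make_names(3)))
```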
I'm trying to convert a pyspark dataframe column of DenseVector to an array, but I keep getting errors.
from pyspark.ml.linalg import Vectors

data = [(Vectors.dense([8.0, 1.0, 3.0, 2.0, 5.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data,["features"])
I tried to define a UDF and use toArray():
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

to_array = udf(lambda x: x.toArray(), ArrayType(FloatType()))
df = df.withColumn('features', to_array('features'))
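For context, x.toArray() returns a NumPy ndarray, so this UDF hands NumPy scalars back to Spark, which its pickler rejects; calling .tolist() converts everything to plain Python floats first (the fixed UDF would presumably be udf(lambda x: x.toArray().tolist(), ArrayType(FloatType()))). A sketch of the difference, assuming NumPy:

```python
import numpy as np

arr = np.array([8.0, 1.0, 3.0])  # stand-in for x.toArray()
raw_elems = list(arr)            # numpy.float64 elements -- Pyrolite cannot pickle these
plain_elems = arr.tolist()       # plain Python floats -- safe to return from a UDF

print([type(v).__name__ for v in raw_elems], plain_elems)
```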
However, if I run df.collect(), I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 17.0 failed 4 times,
most recent failure: Lost task 1.3 in stage 17.0 (TID 100, 10.139.64.6, executor 0):
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict
(for numpy.core.multiarray._reconstruct) …

I am trying to standardize (mean = 0, std = 1) one column ('age') in my data frame. Below is my code in Spark (Python):
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# Make my 'age' column an assembler type:
age_assembler = VectorAssembler(inputCols= ['age'], outputCol = "age_feature")
# Create a scaler that takes 'age_feature' as an input column:
scaler = StandardScaler(inputCol="age_feature", outputCol="age_scaled",
withStd=True, withMean=True)
# Creating a mini-pipeline for those 2 steps:
age_pipeline = Pipeline(stages=[age_assembler, …
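The computation the pipeline above performs on 'age' is just z = (x - mean) / stddev. A plain-Python sketch (standardize is a hypothetical helper; I believe StandardScaler uses the sample standard deviation, i.e. an n-1 denominator, but treat that as an assumption):

```python
import math

def standardize(xs):
    """Center to mean 0 and scale to (sample) standard deviation 1."""
    n = len(xs)
    mean = sum(xs) / n
    std = math.sqrt(sum((x - mean) ** 2 for x in xs) / (n - 1))
    return [(x - mean) / std for x in xs]

print(standardize([1.0, 2.0, 3.0]))
```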
apache-spark ×6
python ×4
dataframe ×2
correlation ×1
pandas ×1
pyarrow ×1
scala ×1
scale ×1