Art*_*cca 54 python apache-spark apache-spark-sql pyspark apache-spark-ml
请考虑以下代码段(假设spark
已设置为某些代码段SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)
请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector
,我希望使用基本DataFrame
API 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:
这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)
现在例如df_with_strings.collect()[0]["temperatures"][1]
是'-7.0'
.但是如果我施放到ml Vector那么事情就不那么顺利了:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
Run Code Online (Sandbox Code Playgroud)
哎呀!任何想法如何解决这一问题?
VectorAssembler
有一个Transformer
看起来几乎是这项工作的理想选择:VectorAssembler
.它需要一列或多列并将它们连接成一个向量.不幸的是,它只需要Vector
和Float
列,而不是Array
列,所以后续不起作用:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
Run Code Online (Sandbox Code Playgroud)
它给出了这个错误:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
Run Code Online (Sandbox Code Playgroud)
我能想到的最好的工作是将列表分解为多个列,然后使用它VectorAssembler
来再次收集它们:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
Run Code Online (Sandbox Code Playgroud)
这似乎是理想的,除了TEMPERATURE_COUNT
超过100,有时超过1000.(另一个问题是,如果你事先不知道数组的大小,代码会更复杂,虽然这不是我的数据的情况.)Spark实际上是生成一个包含那么多列的中间数据集,还是只是认为这是一个单个项目瞬间通过的中间步骤(或者当它看到它时它完全优化了这个步骤)只使用这些列组装成矢量)?
一个相当简单的替代方法是使用UDF进行转换.这让我可以在一行代码中直接表达我想要做的事情,并且不需要使用疯狂的列数来创建数据集.但是所有这些数据都必须在Python和JVM之间进行交换,并且每个单独的数字都必须由Python处理(这对于迭代单个数据项来说是非常慢的).这是看起来如何:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
Run Code Online (Sandbox Code Playgroud)
这个漫无边际问题的其余部分是我在试图找到答案时想出的一些额外的事情.大多数读这篇文章的人可能会跳过它们.
Vector
开始使用在这个简单的例子中,可以使用矢量类型开始创建数据,但当然我的数据实际上不是我正在并行化的Python列表,而是从数据源读取.但是为了记录,这是看起来如何:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)
map()
一种可能性是使用RDD map()
方法将列表转换为a Vector
.这类似于UDF的想法,除了它更糟糕,因为序列化等的成本是由每行中的所有字段引起的,而不仅仅是正在操作的字段.为了记录,这是解决方案的样子:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
Run Code Online (Sandbox Code Playgroud)
无奈之下,我注意到它Vector
由一个带有四个字段的结构体内部表示,但是使用来自该类型结构的传统强制转换也不起作用.这是一个例子(我使用udf构建结构但是udf不是重要的部分):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
Run Code Online (Sandbox Code Playgroud)
这给出了错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
Run Code Online (Sandbox Code Playgroud)
use*_*411 18
就个人而言,我会使用Python UDF并且不会为其他任何事情烦恼:
Vectors
不是本机SQL类型,因此会有这样或那样的性能开销.特别地,该过程需要两个步骤,其中数据首先从外部类型转换为行,然后使用泛型从行转换为内部表示RowEncoder
.Pipeline
都比简单转换昂贵得多.而且,它需要与上述方法相反的方法但如果你真的想要其他选择,你可以:
带有Python包装器的Scala UDF:
按照项目站点上的说明安装sbt.
使用以下结构创建Scala包:
.
??? build.sbt
??? udfs.scala
Run Code Online (Sandbox Code Playgroud)
编辑build.sbt
(调整以反映Scala和Spark版本):
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.1.0",
"org.apache.spark" %% "spark-mllib" % "2.1.0"
)
Run Code Online (Sandbox Code Playgroud)
编辑udfs.scala
:
package com.example.spark.udfs
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector
object udfs {
val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}
Run Code Online (Sandbox Code Playgroud)
包:
sbt package
Run Code Online (Sandbox Code Playgroud)
并包括(或等同于Scala vers:
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)
作为--driver-class-path
启动shell /提交应用程序时的参数.
在PySpark中定义一个包装器:
from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext
def as_vector(col):
sc = SparkContext.getOrCreate()
f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
测试:
with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()
Run Code Online (Sandbox Code Playgroud)
+--------+------------------+----------------+
| city| temperatures| vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_vec.printSchema()
Run Code Online (Sandbox Code Playgroud)
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- vector: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)将数据转储为反映DenseVector
模式的JSON格式并将其读回:
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT
json_vec = to_json(struct(struct(
lit(1).alias("type"), # type 1 is dense, type 0 is sparse
col("temperatures").alias("values")
).alias("v")))
schema = StructType([StructField("v", VectorUDT())])
with_parsed_vector = df.withColumn(
"parsed_vector", from_json(json_vec, schema).getItem("v")
)
with_parsed_vector.show()
Run Code Online (Sandbox Code Playgroud)
+--------+------------------+----------------+
| city| temperatures| parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
Run Code Online (Sandbox Code Playgroud)
with_parsed_vector.printSchema()
Run Code Online (Sandbox Code Playgroud)
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- parsed_vector: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)我和你有同样的问题,我就是这样做的。这种方式包括 RDD 转换,因此对性能不是很重要,但它确实有效。
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])
new_df
Run Code Online (Sandbox Code Playgroud)
结果是,
DataFrame[city: string, temperatures: vector]
Run Code Online (Sandbox Code Playgroud)
对于 pyspark>=3.1.0
从 3.1.0 开始,有一个内置解决方案:array_to_vector。
鉴于您的情况:
from pyspark.ml.functions import vector_to_array
df = df.withColumn("temperatures_vectorized", vector_to_array("temperatures"))
Run Code Online (Sandbox Code Playgroud)
PS:从 3.0.0 开始,还有一个相反的操作:vector_to_array
归档时间: |
|
查看次数: |
21952 次 |
最近记录: |