How to bin in PySpark?

cei*_*cat 17 pyspark

For example, I'd like to classify a DataFrame of people into the following 4 bins according to age.

age_bins = [0, 6, 18, 60, np.inf]
age_labels = ['infant', 'minor', 'adult', 'senior']

I would use pandas.cut() to do this in pandas. How do I do this in PySpark?

ash*_*ids 29

You can use the Bucketizer feature transformer from Spark's ml library.

from pyspark.ml.feature import Bucketizer

values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])

# Adjacent splits define the buckets: [0, 6), [6, 18), [18, 60), [60, inf]
bucketizer = Bucketizer(splits=[0, 6, 18, 60, float("inf")], inputCol="ages", outputCol="buckets")
# handleInvalid="keep" places NaN values in an extra bucket instead of raising an error
df_buck = bucketizer.setHandleInvalid("keep").transform(df)

df_buck.show()

Output:

+----+----+-------+
|name|ages|buckets|
+----+----+-------+
|   a|  23|    2.0|
|   b|  45|    2.0|
|   c|  10|    1.0|
|   d|  60|    3.0|
|   e|  56|    2.0|
|   f|   2|    0.0|
|   g|  25|    2.0|
|   h|  40|    2.0|
|   j|  33|    2.0|
+----+----+-------+
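To see why age 60 lands in bucket 3.0 rather than 2.0, the half-open interval rule can be mimicked in plain Python. This is only a rough sketch of the indexing logic, not Spark's implementation; `bucket_index` is a hypothetical helper, and NaN handling is left out.

```python
from bisect import bisect_right

# Same split points as the Bucketizer call above.
SPLITS = [0, 6, 18, 60, float("inf")]

def bucket_index(value, splits=SPLITS):
    """Hypothetical helper: index of the half-open bucket [lo, hi) holding value."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the split range")
    # bisect_right counts the splits that are <= value; subtracting 1 gives the bucket.
    # min() keeps the top boundary itself inside the last bucket.
    return float(min(bisect_right(splits, value) - 1, len(splits) - 2))

ages = [23, 45, 10, 60, 56, 2, 25, 40, 33]
indices = [bucket_index(a) for a in ages]
print(indices)  # [2.0, 2.0, 1.0, 3.0, 2.0, 0.0, 2.0, 2.0, 2.0]
```

Each lower bound is inclusive, so 6 falls in "minor", 18 in "adult", and 60 in "senior".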

If you want a name for each bucket, you can use a udf to create a new column with the bucket names:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Map each bucket index produced by Bucketizer to a readable label
t = {0.0: "infant", 1.0: "minor", 2.0: "adult", 3.0: "senior"}
udf_foo = udf(lambda x: t[x], StringType())
df_buck.withColumn("age_bucket", udf_foo("buckets")).show()

Output:

+----+----+-------+----------+
|name|ages|buckets|age_bucket|
+----+----+-------+----------+
|   a|  23|    2.0|     adult|
|   b|  45|    2.0|     adult|
|   c|  10|    1.0|     minor|
|   d|  60|    3.0|    senior|
|   e|  56|    2.0|     adult|
|   f|   2|    0.0|    infant|
|   g|  25|    2.0|     adult|
|   h|  40|    2.0|     adult|
|   j|  33|    2.0|     adult|
+----+----+-------+----------+
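One caveat with a plain dictionary lookup: `t[x]` raises a KeyError for any bucket index that has no label. With `handleInvalid="keep"`, NaN ages go to an extra bucket (index 4.0 here, assuming it is numbered right after the four regular buckets), and null rows reach the UDF as None. A minimal sketch of a more forgiving lookup, shown as a plain function so it can be tried outside Spark; `label_for` is a hypothetical name, and in Spark it could presumably be wrapped as `udf(label_for, StringType())`.

```python
# Same index-to-label mapping as in the answer above.
LABELS = {0.0: "infant", 1.0: "minor", 2.0: "adult", 3.0: "senior"}

def label_for(bucket, labels=LABELS, default=None):
    """Hypothetical helper: look up a label, returning `default` for unknown buckets."""
    # dict.get avoids the KeyError that labels[bucket] would raise for an
    # unmapped index (such as an extra NaN bucket) or for None.
    return labels.get(bucket, default)

print([label_for(b) for b in [2.0, 1.0, 3.0, 0.0, 4.0, None]])
# ['adult', 'minor', 'senior', 'infant', None, None]
```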


inf*_*nge 6

You can also write a PySpark UDF:

def categorizer(age):
    if age < 6:
        return "infant"
    elif age < 18:
        return "minor"
    elif age < 60:
        return "adult"
    else:
        return "senior"

Then:

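from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
bucket_udf = udf(categorizer, StringType())
bucketed = df.withColumn("bucket", bucket_udf("age"))  # assumes df has an "age" column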
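One thing to watch with this approach: Spark passes null values to a Python UDF as None, and in Python 3 the comparison `None < 6` raises a TypeError. A possible variant that passes missing ages through is sketched below; `categorizer_safe` is a hypothetical name, and the loop just checks the bin boundaries in plain Python before the function is wrapped with `udf`.

```python
def categorizer_safe(age):
    """Variant of categorizer that returns None for missing ages."""
    if age is None:
        return None
    if age < 6:
        return "infant"
    elif age < 18:
        return "minor"
    elif age < 60:
        return "adult"
    else:
        return "senior"

# Check each boundary: lower bounds are inclusive, matching the bins in the question.
for age in [None, 5, 6, 17, 18, 59, 60]:
    print(age, categorizer_safe(age))
```

It would be registered the same way as the original function, for example `udf(categorizer_safe, StringType())`.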