在 PySpark 中使用 toDF() 函数从 RDD 转换为 Dataframe 时的奇怪行为

xki*_*ing 6 python apache-spark rdd apache-spark-sql pyspark

我是 Spark 的新手。当我使用 toDF() 函数将 RDD 转换为数据帧时,它似乎计算了我之前编写的 map() 之类的所有转换函数。我想知道 PySpark 中的 toDF() 是转换还是动作。

我创建了一个简单的 RDD 并使用一个简单的函数来输出它的值,仅用于测试,并在 map() 之后使用 toDF()。结果似乎部分地运行了 map 中的函数。当我显示数据帧的结果时, toDF() 就像转换并再次输出结果。

>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+
Run Code Online (Sandbox Code Playgroud)

有人能告诉我为什么 PySpark 中的 toDF() 函数既像动作又像转换吗?非常感谢。

PS:在 Scala 中,toDF 在我的情况下就像转换一样。

104*_*ica 5

这并不奇怪。由于您没有提供架构,Spark 必须根据数据推断它。如果 是RDD输入,它将SparkSession._createFromRDD随后调用SparkSession._inferSchema,如果samplingRatio缺少,将计算最多 100 行

first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")


if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")
Run Code Online (Sandbox Code Playgroud)

现在剩下的唯一的难题是为什么它不准确地评估一条记录。毕竟你的情况first不为空且不包含None.

这是因为它first是通过实施的take,并且不能保证评估的项目的确切数量。如果第一个分区没有产生所需数量的项目,它将迭代地增加要扫描的分区数量。请检查执行情况详情

如果您想避免这种情况,您应该使用createDataFrame并提供模式作为 DDL 字符串:

spark.createDataFrame(a.map(f), "val: integer")
Run Code Online (Sandbox Code Playgroud)

或同等学历StructType

您不会在 Scala 对应项中找到任何类似的行为,因为它不使用toDF. 它要么从(使用 Scala 反射获取)检索相应的模式Encoder,要么根本不允许转换。最接近的类似行为是对 CSV或 JSON等输入源的推断:

first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")


if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")
Run Code Online (Sandbox Code Playgroud)