从pyspark中的数据框构建StructType

lea*_*ing 12 python dataframe apache-spark apache-spark-sql pyspark

我是新的spark和python,并且面临从可以应用于我的数据文件的元数据文件构建模式的困难.场景:数据文件的元数据文件(csv格式),包含列及其类型:例如:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0
Run Code Online (Sandbox Code Playgroud)

我已成功将其转换为数据框,如下所示:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|
Run Code Online (Sandbox Code Playgroud)

但是当我尝试使用它将其转换为StructField格式时

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))
Run Code Online (Sandbox Code Playgroud)

要么

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()
Run Code Online (Sandbox Code Playgroud)

然后使用将其转换为StructType

schemaFinal = StructType(schemaList)
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType
Run Code Online (Sandbox Code Playgroud)

由于我对数据框架缺乏了解,我对此感到困惑,请您告知,如何继续这样做.一旦我准备好架构,我想使用createDataFrame来应用我的数据文件.必须对许多表执行此过程,因此我不想对类型进行硬编码,而是使用元数据文件来构建模式,然后应用于RDD.

提前致谢.

zer*_*323 22

字段有参数必须是DataType对象列表.这个:

.map(lambda l:([StructField(l.name, l.type, 'true')]))
Run Code Online (Sandbox Code Playgroud)

后,生成collect 一个listliststuples(Rows的)DataType(list[list[tuple[DataType]]])更不用说nullable参数应该是布尔值不是字符串.

你的第二次尝试:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).
Run Code Online (Sandbox Code Playgroud)

后,生成collect一个liststr对象.

您显示的记录的正确架构应该看起来或多或少如下:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])
Run Code Online (Sandbox Code Playgroud)

尽管对这样的任务使用分布式数据结构是一种严重的过度杀伤,更不用说低效,但您可以尝试按如下方式调整第一个解决方案:

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])
Run Code Online (Sandbox Code Playgroud)

但它并不是特别安全(eval).从JSON /字典构建模式可能更容易.假设您有从类型描述映射到规范类型名称的函数:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())
Run Code Online (Sandbox Code Playgroud)

您可以构建以下形状的字典:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}
Run Code Online (Sandbox Code Playgroud)

并将其喂给StructType.fromJson:

StructType.fromJson(schema_dict)
Run Code Online (Sandbox Code Playgroud)