在Spark中组合Row()

DAE*_*DAE 0 apache-spark-sql pyspark

看似简单的问题,却找不到答案.

问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?

我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id
Run Code Online (Sandbox Code Playgroud)

更基本的,我似乎无法结合两个Row.

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)

这不会像我想要的那样返回一个新的Row().

谢谢

kar*_*son 7

我真的建议使用UserDefinedFunction.

假设您想从DataFrame int_col类型int的列中提取许多功能df.假设这些特征是简单的modulo 3modulo 2所述列内容.

我们将导入UserDefinedFunction我们的函数的数据类型.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
Run Code Online (Sandbox Code Playgroud)

然后我们将实现我们的特征提取功能:

def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2
Run Code Online (Sandbox Code Playgroud)

把它们变成udfs:

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())
Run Code Online (Sandbox Code Playgroud)

现在我们将计算所有其他列并为它们提供好的名称(via alias):

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]
Run Code Online (Sandbox Code Playgroud)

最后,我们选择这些列以及之前已存在的所有列:

new_df = df.select(*df.columns+new_columns)
Run Code Online (Sandbox Code Playgroud)

new_df现在将有两个额外的列mod3mod2.