DAE*_*DAE 0 apache-spark-sql pyspark
看似简单的问题,却找不到答案.
问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?
我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?
def extract_fund_code_from_iv_id(holding):
# Must include key of data for later joining
iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
return iv_id
Run Code Online (Sandbox Code Playgroud)
更基本的,我似乎无法结合两个Row.
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)
这不会像我想要的那样返回一个新的Row().
谢谢
我真的建议使用UserDefinedFunction.
假设您想从DataFrame int_col类型int的列中提取许多功能df.假设这些特征是简单的modulo 3和modulo 2所述列内容.
我们将导入UserDefinedFunction我们的函数的数据类型.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
Run Code Online (Sandbox Code Playgroud)
然后我们将实现我们的特征提取功能:
def modulo_three(col):
return int(col) % 3
def modulo_two(col):
return int(col) % 2
Run Code Online (Sandbox Code Playgroud)
把它们变成udfs:
mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())
Run Code Online (Sandbox Code Playgroud)
现在我们将计算所有其他列并为它们提供好的名称(via alias):
new_columns = [
mod3(df['int_col']).alias('mod3'),
mod2(df['int_col']).alias('mod2'),
]
Run Code Online (Sandbox Code Playgroud)
最后,我们选择这些列以及之前已存在的所有列:
new_df = df.select(*df.columns+new_columns)
Run Code Online (Sandbox Code Playgroud)
new_df现在将有两个额外的列mod3和mod2.
| 归档时间: |
|
| 查看次数: |
2694 次 |
| 最近记录: |