在开始使用pyspark.ml
管道 API 时,我发现自己为典型的预处理任务编写了自定义转换器,以便在管道中使用它们。例子:
from pyspark.ml import Pipeline, Transformer
class CustomTransformer(Transformer):
# lazy workaround - a transformer needs to have these attributes
_defaultParamMap = dict()
_paramMap = dict()
_params = dict()
class ColumnSelector(CustomTransformer):
"""Transformer that selects a subset of columns
- to be used as pipeline stage"""
def __init__(self, columns):
self.columns = columns
def _transform(self, data):
return data.select(self.columns)
class ColumnRenamer(CustomTransformer):
"""Transformer renames one column"""
def __init__(self, rename):
self.rename = rename
def _transform(self, data):
(colNameBefore, colNameAfter) = self.rename
return data.withColumnRenamed(colNameBefore, …
Run Code Online (Sandbox Code Playgroud)