我正在尝试为 sklearn 管道创建一个自定义转换器,它将提取特定文本的平均字长,然后对其应用标准缩放器以标准化数据集。我正在将一系列文本传递给管道。
class AverageWordLengthExtractor(BaseEstimator, TransformerMixin):
def __init__(self):
pass
def average_word_length(self, text):
return np.mean([len(word) for word in text.split( )])
def fit(self, x, y=None):
return self
def transform(self, x , y=None):
return pd.DataFrame(pd.Series(x).apply(self.average_word_length))
Run Code Online (Sandbox Code Playgroud)
然后我创建了一个这样的管道。
pipeline = Pipeline(['text_length', AverageWordLengthExtractor(),
'scale', StandardScaler()])
Run Code Online (Sandbox Code Playgroud)
当我在这条管道上执行 fit_transform 时,我收到错误消息,
File "custom_transformer.py", line 48, in <module>
main()
File "custom_transformer.py", line 43, in main
'scale', StandardScaler()])
File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 114, in __init__
self._validate_steps()
File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 146, in _validate_steps
names, estimators = zip(*self.steps)
TypeError: zip argument #2 must support …Run Code Online (Sandbox Code Playgroud) 我对 Spark 很陌生,并试图了解它的内部结构。因此,我从 s3 读取一个 50MB 的小 parquet 文件并执行分组,然后保存回 s3。当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,
阶段 0:加载(1 个任务)
第 1 阶段:用于分组的 shufflequerystage(12 个任务)
第 2 阶段:保存(coalescedshufflereader)(26 个任务)
代码示例:
df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\
.agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)
Run Code Online (Sandbox Code Playgroud)
我使用带有 1 个主节点、3 个核心节点(每个节点有 4 个 vcore)的 EMR 实例。因此,默认并行度为 12。我不会在运行时更改任何配置。但我不明白为什么最后阶段会创建26个任务?据我了解,默认情况下,随机分区应为 200。附加 UI 的屏幕截图。