小编ani*_*711的帖子

Scikit-learn 管道类型错误:zip 参数 #2 必须支持迭代

我正在尝试为 sklearn 管道创建一个自定义转换器,它将提取特定文本的平均字长,然后对其应用标准缩放器以标准化数据集。我正在将一系列文本传递给管道。

class AverageWordLengthExtractor(BaseEstimator, TransformerMixin):

    def __init__(self):
        pass
    def average_word_length(self, text):
        return np.mean([len(word) for word in text.split( )])
    def fit(self, x, y=None):
        return self
    def transform(self, x , y=None):
        return pd.DataFrame(pd.Series(x).apply(self.average_word_length))
Run Code Online (Sandbox Code Playgroud)

然后我创建了一个这样的管道。

pipeline = Pipeline(['text_length', AverageWordLengthExtractor(), 
                         'scale', StandardScaler()])
Run Code Online (Sandbox Code Playgroud)

当我在这条管道上执行 fit_transform 时,我收到错误消息,

 File "custom_transformer.py", line 48, in <module>
    main()
  File "custom_transformer.py", line 43, in main
    'scale', StandardScaler()])
  File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 114, in __init__
    self._validate_steps()
  File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 146, in _validate_steps
    names, estimators = zip(*self.steps)
TypeError: zip argument #2 must support …
Run Code Online (Sandbox Code Playgroud)

python pipeline python-3.x scikit-learn

7
推荐指数
1
解决办法
2876
查看次数

为什么我的shuffle分区在group by操作时不是200(默认)?(火花2.4.5)

我对 Spark 很陌生,并试图了解它的内部结构。因此,我从 s3 读取一个 50MB 的小 parquet 文件并执行分组,然后保存回 s3。当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,

阶段 0:加载(1 个任务)

第 1 阶段:用于分组的 shufflequerystage(12 个任务)

第 2 阶段:保存(coalescedshufflereader)(26 个任务)

代码示例:

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\                             
 .agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)
Run Code Online (Sandbox Code Playgroud)

我使用带有 1 个主节点、3 个核心节点(每个节点有 4 个 vcore)的 EMR 实例。因此,默认并行度为 12。我不会在运行时更改任何配置。但我不明白为什么最后阶段会创建26个任务?据我了解,默认情况下,随机分区应为 200。附加 UI 的屏幕截图。

在此输入图像描述

amazon-emr apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
3655
查看次数