相关疑难解决方法(0)

如何在PySpark mllib中滚动自定义估算器

我想Estimator在PySpark MLlib中构建一个简单的自定义.我在这里可以编写一个自定义的Transformer,但我不知道如何在一个Estimator.我也不明白是什么@keyword_only以及为什么我需要这么多的二传手和吸气剂.Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有.

示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = {'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-ml apache-spark-mllib

13
推荐指数
1
解决办法
4313
查看次数

使用python序列化自定义转换器以在Pyspark ML管道中使用

在PySpark ML创建自定义Transformer的评论部分找到了相同的讨论,但没有明确的答案.还有一个未解决的JIRA对应于:https://issues.apache.org/jira/browse/SPARK-17025.

鉴于Pyspark ML管道没有提供用于保存用python编写的自定义转换器的选项,有什么其他选项可以完成它?如何在我的python类中实现返回兼容java对象的_to_java方法?

pyspark apache-spark-ml

10
推荐指数
2
解决办法
3855
查看次数

PySpark Pipeline中的自定义变换器,带有交叉验证

我写了一个自定义变换器,就像这里描述的那样.

当我的变压器作为第一步创建管道时,我能够训练(Logistic回归)模型进行分类.

但是,当我想用​​这个管道执行交叉验证时,如下所示:

from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

sentenceDataFrame = sqlContext.createDataFrame([
  (1.0, "Hi I heard about Spark"),
  (1.0, "Spark is awesome"),
  (0.0, "But there seems to be a problem"),
  (0.0, "And I don't know why...")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",  
    stopwords=set(nltk.corpus.stopwords.words('english')))

hasher = HashingTF(inputCol="words",outputCol="features")
lr = LogisticRegression()

pipeline = Pipeline(stages=[tokenizer,hasher,lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.tol, (1e-5, 1e-6))\
                              .build() …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-1.4 apache-spark-ml

6
推荐指数
0
解决办法
1865
查看次数

如何在pyspark中保存数据帧转换过程的部分结果?

我正在apache-spark使用python在单个Dataframe上进行多次转换.

我编写了一些函数,以便更容易进行不同的转换.想象一下,我们有以下功能:

clearAccents(df,columns)
#lines that remove accents from dataframe with spark functions or 
#udf
    return df
Run Code Online (Sandbox Code Playgroud)

我使用这些函数"覆盖"dataframe变量,以保存每次函数返回时转换的新数据帧.我知道这不是一个好习惯,现在我看到了后果.

我注意到每次添加如下所示的行时,运行时间会更长:

# Step transformation 1:
df = function1(df,column)
# Step transformation 2.
df = function2(df, column)
Run Code Online (Sandbox Code Playgroud)

据我所知,Spark没有保存结果数据帧,但它保存了获取当前行中数据帧所需的所有操作.例如,当运行函数时,function1Spark只运行此函数,但是当运行function2Spark运行function1然后,function2.如果我真的需要只运行一个函数怎么办?

我尝试了df.cache(),df.persist()但我没有得到预期的结果.

我想以一种方式保存部分结果,这种方式不需要自开始以来只计算所有指令,只能从最后一个转换函数计算,而不会产生堆栈溢出错误.

python apache-spark pyspark

5
推荐指数
1
解决办法
649
查看次数

如何将列声明为 DataFrame 中的分类特征以用于 ml

如何声明我的给定列中DataFrame包含分类信息?

我有一个DataFrame从数据库加载的 Spark SQL 。其中的许多列DataFrame都有分类信息,但它们被编码为 Longs(为了隐私)。

我希望能够告诉 spark-ml,即使此列是数值,但信息实际上是分类的。类别的索引可能有一些漏洞,这是可以接受的。(例如,一列可能具有值 [1, 0, 0 ,4])

我知道存在 ,StringIndexer但我更愿意避免编码和解码的麻烦,特别是因为我有许多具有这种行为的列。

我会寻找类似于以下内容的东西

train = load_from_database()
categorical_cols = ["CategoricalColOfLongs1",
                    "CategoricalColOfLongs2"]
numeric_cols = ["NumericColOfLongs1"]

## This is what I am looking for
## this step detects the min and max value of both columns
## and adds metadata to indicate this as a categorical column
## with (1 + max - min) categories
categorizer = ColumnCategorizer(columns = categorical_cols,
                                autoDetectMinMax = True) …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-ml

5
推荐指数
1
解决办法
3838
查看次数

如何在ML pyspark管道中添加自己的功能作为自定义阶段?

来自Florian的示例代码

-----------+-----------+-----------+
|ball_column|keep_the   |hall_column|
+-----------+-----------+-----------+
|          0|          7|         14|
|          1|          8|         15|
|          2|          9|         16|
|          3|         10|         17|
|          4|         11|         18|
|          5|         12|         19|
|          6|         13|         20|
+-----------+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)

代码的第一部分删除了禁止列表中的列名

#first part of the code

banned_list = ["ball","fall","hall"]
condition = lambda col: any(word in col for word in banned_list)
new_df = df.drop(*filter(condition, df.columns))
Run Code Online (Sandbox Code Playgroud)

所以上面的代码应该放弃ball_columnhall_column.

代码的第二部分用于列表中的特定列.对于这个例子,我们将只剩下剩下的一个keep_column.

bagging = 
    Bucketizer(
        splits=[-float("inf"), 10, 100, float("inf")],
        inputCol='keep_the',
        outputCol='keep_the')
Run Code Online (Sandbox Code Playgroud)

现在使用管道装箱柱如下

model = Pipeline(stages=bagging).fit(df)

bucketedData …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
2241
查看次数

如何在pyspark管道中添加UDF?

我有以下代码,它基本上是在做特征工程管道:

token_q1=Tokenizer(inputCol='question1',outputCol='question1_tokens') 
token_q2=Tokenizer(inputCol='question2',outputCol='question2_tokens')  

remover_q1=StopWordsRemover(inputCol='question1_tokens',outputCol='question1_tokens_filtered')
remover_q2=StopWordsRemover(inputCol='question2_tokens',outputCol='question2_tokens_filtered')

q1w2model = Word2Vec(inputCol='question1_tokens_filtered',outputCol='q1_vectors')
q1w2model.setSeed(1)

q2w2model = Word2Vec(inputCol='question2_tokens_filtered',outputCol='q2_vectors')
q2w2model.setSeed(1)

pipeline=Pipeline(stages[token_q1,token_q2,remover_q1,remover_q2,q1w2model,q2w2model])
model=pipeline.fit(train)
result=model.transform(train)
result.show()
Run Code Online (Sandbox Code Playgroud)

我想将以下 UDF 添加到上述管道中:

charcount_q1 = F.udf(lambda row : sum([len(char) for char in row]),IntegerType())
Run Code Online (Sandbox Code Playgroud)

当我这样做时,我收到 Java 错误。有人可以指出我正确的方向吗?

但是,我使用以下基本有效的代码添加了此列:

charCountq1=train.withColumn("charcountq1", charcount_q1("question1"))
Run Code Online (Sandbox Code Playgroud)

但我想将它添加到管道中而不是这样做

apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
2081
查看次数

如何在 PySpark ML 中创建自定义标记器

sentenceDataFrame = spark.createDataFrame([
        (0, "Hi I heard about Spark"),
        (1, "I wish Java could use case classes"),
        (2, "Logistic,regression,models,are,neat")
    ], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words") 
tokenized = tokenizer.transform(sentenceDataFrame)
Run Code Online (Sandbox Code Playgroud)

如果我运行命令

tokenized.head()
Run Code Online (Sandbox Code Playgroud)

我想得到这样的结果

Row(id=0, sentence='Hi I heard about Spark',
    words=['H','i',' ','h','e',‘a’,……])
Run Code Online (Sandbox Code Playgroud)

然而,现在的结果是

Row(id=0, sentence='Hi I heard about Spark',
    words=['Hi','I','heard','about','spark'])
Run Code Online (Sandbox Code Playgroud)

有没有办法通过 PySpark 中的 Tokenizer 或 RegexTokenizer 来实现这一点?

类似的问题在这里?在 PySpark ML 中创建自定义 Transformer

python apache-spark pyspark spark-dataframe apache-spark-mllib

2
推荐指数
1
解决办法
3108
查看次数

在pyspark.ml中的多个功能上运行的变压器

我想在中创建自己的功能转换器DataFrame,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer以字符串作为参数inputCol,因此我当然不能指定多个列。

因此,基本上,我想要实现的是一种_transform()类似于该方法的方法:

def _transform(self, dataset):
    out_col = self.getOutputCol()
    in_col = dataset.select([self.getInputCol()])

    # Define transformer logic
    def f(col1, col2):
        return col1 - col2
    t = IntegerType()

    return dataset.withColumn(out_col, udf(f, t)(in_col))
Run Code Online (Sandbox Code Playgroud)

这怎么可能呢?

apache-spark pyspark apache-spark-ml

0
推荐指数
2
解决办法
788
查看次数