我想Estimator在PySpark MLlib中构建一个简单的自定义.我在这里可以编写一个自定义的Transformer,但我不知道如何在一个Estimator.我也不明白是什么@keyword_only以及为什么我需要这么多的二传手和吸气剂.Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有.
示例模型的伪代码:
class NormalDeviation():
def __init__(self, threshold = 3):
def fit(x, y=None):
self.model = {'mean': x.mean(), 'std': x.std()]
def predict(x):
return ((x-self.model['mean']) > self.threshold * self.model['std'])
def decision_function(x): # does ml-lib support this?
Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-ml apache-spark-mllib
我在PySpark ML中创建自定义Transformer的评论部分找到了相同的讨论,但没有明确的答案.还有一个未解决的JIRA对应于:https://issues.apache.org/jira/browse/SPARK-17025.
鉴于Pyspark ML管道没有提供用于保存用python编写的自定义转换器的选项,有什么其他选项可以完成它?如何在我的python类中实现返回兼容java对象的_to_java方法?
我写了一个自定义变换器,就像这里描述的那样.
当我的变压器作为第一步创建管道时,我能够训练(Logistic回归)模型进行分类.
但是,当我想用这个管道执行交叉验证时,如下所示:
from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
sentenceDataFrame = sqlContext.createDataFrame([
(1.0, "Hi I heard about Spark"),
(1.0, "Spark is awesome"),
(0.0, "But there seems to be a problem"),
(0.0, "And I don't know why...")
], ["label", "sentence"])
tokenizer = NLTKWordPunctTokenizer(
inputCol="sentence", outputCol="words",
stopwords=set(nltk.corpus.stopwords.words('english')))
hasher = HashingTF(inputCol="words",outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer,hasher,lr])
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
.addGrid(lr.tol, (1e-5, 1e-6))\
.build() …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-1.4 apache-spark-ml
我正在apache-spark使用python在单个Dataframe上进行多次转换.
我编写了一些函数,以便更容易进行不同的转换.想象一下,我们有以下功能:
clearAccents(df,columns)
#lines that remove accents from dataframe with spark functions or
#udf
return df
Run Code Online (Sandbox Code Playgroud)
我使用这些函数"覆盖"dataframe变量,以保存每次函数返回时转换的新数据帧.我知道这不是一个好习惯,现在我看到了后果.
我注意到每次添加如下所示的行时,运行时间会更长:
# Step transformation 1:
df = function1(df,column)
# Step transformation 2.
df = function2(df, column)
Run Code Online (Sandbox Code Playgroud)
据我所知,Spark没有保存结果数据帧,但它保存了获取当前行中数据帧所需的所有操作.例如,当运行函数时,function1Spark只运行此函数,但是当运行function2Spark运行function1然后,function2.如果我真的需要只运行一个函数怎么办?
我尝试了df.cache(),df.persist()但我没有得到预期的结果.
我想以一种方式保存部分结果,这种方式不需要自开始以来只计算所有指令,只能从最后一个转换函数计算,而不会产生堆栈溢出错误.
如何声明我的给定列中DataFrame包含分类信息?
我有一个DataFrame从数据库加载的 Spark SQL 。其中的许多列DataFrame都有分类信息,但它们被编码为 Longs(为了隐私)。
我希望能够告诉 spark-ml,即使此列是数值,但信息实际上是分类的。类别的索引可能有一些漏洞,这是可以接受的。(例如,一列可能具有值 [1, 0, 0 ,4])
我知道存在 ,StringIndexer但我更愿意避免编码和解码的麻烦,特别是因为我有许多具有这种行为的列。
我会寻找类似于以下内容的东西
train = load_from_database()
categorical_cols = ["CategoricalColOfLongs1",
"CategoricalColOfLongs2"]
numeric_cols = ["NumericColOfLongs1"]
## This is what I am looking for
## this step detects the min and max value of both columns
## and adds metadata to indicate this as a categorical column
## with (1 + max - min) categories
categorizer = ColumnCategorizer(columns = categorical_cols,
autoDetectMinMax = True) …Run Code Online (Sandbox Code Playgroud) 来自Florian的示例代码
-----------+-----------+-----------+
|ball_column|keep_the |hall_column|
+-----------+-----------+-----------+
| 0| 7| 14|
| 1| 8| 15|
| 2| 9| 16|
| 3| 10| 17|
| 4| 11| 18|
| 5| 12| 19|
| 6| 13| 20|
+-----------+-----------+-----------+
Run Code Online (Sandbox Code Playgroud)
代码的第一部分删除了禁止列表中的列名
#first part of the code
banned_list = ["ball","fall","hall"]
condition = lambda col: any(word in col for word in banned_list)
new_df = df.drop(*filter(condition, df.columns))
Run Code Online (Sandbox Code Playgroud)
所以上面的代码应该放弃ball_column和hall_column.
代码的第二部分用于列表中的特定列.对于这个例子,我们将只剩下剩下的一个keep_column.
bagging =
Bucketizer(
splits=[-float("inf"), 10, 100, float("inf")],
inputCol='keep_the',
outputCol='keep_the')
Run Code Online (Sandbox Code Playgroud)
现在使用管道装箱柱如下
model = Pipeline(stages=bagging).fit(df)
bucketedData …Run Code Online (Sandbox Code Playgroud) 我有以下代码,它基本上是在做特征工程管道:
token_q1=Tokenizer(inputCol='question1',outputCol='question1_tokens')
token_q2=Tokenizer(inputCol='question2',outputCol='question2_tokens')
remover_q1=StopWordsRemover(inputCol='question1_tokens',outputCol='question1_tokens_filtered')
remover_q2=StopWordsRemover(inputCol='question2_tokens',outputCol='question2_tokens_filtered')
q1w2model = Word2Vec(inputCol='question1_tokens_filtered',outputCol='q1_vectors')
q1w2model.setSeed(1)
q2w2model = Word2Vec(inputCol='question2_tokens_filtered',outputCol='q2_vectors')
q2w2model.setSeed(1)
pipeline=Pipeline(stages[token_q1,token_q2,remover_q1,remover_q2,q1w2model,q2w2model])
model=pipeline.fit(train)
result=model.transform(train)
result.show()
Run Code Online (Sandbox Code Playgroud)
我想将以下 UDF 添加到上述管道中:
charcount_q1 = F.udf(lambda row : sum([len(char) for char in row]),IntegerType())
Run Code Online (Sandbox Code Playgroud)
当我这样做时,我收到 Java 错误。有人可以指出我正确的方向吗?
但是,我使用以下基本有效的代码添加了此列:
charCountq1=train.withColumn("charcountq1", charcount_q1("question1"))
Run Code Online (Sandbox Code Playgroud)
但我想将它添加到管道中而不是这样做
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tokenized = tokenizer.transform(sentenceDataFrame)
Run Code Online (Sandbox Code Playgroud)
如果我运行命令
tokenized.head()
Run Code Online (Sandbox Code Playgroud)
我想得到这样的结果
Row(id=0, sentence='Hi I heard about Spark',
words=['H','i',' ','h','e',‘a’,……])
Run Code Online (Sandbox Code Playgroud)
然而,现在的结果是
Row(id=0, sentence='Hi I heard about Spark',
words=['Hi','I','heard','about','spark'])
Run Code Online (Sandbox Code Playgroud)
有没有办法通过 PySpark 中的 Tokenizer 或 RegexTokenizer 来实现这一点?
类似的问题在这里?在 PySpark ML 中创建自定义 Transformer
python apache-spark pyspark spark-dataframe apache-spark-mllib
我想在中创建自己的功能转换器DataFrame,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer以字符串作为参数inputCol,因此我当然不能指定多个列。
因此,基本上,我想要实现的是一种_transform()类似于该方法的方法:
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset.select([self.getInputCol()])
# Define transformer logic
def f(col1, col2):
return col1 - col2
t = IntegerType()
return dataset.withColumn(out_col, udf(f, t)(in_col))
Run Code Online (Sandbox Code Playgroud)
这怎么可能呢?