I have the following code, which is basically a feature engineering pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec

token_q1 = Tokenizer(inputCol='question1', outputCol='question1_tokens')
token_q2 = Tokenizer(inputCol='question2', outputCol='question2_tokens')
remover_q1 = StopWordsRemover(inputCol='question1_tokens', outputCol='question1_tokens_filtered')
remover_q2 = StopWordsRemover(inputCol='question2_tokens', outputCol='question2_tokens_filtered')
q1w2model = Word2Vec(inputCol='question1_tokens_filtered', outputCol='q1_vectors')
q1w2model.setSeed(1)
q2w2model = Word2Vec(inputCol='question2_tokens_filtered', outputCol='q2_vectors')
q2w2model.setSeed(1)

pipeline = Pipeline(stages=[token_q1, token_q2, remover_q1, remover_q2, q1w2model, q2w2model])
model = pipeline.fit(train)
result = model.transform(train)
result.show()
I want to add the following UDF to the above pipeline:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

charcount_q1 = F.udf(lambda row: sum(len(char) for char in row), IntegerType())
When I do this, I get a Java error. Can someone point me in the right direction?
I was, however, able to add this column with the following code, which basically works:
charCountq1 = train.withColumn("charcountq1", charcount_q1("question1"))
But I want to add it to the pipeline rather than doing it this way.
If you want to use a udf in a Pipeline, you need one of the following: a custom Transformer that wraps the udf, or a SQLTransformer combined with a function registered for SQL use.

The first one is quite verbose for such a simple use case (a minimal sketch is included at the end of this answer), so I'd recommend the second option:
from pyspark.sql.functions import udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

charcount_q1 = spark.udf.register(
    "charcount_q1",
    lambda row: sum(len(char) for char in row),
    "integer"
)

df = spark.createDataFrame(
    [(1, ["spark", "java", "python"])],
    ("id", "question1"))

pipeline = Pipeline(stages=[SQLTransformer(
    statement="SELECT *, charcount_q1(question1) charcountq1 FROM __THIS__"
)])

pipeline.fit(df).transform(df).show()
# +---+--------------------+-----------+
# | id|           question1|charcountq1|
# +---+--------------------+-----------+
# |  1|[spark, java, pyt...|         15|
# +---+--------------------+-----------+
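To plug this into your original pipeline, the SQLTransformer stage can simply be appended to the existing stages. A minimal sketch, reusing the stages and the train DataFrame from the question and assuming charcount_q1 has been registered with spark.udf.register as above (char_counter is just an illustrative name):

# SQLTransformer stage that calls the SQL-registered udf
char_counter = SQLTransformer(
    statement="SELECT *, charcount_q1(question1) charcountq1 FROM __THIS__")

pipeline = Pipeline(stages=[
    token_q1, token_q2, remover_q1, remover_q2,
    q1w2model, q2w2model, char_counter])

result = pipeline.fit(train).transform(train)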
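For completeness, the first option (a custom Transformer that wraps the udf) would look roughly like this. This is a minimal sketch, not the only way to write it; CharCountTransformer is a hypothetical name, and the pattern relies on the standard HasInputCol/HasOutputCol param mixins:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

class CharCountTransformer(Transformer, HasInputCol, HasOutputCol):
    # Hypothetical custom Transformer: sums the lengths of the strings in
    # an array<string> column and writes the total to the output column.
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CharCountTransformer, self).__init__()
        self._set(**self._input_kwargs)

    def _transform(self, dataset):
        count_chars = F.udf(lambda words: sum(len(w) for w in words), IntegerType())
        return dataset.withColumn(
            self.getOutputCol(), count_chars(F.col(self.getInputCol())))

An instance such as CharCountTransformer(inputCol="question1_tokens", outputCol="charcountq1") can then be placed directly in the Pipeline stages list, just like any built-in stage.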