相关疑难解决方法(0)

等效的IF然后是ELSE

我早些时候在这里看过这个问题,并从中吸取了教训.但是,当我觉得它应该有效时,我不确定为什么会出现错误.

我想DataFrame通过一些规则在现有的Spark中创建一个新列.这是我写的.iris_spark是具有分类变量iris_spark的数据框,具有三个不同的类别.

from pyspark.sql import functions as F

iris_spark_df = iris_spark.withColumn(
    "Class", 
   F.when(iris_spark.iris_class == 'Iris-setosa', 0, F.when(iris_spark.iris_class == 'Iris-versicolor',1)).otherwise(2))
Run Code Online (Sandbox Code Playgroud)

引发以下错误.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-157-21818c7dc060> in <module>()
----> 1 iris_spark_df=iris_spark.withColumn("Class",F.when(iris_spark.iris_class=='Iris-setosa',0,F.when(iris_spark.iris_class=='Iris-versicolor',1)))

TypeError: when() takes exactly 2 arguments (3 given)


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-157-21818c7dc060> in <module>()
----> 1 iris_spark_df=iris_spark.withColumn("Class",F.when(iris_spark.iris_class=='Iris-setosa',0,F.when(iris_spark.iris_class=='Iris-versicolor',1)))

TypeError: when() takes exactly 2 arguments (3 given)
Run Code Online (Sandbox Code Playgroud)

知道为什么吗?

python apache-spark apache-spark-sql pyspark

16
推荐指数
3
解决办法
3万
查看次数

PySpark - 逐行转换为 JSON

我有一个非常大的 pyspark 数据框。我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用了以下代码。

for message in df.toJSON().collect():
        kafkaClient.send(message) 
Run Code Online (Sandbox Code Playgroud)

但是,数据框非常大,因此在尝试collect().

我正在考虑使用 aUDF因为它逐行处理它。

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
Run Code Online (Sandbox Code Playgroud)

但是我收到一个错误,因为列被输入到函数而不是行。

出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)

每行的 JSON 字符串:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
Run Code Online (Sandbox Code Playgroud)

python json pyspark spark-dataframe

7
推荐指数
1
解决办法
1万
查看次数