小编kku*_*mar的帖子

Pyspark- case 语句中的子查询

我试图在 Pyspark 的 case 语句中运行子查询,但它抛出异常。如果一个表中的 id 存在于另一个表中,我正在尝试创建一个新标志。

任何人都可以让我知道这在 pyspark 中是否可行?

temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
Run Code Online (Sandbox Code Playgroud)

这是错误:

AnalysisException: 'Predicate sub-queries can only be used in a Filter
Run Code Online (Sandbox Code Playgroud)

python pyspark pyspark-sql

5
推荐指数
1
解决办法
6752
查看次数

使用 Pyspark 将密集向量转换为数据帧

首先,我尝试了下面链接中的所有内容来修复我的错误,但没有一个起作用。

如何在pyspark中将密集向量的RDD转换为DataFrame?

我正在尝试将密集向量转换为数据框(最好是 Spark)以及列名并遇到问题。

我在 spark 数据框中的列是使用 Vector Assembler 创建的向量,我现在想将其转换回数据帧,因为我想在向量中的某些变量上创建图。

方法一:

from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.linalg import Vectors

temp=output.select("all_features")
temp.rdd.map(
    lambda row: (DenseVector(row[0].toArray()))
).toDF()
Run Code Online (Sandbox Code Playgroud)

下面是错误

TypeError: not supported type: <type 'numpy.ndarray'>
Run Code Online (Sandbox Code Playgroud)

方法二:

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.linalg import *

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
result = output.withColumn("all_features", as_ml("all_features"))
result.head(5)
Run Code Online (Sandbox Code Playgroud)

错误:

AttributeError: 'numpy.ndarray' object has no attribute 'asML'
Run Code Online (Sandbox Code Playgroud)

我还尝试将数据帧转换为 Pandas 数据帧,之后我无法将值拆分为单独的列

方法三:

pandas_df=temp.toPandas()
pandas_df1=pd.DataFrame(pandas_df.all_features.values.tolist())
Run Code Online (Sandbox Code Playgroud)

上面的代码运行良好,但我的数据框中仍然只有一列,所有值以逗号分隔作为列表。

任何帮助是极大的赞赏! …

python dataframe pandas apache-spark

3
推荐指数
1
解决办法
4766
查看次数

属性错误:管道对象没有属性转换

我已经使用 spark ml 管道构建了一个逻辑回归模型并保存了它。我正在尝试将管道应用于新记录集并收到错误。我的管道中有向量汇编器、标准缩放器和逻辑回归模型。

我尝试pipeline.transform并收到以下错误

AttributeError: 'Pipeline' 对象没有属性 'transform'

下面是代码

from pyspark.ml import Pipeline
pipelineModel = Pipeline.load("/user/userid/lr_pipe")
scored_temp = pipelineModel.transform(combined_data_imputed_final)
Run Code Online (Sandbox Code Playgroud)

这是我保存管道的方法

from pyspark.ml.classification import LogisticRegression

vector = VectorAssembler(inputCols=final_features, outputCol="final_features")
scaler = StandardScaler(inputCol="final_features", outputCol="final_scaled_features")
lr = LogisticRegression(labelCol="label", featuresCol="final_scaled_features", maxIter=30)

stages = [vector,scaler,lr]

pipe = Pipeline(stages=stages)

lrModel = pipe.fit(train_transformed_data_1).transform(train_transformed_data_1)
pipe.save("lr_pipe")
Run Code Online (Sandbox Code Playgroud)

我期待它完成所有管道步骤并为记录评分。

pipeline apache-spark pyspark apache-spark-ml

0
推荐指数
1
解决办法
1456
查看次数