我试图在 Pyspark 的 case 语句中运行子查询,但它抛出异常。如果一个表中的 id 存在于另一个表中,我正在尝试创建一个新标志。
任何人都可以让我知道这在 pyspark 中是否可行?
temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
Run Code Online (Sandbox Code Playgroud)
这是错误:
AnalysisException: 'Predicate sub-queries can only be used in a Filter
Run Code Online (Sandbox Code Playgroud) 首先,我尝试了下面链接中的所有内容来修复我的错误,但没有一个起作用。
如何在pyspark中将密集向量的RDD转换为DataFrame?
我正在尝试将密集向量转换为数据框(最好是 Spark)以及列名并遇到问题。
我在 spark 数据框中的列是使用 Vector Assembler 创建的向量,我现在想将其转换回数据帧,因为我想在向量中的某些变量上创建图。
方法一:
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.linalg import Vectors
temp=output.select("all_features")
temp.rdd.map(
lambda row: (DenseVector(row[0].toArray()))
).toDF()
Run Code Online (Sandbox Code Playgroud)
下面是错误
TypeError: not supported type: <type 'numpy.ndarray'>
Run Code Online (Sandbox Code Playgroud)
方法二:
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.linalg import *
as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
result = output.withColumn("all_features", as_ml("all_features"))
result.head(5)
Run Code Online (Sandbox Code Playgroud)
错误:
AttributeError: 'numpy.ndarray' object has no attribute 'asML'
Run Code Online (Sandbox Code Playgroud)
我还尝试将数据帧转换为 Pandas 数据帧,之后我无法将值拆分为单独的列
方法三:
pandas_df=temp.toPandas()
pandas_df1=pd.DataFrame(pandas_df.all_features.values.tolist())
Run Code Online (Sandbox Code Playgroud)
上面的代码运行良好,但我的数据框中仍然只有一列,所有值以逗号分隔作为列表。
任何帮助是极大的赞赏! …
我已经使用 spark ml 管道构建了一个逻辑回归模型并保存了它。我正在尝试将管道应用于新记录集并收到错误。我的管道中有向量汇编器、标准缩放器和逻辑回归模型。
我尝试pipeline.transform
并收到以下错误
AttributeError: 'Pipeline' 对象没有属性 'transform'
下面是代码
from pyspark.ml import Pipeline
pipelineModel = Pipeline.load("/user/userid/lr_pipe")
scored_temp = pipelineModel.transform(combined_data_imputed_final)
Run Code Online (Sandbox Code Playgroud)
这是我保存管道的方法
from pyspark.ml.classification import LogisticRegression
vector = VectorAssembler(inputCols=final_features, outputCol="final_features")
scaler = StandardScaler(inputCol="final_features", outputCol="final_scaled_features")
lr = LogisticRegression(labelCol="label", featuresCol="final_scaled_features", maxIter=30)
stages = [vector,scaler,lr]
pipe = Pipeline(stages=stages)
lrModel = pipe.fit(train_transformed_data_1).transform(train_transformed_data_1)
pipe.save("lr_pipe")
Run Code Online (Sandbox Code Playgroud)
我期待它完成所有管道步骤并为记录评分。