标签: apache-spark-sql

为什么 Spark 失败并显示“值写入不是 org.apache.spark.sql.DataFrameReader [错误] 的成员”?

我有两个几乎相同的写入 db 。scala 声明,但是一个给我一个错误另一个不是,我不明白如何解决它?有任何想法吗 ?

此声明有效

df_pm_visits_by_site_trn.write.format("jdbc").option("url", db_url_2).option("dbtable", "pm_visits_by_site_trn").option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)

这个不起作用并抛出我编译错误

df_trsnss .write.format("jdbc").option("url", db_url_2).option("dbtable", "df_trsnss")               .option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)

_dev.scala:464: value write 不是 org.apache.spark.sql.DataFrameReader [error] df_trsnss.write.format("jdbc").option("url", db_url_2).option("dbtable" , "trsnss").option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()

如果我删除我的第二个 write 语句,或者只是简单地将其注释掉,则整个代码都在编译且没有错误。

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1062
查看次数

使用 pyspark 创建年份列

我有一个时间戳列,我想从此列创建一个年份列。我知道如何显示它,但我无法在我的数据集上创建一列。到目前为止,我已经尝试过这个:

data = data.withColumn('Year', data.select(year(('Date')))
Run Code Online (Sandbox Code Playgroud)

但它抛出一个错误说:

断言错误:col 应该是 Column

我能够显示年份这样做:

data.select(year('Date').alias('Year')).show()
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
4159
查看次数

加入后火花删除多个重复的列

加入两个数据框后,我得到了许多重复的列,现在我想删除最后一个列,下面是我的 printSchema

root
 |-- id: string (nullable = true)
 |-- value: string (nullable = true)
 |-- test: string (nullable = true)
 |-- details: string (nullable = true)
 |-- test: string (nullable = true)
 |-- value: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

现在我想删除最后两列

 |-- test: string (nullable = true)
 |-- value: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我尝试使用 df..dropDuplicates() 但它全部删除

如何删除最后出现的重复列?

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
7985
查看次数

PySpark- 如何使用 Pyspark 计算每个字段的最小值、最大值?

我试图找到由 sql 语句产生的每个字段的 min 、 max 并将其写入一个 csv 文件。我试图以低于方式获得结果。能否请你帮忙。我已经用 python 编写过,但现在尝试将其转换为 pyspark 以直接在 hadoop 集群中运行

在此处输入图片说明

from pyspark.sql.functions import max, min, mean, stddev
from pyspark import SparkContext
sc =SparkContext()
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
#bank = hive_context.table("cip_utilities.file_upload_temp")
data=hive_context.sql("select * from cip_utilities.cdm_variables_dict")
hive_context.sql("describe cip_utilities.cdm_variables_dict").registerTempTable("schema_def")
temp_data=hive_context.sql("select * from schema_def")
temp_data.show()
data1=hive_context.sql("select col_name from schema_def where data_type<>'string'")
colum_names_as_python_list_of_rows = data1.collect()
#data1.show()
for line in colum_names_as_python_list_of_rows:
        #print value in MyCol1 for each row                
        ---Here i need to calculate min, max, mean etc for …
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
1
解决办法
2万
查看次数

使用pyspark进行条件聚合

考虑以下作为数据框

a        b  c   d   e  
africa  123 1   10  121.2
africa  123 1   10  321.98
africa  123 2   12  43.92
africa  124 2   12  43.92
usa     121 1   12  825.32
usa     121 1   12  89.78
usa     123 2   10  32.24
usa     123 5   21  43.92
canada  132 2   13  63.21
canada  132 2   13  89.23
canada  132 3   21  85.32
canada  131 3   10  43.92
Run Code Online (Sandbox Code Playgroud)

现在我想使用数据帧将下面的 case 语句转换为 PYSPARK 中的等效语句。

我们可以直接在 case 语句中使用 hivecontex/sqlcontest nut 寻找传统的 pyspark nql 查询 …

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1万
查看次数

Pyspark - from_unixtime 未显示正确的日期时间

我想将包含纪元时间的时间戳列转换为日期时间(人类可读)。from_unixtime没有给我正确的日期和时间。请帮忙。

df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])

df.show()
Run Code Online (Sandbox Code Playgroud)
df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])

df.show()
Run Code Online (Sandbox Code Playgroud)
+-------------------+
|          timestamp|
+-------------------+
|1535934855077532656|
|1535934855077532656|
|1535935539886503614|
+-------------------+
Run Code Online (Sandbox Code Playgroud)
df.withColumn('datetime',from_unixtime(df.timestamp,"yyyy-MM-dd HH:mm:ss:SSS")).select(['timestamp','datetime']).show(15,False)
Run Code Online (Sandbox Code Playgroud)

timestamp apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
2438
查看次数

spark中的dataframe.show()和dataframe.take()有什么区别?为了提高性能,我们需要增加什么?

我正在使用数据帧从镶木地板文件中读取数据并创建临时视图并在临时视图之上运行 SQL 查询。

spark.read.parquet("filename.parquet").createOrReplaceTempView("temptable")

val df = spark.sql("SELECT * FROM temptable")

检查df我正在使用的结果,df.show()但执行需要更多时间,如果我使用,我没有看到任何区别df.take(10)

take()和之间有什么区别show()吗。我应该使用哪种方法来获得更好的性能来检查结果

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3797
查看次数

如何提高spark中的查询性能?

我有一个连接 4 个表的查询,我使用查询下推将其读入数据帧。

val df = spark.read.format("jdbc").
 option("url", "jdbc:mysql://ip/dbname").
 option("driver", "com.mysql.jdbc.Driver").
 option("user", "username").
 option("password", "password")
 .option("dbtable",s"($query) as temptable")
 .load()
Run Code Online (Sandbox Code Playgroud)

单个表中的记录数分别为 430、350、64、2354,加载需要 12.784 秒,创建 SparkSession 需要 2.119 秒

然后我将结果数据计算为,

 val count=df.count()
 println(s"count $count")
Run Code Online (Sandbox Code Playgroud)

那么总执行时间为 25.806 秒,结果仅包含 430 条记录。

当我在 sql workbench 中尝试相同的操作时,只需几秒钟即可完全执行。我也在 load() 之后尝试缓存,但它需要相同的时间。那么我怎样才能比我所做的更快地执行它。

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
178
查看次数

如何在pyspark中将DenseMatrix转换为spark DataFrame?

除了以下使用 Scala 的示例外,我没有找到任何 pyspark 代码来将矩阵转换为火花数据帧。有谁知道如何改用python?

如何将 mllib 矩阵转换为 spark 数据帧?

python apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3589
查看次数

是否有内置的 Spark 可以展平嵌套数组?

我有一个 DataFrame 字段,它是Seq[Seq[String]]我构建的 UDF 以将所述列转换为 Seq[String] 列;基本上,一个flatten来自 Scala的函数的 UDF 。

def combineSentences(inCol: String, outCol: String): DataFrame => DataFrame = {

    def flatfunc(seqOfSeq: Seq[Seq[String]]): Seq[String] = seqOfSeq match {
        case null => Seq.empty[String]
        case _ => seqOfSeq.flatten
    }
    df: DataFrame => df.withColumn(outCol, udf(flatfunc _).apply(col(inCol)))
}
Run Code Online (Sandbox Code Playgroud)

我的用例是字符串,但显然,这可能是通用的。您可以在 DataFrame 转换链中使用此函数,例如:

df.transform(combineSentences(inCol, outCol))
Run Code Online (Sandbox Code Playgroud)

是否有一个 Spark 内置函数可以做同样的事情?我一直没能找到一个。

scala user-defined-functions apache-spark apache-spark-sql

1
推荐指数
1
解决办法
964
查看次数