我有两个几乎相同的写入 db 。scala 声明,但是一个给我一个错误另一个不是,我不明白如何解决它?有任何想法吗 ?
此声明有效
df_pm_visits_by_site_trn.write.format("jdbc").option("url", db_url_2).option("dbtable", "pm_visits_by_site_trn").option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)
这个不起作用并抛出我编译错误
df_trsnss .write.format("jdbc").option("url", db_url_2).option("dbtable", "df_trsnss") .option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()
Run Code Online (Sandbox Code Playgroud)
_dev.scala:464: value write 不是 org.apache.spark.sql.DataFrameReader [error] df_trsnss.write.format("jdbc").option("url", db_url_2).option("dbtable" , "trsnss").option("user", db_user).option("password", db_pwd).option("truncate","true").mode("overwrite").save()
如果我删除我的第二个 write 语句,或者只是简单地将其注释掉,则整个代码都在编译且没有错误。
我有一个时间戳列,我想从此列创建一个年份列。我知道如何显示它,但我无法在我的数据集上创建一列。到目前为止,我已经尝试过这个:
data = data.withColumn('Year', data.select(year(('Date')))
Run Code Online (Sandbox Code Playgroud)
但它抛出一个错误说:
断言错误:col 应该是 Column
我能够显示年份这样做:
data.select(year('Date').alias('Year')).show()
Run Code Online (Sandbox Code Playgroud) 加入两个数据框后,我得到了许多重复的列,现在我想删除最后一个列,下面是我的 printSchema
root
|-- id: string (nullable = true)
|-- value: string (nullable = true)
|-- test: string (nullable = true)
|-- details: string (nullable = true)
|-- test: string (nullable = true)
|-- value: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我想删除最后两列
|-- test: string (nullable = true)
|-- value: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我尝试使用 df..dropDuplicates() 但它全部删除
如何删除最后出现的重复列?
我试图找到由 sql 语句产生的每个字段的 min 、 max 并将其写入一个 csv 文件。我试图以低于方式获得结果。能否请你帮忙。我已经用 python 编写过,但现在尝试将其转换为 pyspark 以直接在 hadoop 集群中运行
from pyspark.sql.functions import max, min, mean, stddev
from pyspark import SparkContext
sc =SparkContext()
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
#bank = hive_context.table("cip_utilities.file_upload_temp")
data=hive_context.sql("select * from cip_utilities.cdm_variables_dict")
hive_context.sql("describe cip_utilities.cdm_variables_dict").registerTempTable("schema_def")
temp_data=hive_context.sql("select * from schema_def")
temp_data.show()
data1=hive_context.sql("select col_name from schema_def where data_type<>'string'")
colum_names_as_python_list_of_rows = data1.collect()
#data1.show()
for line in colum_names_as_python_list_of_rows:
#print value in MyCol1 for each row
---Here i need to calculate min, max, mean etc for …Run Code Online (Sandbox Code Playgroud) python-3.x apache-spark apache-spark-sql pyspark pyspark-sql
考虑以下作为数据框
a b c d e
africa 123 1 10 121.2
africa 123 1 10 321.98
africa 123 2 12 43.92
africa 124 2 12 43.92
usa 121 1 12 825.32
usa 121 1 12 89.78
usa 123 2 10 32.24
usa 123 5 21 43.92
canada 132 2 13 63.21
canada 132 2 13 89.23
canada 132 3 21 85.32
canada 131 3 10 43.92
Run Code Online (Sandbox Code Playgroud)
现在我想使用数据帧将下面的 case 语句转换为 PYSPARK 中的等效语句。
我们可以直接在 case 语句中使用 hivecontex/sqlcontest nut 寻找传统的 pyspark nql 查询 …
我想将包含纪元时间的时间戳列转换为日期时间(人类可读)。from_unixtime没有给我正确的日期和时间。请帮忙。
df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])
df.show()
Run Code Online (Sandbox Code Playgroud)
df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])
df.show()
Run Code Online (Sandbox Code Playgroud)
+-------------------+
| timestamp|
+-------------------+
|1535934855077532656|
|1535934855077532656|
|1535935539886503614|
+-------------------+
Run Code Online (Sandbox Code Playgroud)
df.withColumn('datetime',from_unixtime(df.timestamp,"yyyy-MM-dd HH:mm:ss:SSS")).select(['timestamp','datetime']).show(15,False)
Run Code Online (Sandbox Code Playgroud) 我正在使用数据帧从镶木地板文件中读取数据并创建临时视图并在临时视图之上运行 SQL 查询。
spark.read.parquet("filename.parquet").createOrReplaceTempView("temptable")
val df = spark.sql("SELECT * FROM temptable")
检查df我正在使用的结果,df.show()但执行需要更多时间,如果我使用,我没有看到任何区别df.take(10)
take()和之间有什么区别show()吗。我应该使用哪种方法来获得更好的性能来检查结果
我有一个连接 4 个表的查询,我使用查询下推将其读入数据帧。
val df = spark.read.format("jdbc").
option("url", "jdbc:mysql://ip/dbname").
option("driver", "com.mysql.jdbc.Driver").
option("user", "username").
option("password", "password")
.option("dbtable",s"($query) as temptable")
.load()
Run Code Online (Sandbox Code Playgroud)
单个表中的记录数分别为 430、350、64、2354,加载需要 12.784 秒,创建 SparkSession 需要 2.119 秒
然后我将结果数据计算为,
val count=df.count()
println(s"count $count")
Run Code Online (Sandbox Code Playgroud)
那么总执行时间为 25.806 秒,结果仅包含 430 条记录。
当我在 sql workbench 中尝试相同的操作时,只需几秒钟即可完全执行。我也在 load() 之后尝试缓存,但它需要相同的时间。那么我怎样才能比我所做的更快地执行它。
除了以下使用 Scala 的示例外,我没有找到任何 pyspark 代码来将矩阵转换为火花数据帧。有谁知道如何改用python?
我有一个 DataFrame 字段,它是Seq[Seq[String]]我构建的 UDF 以将所述列转换为 Seq[String] 列;基本上,一个flatten来自 Scala的函数的 UDF 。
def combineSentences(inCol: String, outCol: String): DataFrame => DataFrame = {
def flatfunc(seqOfSeq: Seq[Seq[String]]): Seq[String] = seqOfSeq match {
case null => Seq.empty[String]
case _ => seqOfSeq.flatten
}
df: DataFrame => df.withColumn(outCol, udf(flatfunc _).apply(col(inCol)))
}
Run Code Online (Sandbox Code Playgroud)
我的用例是字符串,但显然,这可能是通用的。您可以在 DataFrame 转换链中使用此函数,例如:
df.transform(combineSentences(inCol, outCol))
Run Code Online (Sandbox Code Playgroud)
是否有一个 Spark 内置函数可以做同样的事情?我一直没能找到一个。
apache-spark ×10
apache-spark-sql ×10
pyspark ×4
python ×3
scala ×3
pyspark-sql ×1
python-3.x ×1
timestamp ×1