标签: apache-spark-sql

Spark 的reduceByKey 最佳实践

我有一个具有下一个架构的数据框:

root
 |-- id_1: long (nullable = true)
 |-- id_2: long (nullable = true)
 |-- score: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

数据如下:

+----+----+------------------+
|id_1|id_2|score             |
+----+----+------------------+
|0   |9   |0.5888888888888889|
|0   |1   |0.6166666666666667|
|0   |2   |0.496996996996997 |
|1   |9   |0.6222222222222221|
|1   |6   |0.9082996632996633|
|1   |5   |0.5927450980392157|
|2   |3   |0.665774107440774 |
|3   |8   |0.6872367465504721|
|3   |8   |0.6872367465504721|
|5   |6   |0.5365909090909091|
+----+----+------------------+
Run Code Online (Sandbox Code Playgroud)

目标是为每个 id_1 找到具有最大得分 的 id_2。也许我错了,但是......只需要创建配对的 RDD:

root
 |-- _1: long (nullable = true)
 |-- _2: struct (nullable = true)
 | …
Run Code Online (Sandbox Code Playgroud)

python bigdata apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1695
查看次数

如何在sparksql中给出带有空格的别名

我试过下面的代码

审判1

…………

val df2=sqlContext.sql("select concat(' ',Id,LabelName) as 'first last'  from  p1 order by LabelName desc ");
Run Code Online (Sandbox Code Playgroud)

审判-2

…………

val df2=sqlContext.sql("select concat(' ',Id,LabelName)   from  p1 order by LabelName desc ");

val df3=df2.toDF("first last")
Run Code Online (Sandbox Code Playgroud)

当我尝试运行它时,Trial-1 抛出错误......但在 Trial-2 中,它正在接受命令,但当我执行以下操作时抛出错误

scala> df3.write.parquet("/prashanth/a1")
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
6933
查看次数

在 SparkSQL 中分割字符串

我有一个包含几行的文件。例如

A               B       C    
awer.ttp.net    Code    554
abcd.ttp.net    Code    747
asdf.ttp.net    Part    554
xyz.ttp.net     Part    747
Run Code Online (Sandbox Code Playgroud)

我想要创建一个 SparkSQL 语句来仅拆分表的 a 列,并且希望向表 D 添加一个新行,其值为 awe、abcd、asdf 和 xyz。

sql apache-spark-sql

1
推荐指数
1
解决办法
1万
查看次数

在 Pyspark 中使用 where 子句更新列

如何使用 where 子句更新 Pyspark 数据框中的列?

这类似于此 SQL 操作:

   UPDATE table1 SET alpha1= x WHERE alpha2< 6;
Run Code Online (Sandbox Code Playgroud)

其中 alpha1 和 alpha2 是 table1 的列。

例如:我有一个数据框 table1,其值如下:

表格1

阿尔法1 阿尔法2
3 7
4 5
5 4
6 8

更新后的数据框表1:

阿尔法1 阿尔法2
3 7
x 5
x 4
6 8

如何在 pyspark 数据框中执行此操作?

dataframe apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
3771
查看次数

Scala Spark 中的日期时间转换(24 小时格式)

我已按以下方式格式化数据帧中的时间戳。

var df_v_5 = df_v_4..withColumn("endTimeFormat", 
from_unixtime(unix_timestamp('DateTime), "dd-MM-yyyy hh:mm:ss"))
Run Code Online (Sandbox Code Playgroud)

我得到的输出为

DateTime,value1,value2,endTimeFormat
2017-01-01T12:00:00.000+05:30,11,-14,01-01-2017 12:00:00
2017-01-01T13:00:00.000+05:30,110,13,01-01-2017 01:00:00
Run Code Online (Sandbox Code Playgroud)

预期输出:

DateTime,value1,value2,endTimeFormat
2017-01-01T12:00:00.000+05:30,11,-14,01-01-2017 12:00:00
2017-01-01T13:00:00.000+05:30,110,13,01-01-2017 13:00:00
Run Code Online (Sandbox Code Playgroud)

如何将此时间戳转换为 24 小时格式?

datetime scala dataframe apache-spark apache-spark-sql

1
推荐指数
1
解决办法
4369
查看次数

使用 mergeSchema 时 Spark DataFrame 重复列名

我有一个巨大的 Spark DataFrame,我使用以下语句创建它

val df = sqlContext.read.option("mergeSchema", "true").parquet("parquet/partitions/path")
Run Code Online (Sandbox Code Playgroud)

现在,当我尝试在上面的 DataFrame 上执行列重命名或选择操作时,它失败说发现了不明确的列,但出现以下异常

org.apache.spark.sql.AnalysisException:引用“Product_Type”不明确,可能是 Product_Type#13、Product_Type#235

现在我看到列,发现有两列Product_TypeProduct_type它们似乎是相同的列,但由于随着时间的推移模式合并而创建了一个字母大小写不同的列。现在我不介意保留重复的列,但 Spark sqlContext 由于某种原因不喜欢它。

我相信默认spark.sql.caseSensitive配置是正确的,所以不知道为什么会失败。我正在使用 Spark 1.5.2。我是 Spark 新手。

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
4922
查看次数

使用数组修改数据框列

我有以下数据框:

+----------+ 
|col       | 
+----------+ 
|[1, 4, 3] | 
|[1, 5, 11]| 
|[1, 3, 3] | 
|[1, 4, 3] | 
|[1, 6, 3] | 
|[1, 1, 3] | 
+----------+
Run Code Online (Sandbox Code Playgroud)

我想要的是:

+----------+ 
|col_new   | 
+----------+ 
|[3, -1]   | 
|[4, 6]    | 
|[2, 0]    | 
|[3, -1]   | 
|[5, -3]   | 
|[0, 2]    | 
+----------+
Run Code Online (Sandbox Code Playgroud)

=> 差异运算符 arr[n+1] - arr[n]

而且我不知道该怎么做。

我想我应该用 udf 来做?我不太熟悉它,但是我尝试过。

+----------+ 
|col       | 
+----------+ 
|[1, 4, 3] | 
|[1, 5, 11]| 
|[1, 3, 3] | 
|[1, 4, 3] …
Run Code Online (Sandbox Code Playgroud)

user-defined-functions apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1434
查看次数

PySpark:计算行最小值忽略零和空值

我想根据数据框中现有的列子集创建一个新列(v5)。

示例数据框:

+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
|  2|  4|7.0|4.0|
| 99|  0|2.0|0.0|
|189|  0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

提供示例数据框的另一个视图:

+---+---+---+---+
| v1| v3| v2| v4|
+---+---+---+---+
|  2|7.0|  4|4.0|
| 99|2.0|  0|0.0|
|189|2.4|  0|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

它的创建者:

+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
|  2|  4|7.0|4.0|
| 99|  0|2.0|0.0|
|189|  0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)

最终,我想做的是创建另一个列 v5,它是与 v1 和 v2 的最小值相对应的值,忽略任一列中存在的零和空值。假设 v1 为键,v3 为值对。同样,v2 为键,v4 为值。例如,在第一行中:在 v1 和 v2 中,最小值属于 v1,即 2,因此 v5 列中的输出应为 7.0 同样,在第二行中:忽略 v1 和 v2 的零值和空值,输出应为成为2.0

原始数据帧有五列作为键,相应的五列作为值所需的输出:

+---+---+---+---+---+
| …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark apache-spark-1.6

1
推荐指数
1
解决办法
7136
查看次数

如何将 Spark 流输出转换为数据帧或存储在表中

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
Run Code Online (Sandbox Code Playgroud)

我的输出有 50 个不同的值,格式如下

{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮助我将这些数据存储在表格形式中

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  
Run Code Online (Sandbox Code Playgroud)

我会非常感激。

scala apache-spark spark-streaming apache-spark-sql

1
推荐指数
1
解决办法
5926
查看次数

Spark SQL - 自定义数据类型 UUID

我正在尝试使用 Spark SQL 中的自定义数据类型将数据集中的列从 varchar 转换为 UUID。但我看到转换没有发生。如果我在这里遗漏了什么,请告诉我。

val secdf = sc.parallelize( Array(("85d8b889-c793-4f23-93e9-ea18db640039","Revenue"), ("85d8b889-c793-4f23-93e9-ea18db640038","Income:123213"))).toDF("id", "report")
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString("database.column.type", "uuid")
metadataBuilder.putLong("jdbc.type", java.sql.Types.OTHER)
val metadata = metadataBuilder.build()
val secReportDF = secdf.withColumn("id", col("id").as("id", metadata))
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2022
查看次数