我有一个具有下一个架构的数据框:
root
|-- id_1: long (nullable = true)
|-- id_2: long (nullable = true)
|-- score: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
数据如下:
+----+----+------------------+
|id_1|id_2|score |
+----+----+------------------+
|0 |9 |0.5888888888888889|
|0 |1 |0.6166666666666667|
|0 |2 |0.496996996996997 |
|1 |9 |0.6222222222222221|
|1 |6 |0.9082996632996633|
|1 |5 |0.5927450980392157|
|2 |3 |0.665774107440774 |
|3 |8 |0.6872367465504721|
|3 |8 |0.6872367465504721|
|5 |6 |0.5365909090909091|
+----+----+------------------+
Run Code Online (Sandbox Code Playgroud)
目标是为每个 id_1 找到具有最大得分 的 id_2。也许我错了,但是......只需要创建配对的 RDD:
root
|-- _1: long (nullable = true)
|-- _2: struct (nullable = true)
| …Run Code Online (Sandbox Code Playgroud) 我试过下面的代码
审判1
…………
val df2=sqlContext.sql("select concat(' ',Id,LabelName) as 'first last' from p1 order by LabelName desc ");
Run Code Online (Sandbox Code Playgroud)
审判-2
…………
val df2=sqlContext.sql("select concat(' ',Id,LabelName) from p1 order by LabelName desc ");
val df3=df2.toDF("first last")
Run Code Online (Sandbox Code Playgroud)
当我尝试运行它时,Trial-1 抛出错误......但在 Trial-2 中,它正在接受命令,但当我执行以下操作时抛出错误
scala> df3.write.parquet("/prashanth/a1")
Run Code Online (Sandbox Code Playgroud) 我有一个包含几行的文件。例如
A B C
awer.ttp.net Code 554
abcd.ttp.net Code 747
asdf.ttp.net Part 554
xyz.ttp.net Part 747
Run Code Online (Sandbox Code Playgroud)
我想要创建一个 SparkSQL 语句来仅拆分表的 a 列,并且希望向表 D 添加一个新行,其值为 awe、abcd、asdf 和 xyz。
如何使用 where 子句更新 Pyspark 数据框中的列?
这类似于此 SQL 操作:
UPDATE table1 SET alpha1= x WHERE alpha2< 6;
Run Code Online (Sandbox Code Playgroud)
其中 alpha1 和 alpha2 是 table1 的列。
例如:我有一个数据框 table1,其值如下:
表格1 阿尔法1 阿尔法2 3 7 4 5 5 4 6 8 更新后的数据框表1: 阿尔法1 阿尔法2 3 7 x 5 x 4 6 8
如何在 pyspark 数据框中执行此操作?
我已按以下方式格式化数据帧中的时间戳。
var df_v_5 = df_v_4..withColumn("endTimeFormat",
from_unixtime(unix_timestamp('DateTime), "dd-MM-yyyy hh:mm:ss"))
Run Code Online (Sandbox Code Playgroud)
我得到的输出为
DateTime,value1,value2,endTimeFormat
2017-01-01T12:00:00.000+05:30,11,-14,01-01-2017 12:00:00
2017-01-01T13:00:00.000+05:30,110,13,01-01-2017 01:00:00
Run Code Online (Sandbox Code Playgroud)
预期输出:
DateTime,value1,value2,endTimeFormat
2017-01-01T12:00:00.000+05:30,11,-14,01-01-2017 12:00:00
2017-01-01T13:00:00.000+05:30,110,13,01-01-2017 13:00:00
Run Code Online (Sandbox Code Playgroud)
如何将此时间戳转换为 24 小时格式?
我有一个巨大的 Spark DataFrame,我使用以下语句创建它
val df = sqlContext.read.option("mergeSchema", "true").parquet("parquet/partitions/path")
Run Code Online (Sandbox Code Playgroud)
现在,当我尝试在上面的 DataFrame 上执行列重命名或选择操作时,它失败说发现了不明确的列,但出现以下异常
org.apache.spark.sql.AnalysisException:引用“Product_Type”不明确,可能是 Product_Type#13、Product_Type#235
现在我看到列,发现有两列Product_Type,Product_type它们似乎是相同的列,但由于随着时间的推移模式合并而创建了一个字母大小写不同的列。现在我不介意保留重复的列,但 Spark sqlContext 由于某种原因不喜欢它。
我相信默认spark.sql.caseSensitive配置是正确的,所以不知道为什么会失败。我正在使用 Spark 1.5.2。我是 Spark 新手。
我有以下数据框:
+----------+
|col |
+----------+
|[1, 4, 3] |
|[1, 5, 11]|
|[1, 3, 3] |
|[1, 4, 3] |
|[1, 6, 3] |
|[1, 1, 3] |
+----------+
Run Code Online (Sandbox Code Playgroud)
我想要的是:
+----------+
|col_new |
+----------+
|[3, -1] |
|[4, 6] |
|[2, 0] |
|[3, -1] |
|[5, -3] |
|[0, 2] |
+----------+
Run Code Online (Sandbox Code Playgroud)
=> 差异运算符 arr[n+1] - arr[n]
而且我不知道该怎么做。
我想我应该用 udf 来做?我不太熟悉它,但是我尝试过。
+----------+
|col |
+----------+
|[1, 4, 3] |
|[1, 5, 11]|
|[1, 3, 3] |
|[1, 4, 3] …Run Code Online (Sandbox Code Playgroud) user-defined-functions apache-spark apache-spark-sql pyspark
我想根据数据框中现有的列子集创建一个新列(v5)。
示例数据框:
+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
| 2| 4|7.0|4.0|
| 99| 0|2.0|0.0|
|189| 0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
提供示例数据框的另一个视图:
+---+---+---+---+
| v1| v3| v2| v4|
+---+---+---+---+
| 2|7.0| 4|4.0|
| 99|2.0| 0|0.0|
|189|2.4| 0|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
它的创建者:
+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
| 2| 4|7.0|4.0|
| 99| 0|2.0|0.0|
|189| 0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
最终,我想做的是创建另一个列 v5,它是与 v1 和 v2 的最小值相对应的值,忽略任一列中存在的零和空值。假设 v1 为键,v3 为值对。同样,v2 为键,v4 为值。例如,在第一行中:在 v1 和 v2 中,最小值属于 v1,即 2,因此 v5 列中的输出应为 7.0 同样,在第二行中:忽略 v1 和 v2 的零值和空值,输出应为成为2.0
原始数据帧有五列作为键,相应的五列作为值所需的输出:
+---+---+---+---+---+
| …Run Code Online (Sandbox Code Playgroud) 我的代码是:
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
Run Code Online (Sandbox Code Playgroud)
我的输出有 50 个不同的值,格式如下
{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮助我将这些数据存储在表格形式中
| id |date |temp|press|
|st01|26-02-2018 20:30:40| 30 |20 |
|st01|26-02-2018 20:30:45| 80 |70 |
Run Code Online (Sandbox Code Playgroud)
我会非常感激。
我正在尝试使用 Spark SQL 中的自定义数据类型将数据集中的列从 varchar 转换为 UUID。但我看到转换没有发生。如果我在这里遗漏了什么,请告诉我。
val secdf = sc.parallelize( Array(("85d8b889-c793-4f23-93e9-ea18db640039","Revenue"), ("85d8b889-c793-4f23-93e9-ea18db640038","Income:123213"))).toDF("id", "report")
val metadataBuilder = new MetadataBuilder()
metadataBuilder.putString("database.column.type", "uuid")
metadataBuilder.putLong("jdbc.type", java.sql.Types.OTHER)
val metadata = metadataBuilder.build()
val secReportDF = secdf.withColumn("id", col("id").as("id", metadata))
Run Code Online (Sandbox Code Playgroud) apache-spark-sql ×10
apache-spark ×9
pyspark ×4
scala ×4
dataframe ×2
bigdata ×1
datetime ×1
python ×1
sql ×1