Sri*_*nka 3 apache-spark-sql pyspark
我有一个包含 10609 行的数据框,我想一次将 100 行转换为 JSON 并将它们发送回网络服务。
我曾尝试使用 SQL 的 LIMIT 子句,例如
temptable = spark.sql("select item_code_1 from join_table limit 100")
Run Code Online (Sandbox Code Playgroud)
这将返回前 100 行,但如果我想要接下来的 100 行,我试过这个但没有用。
temptable = spark.sql("select item_code_1 from join_table limit 100, 200")
Run Code Online (Sandbox Code Playgroud)
错误:Py4JJavaError:调用 o22.sql 时发生错误。: org.apache.spark.sql.catalyst.parser.ParseException: 不匹配的输入 ',' 期望(第 1 行,位置 44)
== SQL ==
select item_code_1 from join_table limit 100, 200
Run Code Online (Sandbox Code Playgroud)
您必须创建一个行号列,它将序列号分配给列,并使用该列通过过滤器获取范围内的数据。
df = spark.createDataFrame([('a',),
('b',),
('c',),
('d',),
('e',)
],'item : string')
df.show()
#+----+
#|item|
#+----+
#| a|
#| b|
#| c|
#| d|
#| e|
#+----+
Run Code Online (Sandbox Code Playgroud)
我正在使用一个虚拟静态列lit('a')来生成 row_num。请根据您的实际数据更新以下逻辑(这会生成 row_num)。
partitionBy(lit('a')).orderBy(lit('a')
Run Code Online (Sandbox Code Playgroud)
数据框示例-
from pyspark.sql.functions import lit,row_number,col
from pyspark.sql.window import Window
w = Window().partitionBy(lit('a')).orderBy(lit('a'))
df1 = df.withColumn("row_num", row_number().over(w))
df1.filter(col("row_num").between(1,2)).show()
#+----+-------+
#|item|row_num|
#+----+-------+
#| a| 1|
#| b| 2|
#+----+-------+
df1.filter(col("row_num").between(3,4)).show()
#+----+-------+
#|item|row_num|
#+----+-------+
#| c| 3|
#| d| 4|
#+----+-------+
Run Code Online (Sandbox Code Playgroud)
Spark SQL 示例-
df1.createOrReplaceTempView("dfTable")
spark.sql("SELECT * FROM dfTable WHERE row_num between 1 and 2").show()
#+----+-------+
#|item|row_num|
#+----+-------+
#| a| 1|
#| b| 2|
#+----+-------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
18278 次 |
| 最近记录: |