标签: apache-spark-sql

如何在 pyspark groupby 上将 UDF 与 pandas 一起使用?

我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
                   'B': [1, 2, 3],
                   'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
    return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
    return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)

这导致了我很难理解的异常:

AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-koalas

1
推荐指数
1
解决办法
3694
查看次数

Spark 将输出写回输入目录

我最近遇到一个场景,需要从目录中读取HDFS的输入

 /user/project/jsonFile
Run Code Online (Sandbox Code Playgroud)

并将结果写回同一目录:

 /user/project/jsonFile
Run Code Online (Sandbox Code Playgroud)

读取 jsonFile 后,执行多个连接,并将结果写入 /user/project/jsonFile 使用:

result.write().mode(SaveMode.Overwrite).json("/user/project/jsonFile");
Run Code Online (Sandbox Code Playgroud)

下面是我看到的错误:

[task-result-getter-0]o.a.s.s.TaskSetManager: Lost task 10.0 in stage 7.0 (TID 2508, hddev1db015dxc1.dev.oclc.org, executor 3): java.io.FileNotFoundException: File does not exist: /user/project/jsonFile
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:87)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:77)
    
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
Run Code Online (Sandbox Code Playgroud)

为什么它会抛出java.io.FileNotFoundException: File does not exist? result包含写回 HDFS 的联接输出的数据集,一旦result数据集可用,spark …

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
798
查看次数

Pyspark:滚动窗口中的聚合模式(最常见)值

我有一个如下所示的数据框。我想在每个组内进行分组device和排序start_time。然后,对于组中的每一行,从其前面 3 行(包括其自身)的窗口中获取最常出现的站点。

columns = ['device', 'start_time', 'station']
data = [("Python", 1, "station_1"), ("Python", 2, "station_2"), ("Python", 3, "station_1"), ("Python", 4, "station_2"), ("Python", 5, "station_2"), ("Python", 6, None)]


test_df = spark.createDataFrame(data).toDF(*columns)
rolling_w = Window.partitionBy('device').orderBy('start_time').rowsBetween(-2, 0)
Run Code Online (Sandbox Code Playgroud)

期望的输出:

columns = ['device', 'start_time', 'station']
data = [("Python", 1, "station_1"), ("Python", 2, "station_2"), ("Python", 3, "station_1"), ("Python", 4, "station_2"), ("Python", 5, "station_2"), ("Python", 6, None)]


test_df = spark.createDataFrame(data).toDF(*columns)
rolling_w = Window.partitionBy('device').orderBy('start_time').rowsBetween(-2, 0)
Run Code Online (Sandbox Code Playgroud)

由于 Pyspark 没有mode()函数,我知道如何获取静态中最常见的值,groupby如下所示, …

group-by apache-spark apache-spark-sql rolling-computation pyspark

1
推荐指数
1
解决办法
2657
查看次数

pyspark加入多个条件并删除两个重复列

我是 pandas 的 pyspark 新手。

当我这样做时,在一个条件下加入并删除重复项似乎效果很好:

df1.join(df2, df1.col1 == df2.col1, how="left").drop(df2.col1) 
Run Code Online (Sandbox Code Playgroud)

但是,如果我想加入两列条件并删除加入的 df bc 的两列,它是重复的,该怎么办。

我试过了:

df1.join(df2, [df1.col1 == df2.col1, df1.col2 == df2.col2, how="left").drop(df2.col1, df2.col2)
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1461
查看次数

AWS Glue (Spark) 非常慢

我继承了一些在 AWS Glue 上运行速度极其缓慢的代码。

在作业中,它创建了许多动态框架,然后使用spark.sql. 从 MySQL 和 Postgres 数据库读取表,然后使用 Glue 将它们连接在一起,最终将另一个表写回 Postgres。

示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴实际代码)

jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
    
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")

# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")

# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")

# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark aws-glue

1
推荐指数
1
解决办法
7303
查看次数

在 SparkSQL 中,如何从嵌套结构中选择列的子集,并使用 SQL 语句将其保留为结果中的嵌套结构?

我可以在 SparkSQL 中执行以下语句:

result_df = spark.sql("""select
    one_field,
    field_with_struct
  from purchases""")
Run Code Online (Sandbox Code Playgroud)

生成的数据帧将具有完整结构的字段field_with_struct

一个字段 带结构的字段
123 {名称1,val1,val2,f2,f4}
第555章 {名称2,val3,val4,f6,f7}

我只想从 中选择几个字段field_with_struct,但将它们保留在结果数据框中的结构中。如果有可能(这不是真正的代码):

result_df = spark.sql("""select
    one_field,
    struct(
      field_with_struct.name,
      field_with_struct.value2
    ) as my_subset
  from purchases""")
Run Code Online (Sandbox Code Playgroud)

要得到这个:

一个字段 我的子集
123 {名称1,值2}
第555章 {名称2,值4}

有没有办法用 SQL 来做到这一点?(不适用于流畅的 API)

apache-spark-sql pyspark

1
推荐指数
1
解决办法
1001
查看次数

如何在spark中将多列内爆为一个结构

我有一个具有以下架构的 Spark 数据框:

  • 标头
  • 钥匙
  • ID
  • 时间戳
  • 度量值1
  • 度量值2

我想将多个列组合成一个结构,以便生成的模式变为:

  • 标题(列)
  • 键(列)
  • 值(结构)
    • 编号(列)
    • 时间戳(列)
    • 度量值1(列)
    • 度量值2(列)

我希望它成为这样的格式,以便它适合作为卡夫卡输入。请告诉如何实现这一目标。

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2584
查看次数

Spark 中将字符串拆分为字符数组

如何将字符串列拆分为字符数组?

输入:

from pyspark.sql import functions as F
df = spark.createDataFrame([('Vilnius',), ('Riga',), ('Tallinn',), ('New York',)], ['col_cities'])
df.show()
# +----------+
# |col_cities|
# +----------+
# |   Vilnius|
# |      Riga|
# |   Tallinn|
# |  New York|
# +----------+
Run Code Online (Sandbox Code Playgroud)

期望的输出:

# +----------+------------------------+
# |col_cities|split                   |
# +----------+------------------------+
# |Vilnius   |[V, i, l, n, i, u, s]   |
# |Riga      |[R, i, g, a]            |
# |Tallinn   |[T, a, l, l, i, n, n]   |
# |New York  |[N, e, w,  , …
Run Code Online (Sandbox Code Playgroud)

arrays split apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
2246
查看次数

Palantir Foundry - 文件和文件夹名称列表

我正在寻找一个代码来获取 pyspark 上 palantir 铸造厂目录中的文件和文件夹名称列表。因此,如果我提供铸造路径位置,我需要使用 pyspark 代码的文件和文件夹名称列表。

有人可以帮我解决这个问题吗?

file listdir apache-spark-sql pyspark palantir-foundry

1
推荐指数
1
解决办法
739
查看次数

Spark 窗口函数零偏斜

最近,我在运行 PySpark 作业之一时遇到了问题。在分析 Spark UI 中的阶段时,我注意到运行时间最长的阶段需要 1.2 小时才能运行完,而整个流程运行所需的总时间为 2.5 小时。

SparkUI 阶段选项卡按最长持续时间排序

查看阶段详细信息后,我清楚地发现我面临着严重的数据偏差,导致单个任务运行了整个1.2 小时,而所有其他任务在23 秒内完成。

任务分配显示出非常明显的偏差

总结显示了最长的任务与绝大多数人之间的巨大差异

DAG 显示此阶段涉及窗口函数,它帮助我快速将有问题的区域缩小到几个查询,并找到根本原因 -> 中account使用的列,Window.partitionBy("account")25% 的空值。我没有兴趣计算空帐户的总和,尽管我确实需要涉及的行进行进一步计算,因此我无法在窗口函数之前将它们过滤掉。

这是我的窗口函数查询:

problematic_account_window = Window.partitionBy("account")

sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(problematic_account_window))
Run Code Online (Sandbox Code Playgroud)

所以我们找到了罪魁祸首——我们现在能做什么?我们如何解决倾斜和性能问题?

skew apache-spark apache-spark-sql pyspark spark-window-function

1
推荐指数
1
解决办法
828
查看次数