我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
'B': [1, 2, 3],
'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)
这导致了我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …Run Code Online (Sandbox Code Playgroud) 我最近遇到一个场景,需要从目录中读取HDFS的输入
/user/project/jsonFile
Run Code Online (Sandbox Code Playgroud)
并将结果写回同一目录:
/user/project/jsonFile
Run Code Online (Sandbox Code Playgroud)
读取 jsonFile 后,执行多个连接,并将结果写入 /user/project/jsonFile 使用:
result.write().mode(SaveMode.Overwrite).json("/user/project/jsonFile");
Run Code Online (Sandbox Code Playgroud)
下面是我看到的错误:
[task-result-getter-0]o.a.s.s.TaskSetManager: Lost task 10.0 in stage 7.0 (TID 2508, hddev1db015dxc1.dev.oclc.org, executor 3): java.io.FileNotFoundException: File does not exist: /user/project/jsonFile
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:87)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:77)
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
Run Code Online (Sandbox Code Playgroud)
为什么它会抛出java.io.FileNotFoundException: File does not exist? result包含写回 HDFS 的联接输出的数据集,一旦result数据集可用,spark …
我有一个如下所示的数据框。我想在每个组内进行分组device和排序start_time。然后,对于组中的每一行,从其前面 3 行(包括其自身)的窗口中获取最常出现的站点。
columns = ['device', 'start_time', 'station']
data = [("Python", 1, "station_1"), ("Python", 2, "station_2"), ("Python", 3, "station_1"), ("Python", 4, "station_2"), ("Python", 5, "station_2"), ("Python", 6, None)]
test_df = spark.createDataFrame(data).toDF(*columns)
rolling_w = Window.partitionBy('device').orderBy('start_time').rowsBetween(-2, 0)
Run Code Online (Sandbox Code Playgroud)
期望的输出:
columns = ['device', 'start_time', 'station']
data = [("Python", 1, "station_1"), ("Python", 2, "station_2"), ("Python", 3, "station_1"), ("Python", 4, "station_2"), ("Python", 5, "station_2"), ("Python", 6, None)]
test_df = spark.createDataFrame(data).toDF(*columns)
rolling_w = Window.partitionBy('device').orderBy('start_time').rowsBetween(-2, 0)
Run Code Online (Sandbox Code Playgroud)
由于 Pyspark 没有mode()函数,我知道如何获取静态中最常见的值,groupby如下所示, …
group-by apache-spark apache-spark-sql rolling-computation pyspark
我是 pandas 的 pyspark 新手。
当我这样做时,在一个条件下加入并删除重复项似乎效果很好:
df1.join(df2, df1.col1 == df2.col1, how="left").drop(df2.col1)
Run Code Online (Sandbox Code Playgroud)
但是,如果我想加入两列条件并删除加入的 df bc 的两列,它是重复的,该怎么办。
我试过了:
df1.join(df2, [df1.col1 == df2.col1, df1.col2 == df2.col2, how="left").drop(df2.col1, df2.col2)
Run Code Online (Sandbox Code Playgroud) 我继承了一些在 AWS Glue 上运行速度极其缓慢的代码。
在作业中,它创建了许多动态框架,然后使用spark.sql. 从 MySQL 和 Postgres 数据库读取表,然后使用 Glue 将它们连接在一起,最终将另一个表写回 Postgres。
示例(注意 dbs 等已被重命名和简化,因为我无法直接粘贴实际代码)
jobName = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(jobName, args)
# MySQL
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "trans").toDF().createOrReplaceTempView("trans")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "types").toDF().createOrReplaceTempView("types")
glueContext.create_dynamic_frame.from_catalog(database = "db1", table_name = "currency").toDF().createOrReplaceTempView("currency")
# DB2 (Postgres)
glueContext.create_dynamic_frame.from_catalog(database = "db2", table_name = "watermark").toDF().createOrReplaceTempView("watermark")
# transactions
new_transactions_df = spark.sql("[SQL CODE HERE]")
# Write to DB
conf_g = glueContext.extract_jdbc_conf("My DB")
url …Run Code Online (Sandbox Code Playgroud) 我可以在 SparkSQL 中执行以下语句:
result_df = spark.sql("""select
one_field,
field_with_struct
from purchases""")
Run Code Online (Sandbox Code Playgroud)
生成的数据帧将具有完整结构的字段field_with_struct。
| 一个字段 | 带结构的字段 |
|---|---|
| 123 | {名称1,val1,val2,f2,f4} |
| 第555章 | {名称2,val3,val4,f6,f7} |
我只想从 中选择几个字段field_with_struct,但将它们保留在结果数据框中的结构中。如果有可能(这不是真正的代码):
result_df = spark.sql("""select
one_field,
struct(
field_with_struct.name,
field_with_struct.value2
) as my_subset
from purchases""")
Run Code Online (Sandbox Code Playgroud)
要得到这个:
| 一个字段 | 我的子集 |
|---|---|
| 123 | {名称1,值2} |
| 第555章 | {名称2,值4} |
有没有办法用 SQL 来做到这一点?(不适用于流畅的 API)
我有一个具有以下架构的 Spark 数据框:
我想将多个列组合成一个结构,以便生成的模式变为:
我希望它成为这样的格式,以便它适合作为卡夫卡输入。请告诉如何实现这一目标。
如何将字符串列拆分为字符数组?
输入:
from pyspark.sql import functions as F
df = spark.createDataFrame([('Vilnius',), ('Riga',), ('Tallinn',), ('New York',)], ['col_cities'])
df.show()
# +----------+
# |col_cities|
# +----------+
# | Vilnius|
# | Riga|
# | Tallinn|
# | New York|
# +----------+
Run Code Online (Sandbox Code Playgroud)
期望的输出:
# +----------+------------------------+
# |col_cities|split |
# +----------+------------------------+
# |Vilnius |[V, i, l, n, i, u, s] |
# |Riga |[R, i, g, a] |
# |Tallinn |[T, a, l, l, i, n, n] |
# |New York |[N, e, w, , …Run Code Online (Sandbox Code Playgroud) 我正在寻找一个代码来获取 pyspark 上 palantir 铸造厂目录中的文件和文件夹名称列表。因此,如果我提供铸造路径位置,我需要使用 pyspark 代码的文件和文件夹名称列表。
有人可以帮我解决这个问题吗?
最近,我在运行 PySpark 作业之一时遇到了问题。在分析 Spark UI 中的阶段时,我注意到运行时间最长的阶段需要 1.2 小时才能运行完,而整个流程运行所需的总时间为 2.5 小时。
查看阶段详细信息后,我清楚地发现我面临着严重的数据偏差,导致单个任务运行了整个1.2 小时,而所有其他任务在23 秒内完成。
DAG 显示此阶段涉及窗口函数,它帮助我快速将有问题的区域缩小到几个查询,并找到根本原因 -> 中account使用的列,Window.partitionBy("account")有25% 的空值。我没有兴趣计算空帐户的总和,尽管我确实需要涉及的行进行进一步计算,因此我无法在窗口函数之前将它们过滤掉。
这是我的窗口函数查询:
problematic_account_window = Window.partitionBy("account")
sales_with_account_total_df = sales_df.withColumn("sum_sales_per_account", sum(col("price")).over(problematic_account_window))
Run Code Online (Sandbox Code Playgroud)
所以我们找到了罪魁祸首——我们现在能做什么?我们如何解决倾斜和性能问题?
skew apache-spark apache-spark-sql pyspark spark-window-function
apache-spark-sql ×10
apache-spark ×8
pyspark ×8
arrays ×1
aws-glue ×1
file ×1
group-by ×1
listdir ×1
python ×1
skew ×1
spark-koalas ×1
split ×1