我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.
这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.
我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.
为了特异性,假设我有以下数据帧:
from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()
df = sc.parallelize([
['A', 1],
['A', 2],
['A', 3],
['B', 4],
['B', 5],
['B', 6],
]).toDF(('grp', 'val'))
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)
预期结果是:
+----+-------+
| grp|med_val|
+----+-------+
| A| 2|
| B| 5|
+----+-------+
Run Code Online (Sandbox Code Playgroud) 我的Android应用程序必须处理到达的消息,这些消息经常串联(特别是在片状连接期间).我在AsyncTasks中处理这些传入的消息,这样我就不会干扰UI线程.如果一次收到太多消息,我会得到一个RejectedExecutionException.我的错误堆栈如下所示:
10-22 14:44:49.398: E/AndroidRuntime(17834): Caused by: java.util.concurrent.RejectedExecutionException: Task android.os.AsyncTask$3@414cbe68 rejected from java.util.concurrent.ThreadPoolExecutor@412716b8[Running, pool size = 128, active threads = 22, queued tasks = 0, completed tasks = 1323]
10-22 14:44:49.398: E/AndroidRuntime(17834): at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1967)
10-22 14:44:49.398: E/AndroidRuntime(17834): at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:782)
10-22 14:44:49.398: E/AndroidRuntime(17834): at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1303)
10-22 14:44:49.398: E/AndroidRuntime(17834): at android.os.AsyncTask.executeOnExecutor(AsyncTask.java:564)
Run Code Online (Sandbox Code Playgroud)
我正在运行任务,task.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR)以便并行处理传入的消息.
令人困惑的是,与我能找到的相关StackOverflow问题(例如此处和此处)不同的是,活动线程和排队任务的数量似乎没有超出限制(似乎是128)和10,分别).查看stacktrace:
ThreadPoolExecutor@412716b8[Running, pool size = 128, active threads = 22, queued tasks = 0, completed tasks = 1323]
为什么我会收到此错误消息?
我想在两个数据帧之间执行左连接,但列不完全相同。第一个数据帧中的连接列相对于第二个数据帧有一个额外的后缀。
from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()
df1 = sc.parallelize([
['AB-101-1', 'el1', 1.5],
['ABC-1020-1', 'el2', 1.3],
['AC-1030-1', 'el3', 8.5]
]).toDF(('id1', 'el', 'v1'))
df2 = sc.parallelize([
['AB-101', 3],
['ABC-1020', 4]
]).toDF(('id2', 'v2'))
Run Code Online (Sandbox Code Playgroud)
我想通过左连接获得的数据帧是:
df_join = sc.parallelize([
['AB-101-1', 'el1', 1.5, 'AB-101', 3],
['ABC-1020-1', 'el2', 1.3, 'ABC-1020', 4],
['AC-103-1', 'el3', 8.5, None, None]
]).toDF(('id1', 'el', 'v1', 'id2', 'v2'))
Run Code Online (Sandbox Code Playgroud)
我很乐意使用pyspark.sql.substring“除最后两个字符之外的所有字符”,或者使用类似的东西pyspark.sql.like,但我不知道如何使这些中的任何一个在连接内正常工作。
的%%time和%%timeit魔法使得能够在Jupyter或IPython的笔记本单个小区的定时。
是否有类似的功能可以为 Jupyter 笔记本中的每个单元打开和关闭计时?
这个问题是相关的,但对于在每个单元格中自动启用给定魔法的更普遍的问题没有答案。
我正在使用 Airflow 2.0.0,并且我的任务在运行几秒钟或几分钟后偶尔会被“外部”杀死。这些任务通常会成功运行(对于通过airflow tasks test ...DAG启动的手动任务和计划的 DAG 运行),所以我相信这与我的 DAG 代码无关。
当任务失败时,这似乎是任务日志中的关键错误:
{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
Run Code Online (Sandbox Code Playgroud)
[2020-12-20 11:26:11,448] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,473] {taskinstance.py:1018} INFO - Starting attempt 3 of 3
[2020-12-20 11:26:11,473] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,506] {taskinstance.py:1038} INFO …Run Code Online (Sandbox Code Playgroud) 我需要修改一个SQL表来对名称略有不匹配的名称进行分组,并为该组中的所有元素分配一个标准名称.
例如,如果初始表如下所示:
Name
--------
Jon Q
John Q
Jonn Q
Mary W
Marie W
Matt H
Run Code Online (Sandbox Code Playgroud)
我想创建一个新表或向现有的表添加一个字段,如下所示:
Name | StdName
--------------------
Jon Q | Jon Q
John Q | Jon Q
Jonn Q | Jon Q
Mary W | Mary W
Marie W | Mary W
Matt H | Matt H
Run Code Online (Sandbox Code Playgroud)
在这种情况下,我选择了第一个名称作为"标准化名称",但我实际上并不关心选择哪一个 - 最终将最终的"标准名称"转换为唯一的人ID.(我也对可以直接使用数字ID的替代解决方案持开放态度.)我也会有生日匹配,所以名称匹配的准确性实际上并不需要在实践中精确.我对此进行了一些调查,可能会使用Jaro-Winkler算法(参见此处).
如果我知道名称都是成对的,这将是一个相对容易的查询,但可以有任意数量的相同名称.
我可以很容易地概念化如何使用过程语言进行此查询,但我对SQL不是很熟悉.不幸的是,我无法直接访问数据 - 它是敏感数据,因此其他人(官僚)必须为我运行实际查询.具体实现将是SQL Server,但我更喜欢与实现无关的解决方案.
编辑:
在回应评论时,我考虑了以下程序方法.它是在Python中,为了拥有一个有效的代码示例,我在名称的第一个字母上简单地匹配了Jaro-Winkler.
nameList = ['Jon Q', 'John Q', 'Jonn Q', 'Mary W', 'Marie W', 'Larry H']
stdList = nameList[:] …Run Code Online (Sandbox Code Playgroud) 假设我有一个带有列的DataFrame partition_id:
n_partitions = 2
df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
Run Code Online (Sandbox Code Playgroud)
我如何重新分区DataFrame以保证每个值partition_id都转到一个单独的分区,并且实际分区的数量与不同的值完全相同partition_id?
如果我执行散列分区,即df.repartition(n_partitions, 'partition_id')保证分区数量正确,但某些分区可能为空,而其他分区可能包含多个partition_id由于散列冲突引起的值.
pyspark ×3
apache-spark ×2
airflow ×1
android ×1
partitioning ×1
pyspark-sql ×1
python ×1
sql ×1
sql-server ×1