小编abe*_*bop的帖子

PySpark group中的中位数/分位数

我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.

这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.

我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.

为了特异性,假设我有以下数据帧:

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()    

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)

预期结果是:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

28
推荐指数
5
解决办法
2万
查看次数

来自AsyncTask的RejectedExecutionException,但没有达到限制

我的Android应用程序必须处理到达的消息,这些消息经常串联(特别是在片状连接期间).我在AsyncTasks中处理这些传入的消息,这样我就不会干扰UI线程.如果一次收到太多消息,我会得到一个RejectedExecutionException.我的错误堆栈如下所示:

10-22 14:44:49.398: E/AndroidRuntime(17834): Caused by: java.util.concurrent.RejectedExecutionException: Task android.os.AsyncTask$3@414cbe68 rejected from java.util.concurrent.ThreadPoolExecutor@412716b8[Running, pool size = 128, active threads = 22, queued tasks = 0, completed tasks = 1323]
10-22 14:44:49.398: E/AndroidRuntime(17834):    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1967)
10-22 14:44:49.398: E/AndroidRuntime(17834):    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:782)
10-22 14:44:49.398: E/AndroidRuntime(17834):    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1303)
10-22 14:44:49.398: E/AndroidRuntime(17834):    at android.os.AsyncTask.executeOnExecutor(AsyncTask.java:564)
Run Code Online (Sandbox Code Playgroud)

我正在运行任务,task.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR)以便并行处理传入的消息.

令人困惑的是,与我能找到的相关StackOverflow问题(例如此处此处)不同的是,活动线程和排队任务的数量似乎没有超出限制(似乎是128)和10,分别).查看stacktrace:

ThreadPoolExecutor@412716b8[Running, pool size = 128, active threads = 22, queued tasks = 0, completed tasks = 1323]

为什么我会收到此错误消息?

multithreading android

17
推荐指数
2
解决办法
2万
查看次数

在子字符串匹配(或包含)时加入 PySpark 数据帧

我想在两个数据帧之间执行左连接,但列不完全相同。第一个数据帧中的连接列相对于第二个数据帧有一个额外的后缀。

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()

df1 = sc.parallelize([
    ['AB-101-1', 'el1', 1.5],
    ['ABC-1020-1', 'el2', 1.3],
    ['AC-1030-1', 'el3', 8.5]
]).toDF(('id1', 'el', 'v1'))

df2 = sc.parallelize([
    ['AB-101', 3],
    ['ABC-1020', 4]
]).toDF(('id2', 'v2'))
Run Code Online (Sandbox Code Playgroud)

我想通过左连接获得的数据帧是:

df_join = sc.parallelize([
    ['AB-101-1', 'el1', 1.5, 'AB-101', 3],
    ['ABC-1020-1', 'el2', 1.3, 'ABC-1020', 4],
    ['AC-103-1', 'el3', 8.5, None, None]
]).toDF(('id1', 'el', 'v1', 'id2', 'v2'))
Run Code Online (Sandbox Code Playgroud)

我很乐意使用pyspark.sql.substring“除最后两个字符之外的所有字符”,或者使用类似的东西pyspark.sql.like,但我不知道如何使这些中的任何一个在连接内正常工作。

pyspark

6
推荐指数
2
解决办法
1万
查看次数

如何为 Jupyter notebook 中的每个单元启用计时魔法?

%%time%%timeit魔法使得能够在Jupyter或IPython的笔记本单个小区的定时。

是否有类似的功能可以为 Jupyter 笔记本中的每个单元打开和关闭计时?

这个问题是相关的,但对于在每个单元格中自动启用给定魔法的更普遍的问题没有答案。

python ipython-notebook jupyter-notebook

5
推荐指数
1
解决办法
1168
查看次数

为什么我的 Airflow 任务“从外部设置为失败”?

我正在使用 Airflow 2.0.0,并且我的任务在运行几秒钟或几分钟后偶尔会被“外部”杀死。这些任务通常会成功运行(对于通过airflow tasks test ...DAG启动的手动任务和计划的 DAG 运行),所以我相信这与我的 DAG 代码无关。

当任务失败时,这似乎是任务日志中的关键错误:

{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
Run Code Online (Sandbox Code Playgroud)
[2020-12-20 11:26:11,448] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:1017} INFO - 
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,473] {taskinstance.py:1018} INFO - Starting attempt 3 of 3
[2020-12-20 11:26:11,473] {taskinstance.py:1019} INFO - 
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,506] {taskinstance.py:1038} INFO …
Run Code Online (Sandbox Code Playgroud)

airflow

5
推荐指数
1
解决办法
2620
查看次数

SQL中的模糊分组

我需要修改一个SQL表来对名称略有不匹配的名称进行分组,并为该组中的所有元素分配一个标准名称.

例如,如果初始表如下所示:

Name
--------
Jon Q
John Q
Jonn Q
Mary W
Marie W
Matt H
Run Code Online (Sandbox Code Playgroud)

我想创建一个新表或向现有的表添加一个字段,如下所示:

Name     | StdName
--------------------
Jon Q    | Jon Q
John Q   | Jon Q
Jonn Q   | Jon Q
Mary W   | Mary W
Marie W  | Mary W
Matt H   | Matt H
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我选择了第一个名称作为"标准化名称",但我实际上并不关心选择哪一个 - 最终将最终的"标准名称"转换为唯一的人ID.(我也对可以直接使用数字ID的替代解决方案持开放态度.)我也会有生日匹配,所以名称匹配的准确性实际上并不需要在实践中精确.我对此进行了一些调查,可能会使用Jaro-Winkler算法(参见此处).

如果我知道名称都是成对的,这将是一个相对容易的查询,但可以有任意数量的相同名称.

我可以很容易地概念化如何使用过程语言进行此查询,但我对SQL不是很熟悉.不幸的是,我无法直接访问数据 - 它是敏感数据,因此其他人(官僚)必须为我运行实际查询.具体实现将是SQL Server,但我更喜欢与实现无关的解决方案.

编辑:

在回应评论时,我考虑了以下程序方法.它是在Python中,为了拥有一个有效的代码示例,我在名称的第一个字母上简单地匹配了Jaro-Winkler.

nameList = ['Jon Q', 'John Q', 'Jonn Q', 'Mary W', 'Marie W', 'Larry H']
stdList = nameList[:] …
Run Code Online (Sandbox Code Playgroud)

sql sql-server

4
推荐指数
2
解决办法
5438
查看次数

如何在PySpark DataFrame中强制进行某个分区?

假设我有一个带有列的DataFrame partition_id:

n_partitions = 2

df = spark.sparkContext.parallelize([
    [1, 'A'],
    [1, 'B'],
    [2, 'A'],
    [2, 'C']
]).toDF(('partition_id', 'val'))
Run Code Online (Sandbox Code Playgroud)

我如何重新分区DataFrame以保证每个值partition_id都转到一个单独的分区,并且实际分区的数量与不同的值完全相同partition_id

如果我执行散列分区,即df.repartition(n_partitions, 'partition_id')保证分区数量正确,但某些分区可能为空,而其他分区可能包含多个partition_id由于散列冲突引起的值.

partitioning apache-spark pyspark

4
推荐指数
2
解决办法
2596
查看次数