小编Kaf*_*els的帖子

Pyspark:具有重置条件的累积总和

我们有如下数据框:

+------+--------------------+
| Flag |               value|
+------+--------------------+
|1     |5                   |
|1     |4                   |
|1     |3                   |
|1     |5                   |
|1     |6                   |
|1     |4                   |
|1     |7                   |
|1     |5                   |
|1     |2                   |
|1     |3                   |
|1     |2                   |
|1     |6                   |
|1     |9                   |      
+------+--------------------+
Run Code Online (Sandbox Code Playgroud)

在正常 cumsum 之后,我们得到了这个。

+------+--------------------+----------+
| Flag |               value|cumsum    |
+------+--------------------+----------+
|1     |5                   |5         |
|1     |4                   |9         |
|1     |3                   |12        |
|1     |5                   |17        |
|1     |6                   |23        |
|1     |4                   |27 …
Run Code Online (Sandbox Code Playgroud)

cumulative-sum apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
1001
查看次数

Pyspark - 具有重置条件的累积和

我有这个数据框

+---+----+---+
|  A|   B|  C|
+---+----+---+
|  0|null|  1|
|  1| 3.0|  0|
|  2| 7.0|  0|
|  3|null|  1|
|  4| 4.0|  0|
|  5| 3.0|  0|
|  6|null|  1|
|  7|null|  1|
|  8|null|  1|
|  9| 5.0|  0|
| 10| 2.0|  0|
| 11|null|  1|
+---+----+---+
Run Code Online (Sandbox Code Playgroud)

我需要做的是从 C 列到下一个值为零的累积值总和。

预期输出:

+---+----+---+----+
|  A|   B|  C|   D|
+---+----+---+----+
|  0|null|  1|   1|
|  1| 3.0|  0|   0|
|  2| 7.0|  0|   0|
|  3|null|  1|   1|
| …
Run Code Online (Sandbox Code Playgroud)

python dataframe cumulative-sum apache-spark pyspark

6
推荐指数
1
解决办法
2126
查看次数

spark delta 覆盖特定分区

所以我有一个数据框,其中有一列 file_date。对于给定的运行,数据帧只有一个唯一的 file_date 的数据。例如,在一次运行中,假设有大约 100 条记录,文件日期为 2020_01_21。

我正在使用以下内容编写此数据

(df
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("replaceWhere","FILE_DATE=" + run_for_file_date)
 .mode("overwrite")
 .save("/mnt/starsdetails/starsGrantedDetails/"))
Run Code Online (Sandbox Code Playgroud)

我的要求是为每个 FILE_DATE 创建一个文件夹/分区,因为很有可能重新运行特定 file_date 的数据并且必须覆盖特定 file_date 的数据。不幸的是,在上面的代码中,如果我不放置“replaceWhere”选项,它也只会覆盖其他分区的数据,但是如果我写了上面的内容,数据似乎正确地覆盖了特定分区,但是每次写入完成时,我收到以下错误。

请注意,我还在写入之前设置了以下 spark 配置:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
Run Code Online (Sandbox Code Playgroud)

但我仍然收到以下错误:

AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"
Run Code Online (Sandbox Code Playgroud)

你能帮忙吗?

delta apache-spark

4
推荐指数
1
解决办法
2741
查看次数

Spark:写一个空值作为空列的 CSV

我正在使用 PySpark 将数据帧写入 CSV 文件,如下所示:

df.write.csv(PATH, nullValue='')
Run Code Online (Sandbox Code Playgroud)

该数据框中有一列字符串类型。一些值为空。这些空值显示如下:

...,"",...
Run Code Online (Sandbox Code Playgroud)

我希望它们像这样显示:

...,,...
Run Code Online (Sandbox Code Playgroud)

这可以通过csv.write() 中的选项实现吗?

谢谢!

csv apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
9074
查看次数

'replaceWhere' 会导致删除吗?

在这个命令(取自)中会replaceWhere导致记录删除吗?例如:命令中提到的日期范围有 1000 行。new df 只有100条,这样会导致删除900条记录吗?

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") \
  .save("/mnt/delta/events")
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark databricks

2
推荐指数
1
解决办法
2663
查看次数

如何在pyspark中压缩/连接值和列表

我正在处理一个使用 4 个输入的函数。

为此,我想得到一个总结这 4 个元素的列表。但是我有两个变量,其中数据是唯一的,两个变量由列表组成。我可以用 压缩这两个列表arrays_zip,但无法获得包含 4 个元素的数组列表:

+----+----+---------+---------+
| l1 | l2 |   l3    |   l4    |
+----+----+---------+---------+
| 1  | 5  | [1,2,3] | [2,2,2] |
| 2  | 9  | [8,2,7] | [1,7,7] |
| 3  | 3  | [8,4,9] | [5,1,3] |
| 4  | 1  | [5,5,3] | [8,4,3] |

Run Code Online (Sandbox Code Playgroud)

我想得到什么:


+----+----+---------+---------+------------------------------------------+
| l1 | l2 |   l3    |   l4    |                       l5                 |
+----+----+---------+---------+------------------------------------------+
| 1  | 5  | [1,2,3] | [2,2,2] …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
54
查看次数

Discord.py 机器人的问题

我一直在尝试为我自己的服务器编写一个 Discord 机器人。然而,似乎每当我向代码中添加更多命令时,禁止和踢函数就不再正常工作。我多次尝试重写代码,但没有成功。我尝试过重新排列代码,但效果并不好。

client = commands.Bot(command_prefix = '!')

@client.command()
async def kick(ctx, member : discord.Member, *, reason = None):
  await member.kick(reason = reason)

@client.command()
async def ban(ctx, member : discord.Member, *, reason = None):
  await member.ban(reason = reason)


curseWord = ['die', 'kys', 'are you dumb', 'stfu', 'fuck you', 'nobody cares', 'do i care', 'bro shut up']

def get_quote():
  response = requests.get("https://zenquotes.io/api/random")
  json_data = json.loads(response.text)
  quote = json_data[0]['q'] + " -" + json_data[0]['a']
  return(quote)

#ready
@client.event
async def on_ready():
  print('LOGGED IN …
Run Code Online (Sandbox Code Playgroud)

bots discord discord.py

1
推荐指数
1
解决办法
103
查看次数