我们有一个 Blob 存储,全天都会有大量文件到达。我有一个批量运行的 Databricks 笔记本,读取目录列表,循环文件并将它们全部发送到 Azure SQLDW.Works 中。之后,处理后的文件将被移至存档中。但是循环文件列表、附加每个文件并将文件名添加到列的过程有点慢。我想知道这是否可以在 1 次运行中完成。可以一次加载所有 csv,但如何记住一列中相应的文件名。
有人有建议吗?
我有一个包含一些单词的列表,我需要从文本行中提取匹配的单词,我找到了这个,但它只提取了一个单词。
密钥文件内容
这是一个关键字
部分描述文件内容
32015 这是一个关键字 hello world
代码
import pyspark.sql.functions as F
keywords = sc.textFile('file:///home/description_search/keys') #1
part_description = sc.textFile('file:///description_search/part_description') #2
keywords = keywords.map(lambda x: x.split(' ')) #3
keywords = keywords.collect()[0] #4
df = part_description.map(lambda r: Row(r)).toDF(['line']) #5
df.withColumn('extracted_word', F.regexp_extract(df['line'],'|'.join(keywords), 0)).show() #6
Run Code Online (Sandbox Code Playgroud)
输出
+--------------------+--------------+
| line|extracted_word|
+--------------------+--------------+
|32015 this is a...| this|
+--------------------+--------------+
Run Code Online (Sandbox Code Playgroud)
预期产出
+--------------------+-----------------+
| line| extracted_word|
+--------------------+-----------------+
|32015 this is a...|this,is,a,keyword|
+--------------------+-----------------+
Run Code Online (Sandbox Code Playgroud)
我想要
返回所有匹配的关键字及其计数
ifstep #4是最有效的方法
可重现的例子:
+--------------------+--------------+
| line|extracted_word|
+--------------------+--------------+
|32015 this is …Run Code Online (Sandbox Code Playgroud) 我有一个包含 432 列的数据框,并且有 24 个重复列。
我想删除 df_tickets 中重复的列。所以 df_tickets 应该只有 432-24=408 列。
我已经用下面的代码尝试过,但它抛出错误。
df_tickets.select([c for c in df_tickets.columns if c not in duplicatecols]).show()
Run Code Online (Sandbox Code Playgroud)
错误是
An error occurred while calling o1657.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
HashAggregate(keys=[ms_bvoip_order_extension_id#953, ms_order_id#954...........
Run Code Online (Sandbox Code Playgroud)
有人可以帮我解决这个问题吗?
我有一个有 3 列的 Spark 数据帧,我想基于第三个主题合并两个主题,这是一个示例:
+---+---+---+
|AAA|bbb|ccc|
+---+---+---+
|AAA|BBB| E|
|AAA|BBB| R|
|AAA|BBB| E|
|AAA|BBB| R|
|AAA|BBB| R|
|AAA|BBB| E|
+-----------+
Run Code Online (Sandbox Code Playgroud)
我想当 CCC 列的值为 E 时使用 AAA 列的值,当 CCC 为 R 时使用 BBB,这里是输出:
+---+---+
|NEW|ccc|
+---+---+
|AAA| E|
|BBB| R|
|AAA| E|
|BBB| R|
|BBB| R|
|AAA| E|
+-------+
Run Code Online (Sandbox Code Playgroud) 我正在尝试学习 PySpark。我必须根据相应的列 和来左连接两个数据框,比如说A和。通常,我会这样做:Bcolname_acolname_b
# create a new dataframe AB:
AB = A.join(B, A.colname_a == B.colname_b, how = 'left')
Run Code Online (Sandbox Code Playgroud)
但是,我无法直接获得列的名称。它们已存储在特定的模块中,我必须这样调用它们:
module.COLNAME_A # contains string with colname of A
module.COLNAME_B # contains string with colname of B
Run Code Online (Sandbox Code Playgroud)
如何将这些字符串值放入上面的命令中,以便加入数据帧?
提到使用spark read函数时的一些奇怪的行为:
spark.read.json(".../date=2019-08-0[1-9]")//works
spark.read.json(".../date=2019-08-[10-20]")//throws "Path does not exist" but folders definetily exist.
spark.read.json(".../date=2019-08-{10,11,12,13}")//works
spark.read.json(".../date=2019-08-[01-10]")// throws java.io.IOException: Illegal file pattern: Illegal character range near index n
Run Code Online (Sandbox Code Playgroud)
如何使用前导零通配范围?
专家们,我有一个简单的要求,但无法找到实现目标的功能。
我正在使用 pyspark (spark 1.6 和 Python 2.7)并有一个简单的 pyspark 数据框列,其中包含某些值,例如-
1849adb0-gfhe6543-bduyre763ryi-hjdsgf87qwefdb-78a9f4811265_ABC
1849adb0-rdty4545y4-657u5h556-zsdcafdqwddqdas-78a9f4811265_1234
1849adb0-89o8iulk89o89-89876h5-432rebm787rrer-78a9f4811265_12345678
Run Code Online (Sandbox Code Playgroud)
这些值的共同点是有一个“下划线”,之后有某些字符(可以是任意数量的字符)。这些是我有兴趣在输出中获得的字符。我想使用子字符串或正则表达式函数,它将找到列值中“下划线”的位置,并选择“从下划线位置+1”直到列值的末尾。因此输出看起来像一个数据框,其值为-
ABC
1234
12345678
Run Code Online (Sandbox Code Playgroud)
我尝试使用子字符串,但可以找到任何内容来“索引”“下划线”
谢谢!
我只对查询性能原因及其背后的架构差异感兴趣。我之前看到的所有答案都已过时,或者没有为我提供足够的背景信息来说明为什么 Impala 更适合即席查询。
从下面的 3 个考虑因素中,只有第二点解释了为什么 Impala 在更大的数据集上更快。您能否对以下陈述作出贡献?
Impala 不会错过查询预初始化的时间,这意味着 impalad 守护进程始终运行并准备就绪。另一方面, Spark Job Server出于相同目的提供持久上下文。
Impala 位于内存中,当数据没有足够的 RAM 时,可能会将数据溢出到磁盘上,从而导致性能下降。Spark 也是如此。主要区别在于 Spark 是在 Scala 上编写的并且有 JVM 限制,因此不建议使用大于 32 GB 的工作线程(因为 GC)。反过来,[错误,请参阅 UPD] Impala 是在 C++ 上实现的,并且对硬件要求很高:建议使用 128-256+ GB 的 RAM。这非常重要,但 Impala 仅适用于需要 32-64 GB 以上 RAM 的数据集。
Impala 与 Hadoop 基础设施集成。据我所知,使用 Impala 而不是其他内存 DWH 的主要原因是能够运行 Hadoop 数据格式,而无需从 Hadoop 导出数据。意味着 Impala 通常使用与 Spark 相同的存储/数据/分区/存储桶,并且与 Spark 相比,并没有从数据结构中获得任何额外的好处。我对吗?
PS 2019 年 Impala 比 Spark 更快吗?您见过任何性能基准吗?
问题更新: …
我在之前的查询中创建了一个名为 v1 的字段。然后我尝试从中创建一个新的派生字段。
一种方法有效,另一种则无效。我不明白,我希望它们是等价的。
这有效:
df = df.withColumn("outcome",expr("case when v1 = 0 then 1 when v1 > 0 then 2 else 0 end"))
Run Code Online (Sandbox Code Playgroud)
这失败了:
df = df.withColumn("outcome", F.when(F.col("v1") == 0, 1)
.F.when(F.col("v1") >0, 2)
.otherwise(0))
Run Code Online (Sandbox Code Playgroud)
有错误:
Py4JJavaError: An error occurred while calling o520.when.
: java.lang.IllegalArgumentException: when() can only be applied on a Column previously generated by when() function
Run Code Online (Sandbox Code Playgroud) apache-spark-sql ×10
apache-spark ×7
pyspark ×5
python ×3
hadoop ×1
impala ×1
join ×1
left-join ×1
python-3.x ×1