Spark内存开销相关问题在SO中多次被问到,我经历了其中的大部分。然而,在浏览了多个博客后,我感到困惑。
以下是我的疑问
https://docs.qubole.com/en/latest/user-guide/engines/spark/defaults-executors.html https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
下面是我想了解的案例。我有5个节点,每个节点16个vcore和128GB内存(其中120个可用),现在我想提交spark应用程序,下面是conf,我在想
Total Cores 16 * 5 = 80
Total Memory 120 * 5 = 600GB
Run Code Online (Sandbox Code Playgroud)
情况1:执行器内存的内存开销部分
spark.executor.memory=32G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=8G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=8G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)
情况 2:内存开销不是执行程序内存的一部分
spark.executor.memory=28G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=6G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=6G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)
根据下面的视频,我尝试使用 85% 的节点,即 120GB 中的 100GB 左右,不确定我们是否可以使用更多。
https://www.youtube.com/watch?v=ph_2xwVjCGs&list=PLdqfPU6gm4b9bJEb7crUwdkpprPLseCOB&index=8&t=1281s (4:12)
我正在尝试运行 spark submit(pyspark) 命令。作为 spark 提交的一部分,我需要提供 boto3 的依赖项,因为它是我代码中的依赖项。我正在运行以下命令并且没有收到模块错误。
bin/spark-submit --master=local --py-files /home/user/boto3-develop.zip /home/user/py_script.py
Traceback (most recent call last):
File "/home/user/py_script.py", line 16, in <module>
import boto3
ModuleNotFoundError: No module named 'boto3'
Error in sys.excepthook:
Traceback (most recent call last):
File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
from apport.fileutils import likely_packaged, get_recent_crashes
File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
from apport.report import Report
File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
import apport.fileutils
File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
from apport.packaging_impl import impl as packaging …
Run Code Online (Sandbox Code Playgroud) 我有一个包含多个字段的 CSV 文件。数据跨越多行的字段(字符串)很少。我想将这些多行聚合为一行。
输入数据:
1, "asdsdsdsds", "John"
2, "dfdhifdkinf
dfjdfgkdnjgknkdjgndkng
dkfdkjfnjdnf", "Roy"
3, "dfjfdkgjfgn", "Rahul"
Run Code Online (Sandbox Code Playgroud)
预期输出:
1, "asdsdsdsds", "John"
2, "dfdhifdkinf dfjdfgkdnjgknkdjgndkng dkfdkjfnjdnf", "Roy"
3, "dfjfdkgjfgn", "Rahul"
Run Code Online (Sandbox Code Playgroud)
早些时候在SO 中提出了同样的问题。然而,解决方案是使用 power shell 实现的。是否可以使用 python、pandas 或 pyspark 实现相同的目标。
每当数据跨越多行时,它肯定会用双引号引起来。
我试过的
我可以使用 Pandas 和 pyspark 读取数据而不会出现任何问题,即使有些字段跨越了多行。
熊猫:
pandas_df = pd.read_csv("file.csv")
Run Code Online (Sandbox Code Playgroud)
火花
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true') \
.option("delimiter", ",").option("escape", '\\').option("escape", ':').\
option("parserLib", "univocity").option("multiLine", "true").load("file.csv")
Run Code Online (Sandbox Code Playgroud)
编辑:
csv 文件中可以有 n 个字段,并且此数据跨度可以在任何字段中。
我想删除 CSV 文件字段数据中的换行符。SO/其他地方的多人问了同样的问题。然而,提供的解决方案是在脚本中。我正在寻找像 PYTHON 这样的编程语言或 Spark(不仅仅是这两个)的解决方案,因为我有很大的文件。
以前问过关于同一主题的问题:
我有一个大小约 1GB 的 CSV 文件,想删除字段数据中的换行符。CSV 文件的架构动态变化,因此我无法对架构进行硬编码。换行符并不总是出现在逗号之前,它甚至在一个字段中也是随机出现的。
样本数据:
playerID,yearID,gameNum,gameName,teamName,lgID,GP,startingPos
gomezle01,1933,1,Cricket,Team1,NYA,AL,1
ferreri01,1933,2,Hockey,"This is
Team2",BOS,AL,1
gehrilo01,1933,3,"Game name is
Cricket"
,Team3,NYA,AL,1
gehrich01,1933,4,Hockey,"Here it is
Team4",DET,AL,1
dykesji01,1933,5,"Game name is
Hockey"
,"Team name
Team5",CHA,AL,1
Run Code Online (Sandbox Code Playgroud)
预期输出:
playerID,yearID,gameNum,gameName,teamName,lgID,GP,startingPos
gomezle01,1933,1,Cricket,Team1,NYA,AL,1
ferreri01,1933,2,Hockey,"This is Team2",BOS,AL,1
gehrilo01,1933,3,"Game name is Cricket" ,Team3,NYA,AL,1
gehrich01,1933,4,Hockey,"Here it is Team4",DET,AL,1
dykesji01,1933,5,"Game name is Hockey","Team name Team5",CHA,AL,1
Run Code Online (Sandbox Code Playgroud)
换行符可以在任何字段的数据中。
编辑: 根据代码截图:
我正在尝试在 pyspark 中使用 ^A(\001) 分隔符读取 csv 文件。我已经浏览了下面的链接,正如链接中提到的,我尝试了相同的方法,它按预期工作,即我能够读取 csv 文件并进一步处理它们。
链接: 如何使用spark-csv解析使用^A(即\001)作为分隔符的csv?
在职的
spark.read.option("wholeFile", "true"). \
option("inferSchema", "false"). \
option("header", "true"). \
option("quote", "\""). \
option("multiLine", "true"). \
option("delimiter", "\u0001"). \
csv("path/to/csv/file.csv")
Run Code Online (Sandbox Code Playgroud)
我想从数据库中读取它而不是硬编码分隔符,下面是我尝试过的方法。
update table set field_delimiter= 'field_delimiter=\\u0001'
Run Code Online (Sandbox Code Playgroud)
(键值对。使用键,我正在访问值)
delimiter = config.FIELD_DELIMITER (This will fetch the delimiter from the database)
>>print(delimiter)
\u0001
Run Code Online (Sandbox Code Playgroud)
不工作
spark.read.option("wholeFile", "true"). \
option("inferSchema", "false"). \
option("header", "true"). \
option("quote", "\""). \
option("multiLine", "true"). \
option("delimiter", delimiter). \
csv("path/to/csv/file.csv")
Run Code Online (Sandbox Code Playgroud)
错误:
: java.lang.IllegalArgumentException: Unsupported special character for delimiter: \u0001 …
Run Code Online (Sandbox Code Playgroud) 我创建了一个每周运行的 DAG。以下是我尝试过的,并且它按预期工作。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
default_args = {
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=2),
'wait_for_downstream': True,
'provide_context': True,
'start_date': datetime(2020, 12, 20, hour=00, minute=00, second=00)
}
with DAG("DAG", default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=True) as dag:
t1 = BashOperator(
task_id='dag_schedule',
bash_command='echo DAG',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
按照时间表,它在27日(即剧本中的20日)播出。由于需求发生变化,现在我将开始日期更新为 30 日(即脚本中的 23 日)而不是 27 日(我的想法是从 30 日开始并从那里开始每周)。当我更改 DAG 的时间表时,即开始日期从 27 日更改为 30 日。DAG 没有按照最晚开始日期进行选择,不知道为什么?当我删除 DAG(因为它是测试 DAG,我删除了它,在产品中我无法删除它)并创建了具有相同名称且最新开始日期(即 30 日)的新 DAG 时,它按照计划运行。
我正在尝试从数据库中复制数据并使用 nifi 将其放置在 S3 中。我能够从数据库中复制数据并将其放置在 S3 中。现在我正在尝试为此流程添加错误处理。我刚刚添加了用于错误通知的 PutEmail 处理器。我只是给了一个错误的存储桶名称来验证电子邮件。这个 PutEmail 处理器被每个流文件触发(因为有 100 个流文件,邮件触发了 100 次)。我只想在流程中出现错误时仅触发一次 PutEmail(notification)。请对此提出任何建议。
下面是流程:
任何关于更好(通用)错误处理的建议都会对我有所帮助。
apache-spark ×4
pyspark ×4
csv ×3
python ×3
newline ×2
airflow ×1
apache-nifi ×1
boto3 ×1
delimiter ×1
executor ×1
hadoop-yarn ×1
pandas ×1
spark-submit ×1