小编dat*_*ict的帖子

Spark 内存开销

Spark内存开销相关问题在SO中多次被问到,我经历了其中的大部分。然而,在浏览了多个博客后,我感到困惑。

以下是我的疑问

  • 内存开销是执行器内存的一部分还是独立的?由于很少有博客说内存开销是执行器内存的一部分,而其他博客则说执行器内存+内存开销(这是否意味着内存开销不是执行器内存的一部分)?
  • 内存开销和堆外开销是一样的吗?
  • 如果我没有在 Spark 提交中提及开销,会发生什么情况,它会采用默认值 18.75 还是不会?
  • 如果我们提供比默认值更多的内存开销,会产生副作用吗?

https://docs.qubole.com/en/latest/user-guide/engines/spark/defaults-executors.html https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html

下面是我想了解的案例。我有5个节点,每个节点16个vcore和128GB内存(其中120个可用),现在我想提交spark应用程序,下面是conf,我在想

Total Cores 16 * 5 = 80
Total Memory 120 * 5 = 600GB
Run Code Online (Sandbox Code Playgroud)

情况1:执行器内存的内存开销部分

spark.executor.memory=32G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=8G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=8G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)

情况 2:内存开销不是执行程序内存的一部分

spark.executor.memory=28G
spark.executor.cores=5
spark.executor.instances=14 (1 for AM)
spark.executor.memoryOverhead=6G ( giving more than 18.75% which is default)
spark.driver.memoryOverhead=6G
spark.driver.cores=5
Run Code Online (Sandbox Code Playgroud)

根据下面的视频,我尝试使用 85% 的节点,即 120GB 中的 100GB 左右,不确定我们是否可以使用更多。

https://www.youtube.com/watch?v=ph_2xwVjCGs&list=PLdqfPU6gm4b9bJEb7crUwdkpprPLseCOB&index=8&t=1281s (4:12)

executor hadoop-yarn apache-spark pyspark memory-overhead

11
推荐指数
1
解决办法
3万
查看次数

spark提交中的PySpark依赖模块

我正在尝试运行 spark submit(pyspark) 命令。作为 spark 提交的一部分,我需要提供 boto3 的依赖项,因为它是我代码中的依赖项。我正在运行以下命令并且没有收到模块错误。

bin/spark-submit --master=local --py-files /home/user/boto3-develop.zip /home/user/py_script.py

Traceback (most recent call last):
  File "/home/user/py_script.py", line 16, in <module>
    import boto3
ModuleNotFoundError: No module named 'boto3'
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
    from apport.fileutils import likely_packaged, get_recent_crashes
  File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
    from apport.report import Report
  File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
    import apport.fileutils
  File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
    from apport.packaging_impl import impl as packaging …
Run Code Online (Sandbox Code Playgroud)

apache-spark boto3 pyspark spark-submit

5
推荐指数
0
解决办法
374
查看次数

从 CSV 文件的字符串列中删除新行

我有一个包含多个字段的 CSV 文件。数据跨越多行的字段(字符串)很少。我想将这些多行聚合为一行。

输入数据:

1, "asdsdsdsds", "John"
2, "dfdhifdkinf
dfjdfgkdnjgknkdjgndkng
dkfdkjfnjdnf", "Roy"
3, "dfjfdkgjfgn", "Rahul"
Run Code Online (Sandbox Code Playgroud)

预期输出:

1, "asdsdsdsds", "John"
2, "dfdhifdkinf dfjdfgkdnjgknkdjgndkng dkfdkjfnjdnf", "Roy"
3, "dfjfdkgjfgn", "Rahul"
Run Code Online (Sandbox Code Playgroud)

早些时候在SO 中提出了同样的问题。然而,解决方案是使用 power shell 实现的。是否可以使用 python、pandas 或 pyspark 实现相同的目标。

每当数据跨越多行时,它肯定会用双引号引起来。

我试过的

我可以使用 Pandas 和 pyspark 读取数据而不会出现任何问题,即使有些字段跨越了多行。

熊猫:

pandas_df = pd.read_csv("file.csv")
Run Code Online (Sandbox Code Playgroud)

火花

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true') \
        .option("delimiter", ",").option("escape", '\\').option("escape", ':').\
    option("parserLib", "univocity").option("multiLine", "true").load("file.csv")
Run Code Online (Sandbox Code Playgroud)

编辑:

csv 文件中可以有 n 个字段,并且此数据跨度可以在任何字段中。

python csv newline pandas pyspark

5
推荐指数
1
解决办法
2874
查看次数

从 CSV 文件中删除新行

我想删除 CSV 文件字段数据中的换行符。SO/其他地方的多人问了同样的问题。然而,提供的解决方案是在脚本中。我正在寻找像 PYTHON 这样的编程语言或 Spark(不仅仅是这两个)的解决方案,因为我有很大的文件。

以前问过关于同一主题的问题:

我有一个大小约 1GB 的 CSV 文件,想删除字段数据中的换行符。CSV 文件的架构动态变化,因此我无法对架构进行硬编码。换行符并不总是出现在逗号之前,它甚至在一个字段中也是随机出现的。

样本数据:

playerID,yearID,gameNum,gameName,teamName,lgID,GP,startingPos
gomezle01,1933,1,Cricket,Team1,NYA,AL,1
ferreri01,1933,2,Hockey,"This is 
Team2",BOS,AL,1
gehrilo01,1933,3,"Game name is 
Cricket" 
,Team3,NYA,AL,1
gehrich01,1933,4,Hockey,"Here it is 
Team4",DET,AL,1
dykesji01,1933,5,"Game name is 
Hockey"
,"Team name 
Team5",CHA,AL,1
Run Code Online (Sandbox Code Playgroud)

预期输出:

playerID,yearID,gameNum,gameName,teamName,lgID,GP,startingPos
gomezle01,1933,1,Cricket,Team1,NYA,AL,1
ferreri01,1933,2,Hockey,"This is Team2",BOS,AL,1
gehrilo01,1933,3,"Game name is Cricket" ,Team3,NYA,AL,1
gehrich01,1933,4,Hockey,"Here it is Team4",DET,AL,1
dykesji01,1933,5,"Game name is Hockey","Team name Team5",CHA,AL,1
Run Code Online (Sandbox Code Playgroud)

换行符可以在任何字段的数据中。

编辑: 根据代码截图:

在此处输入图片说明

python csv newline apache-spark

5
推荐指数
1
解决办法
2万
查看次数

Spark csv 读取 ^A(\001)

我正在尝试在 pyspark 中使用 ^A(\001) 分隔符读取 csv 文件。我已经浏览了下面的链接,正如链接中提到的,我尝试了相同的方法,它按预期工作,即我能够读取 csv 文件并进一步处理它们。

链接: 如何使用spark-csv解析使用^A(即\001)作为分隔符的csv?

在职的

spark.read.option("wholeFile", "true"). \
                    option("inferSchema", "false"). \
                    option("header", "true"). \
                    option("quote", "\""). \
                    option("multiLine", "true"). \
                    option("delimiter", "\u0001"). \
                    csv("path/to/csv/file.csv")
Run Code Online (Sandbox Code Playgroud)

我想从数据库中读取它而不是硬编码分隔符,下面是我尝试过的方法。

update table set field_delimiter= 'field_delimiter=\\u0001'
Run Code Online (Sandbox Code Playgroud)

(键值对。使用键,我正在访问值)

delimiter = config.FIELD_DELIMITER (This will fetch the delimiter from the database)
>>print(delimiter) 
 \u0001
Run Code Online (Sandbox Code Playgroud)

不工作

spark.read.option("wholeFile", "true"). \
                    option("inferSchema", "false"). \
                    option("header", "true"). \
                    option("quote", "\""). \
                    option("multiLine", "true"). \
                    option("delimiter", delimiter). \
                    csv("path/to/csv/file.csv")
Run Code Online (Sandbox Code Playgroud)

错误:

: java.lang.IllegalArgumentException: Unsupported special character for delimiter: \u0001 …
Run Code Online (Sandbox Code Playgroud)

python csv delimiter apache-spark pyspark

5
推荐指数
1
解决办法
1318
查看次数

气流计划未更新

我创建了一个每周运行的 DAG。以下是我尝试过的,并且它按预期工作。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
default_args = {
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'wait_for_downstream': True,
    'provide_context': True,
    'start_date': datetime(2020, 12, 20, hour=00, minute=00, second=00)
}

with DAG("DAG", default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=True) as dag:
    t1 = BashOperator(
        task_id='dag_schedule',
        bash_command='echo DAG',
        dag=dag)
Run Code Online (Sandbox Code Playgroud)

按照时间表,它在27日(即剧本中的20日)播出。由于需求发生变化,现在我将开始日期更新为 30 日(即脚本中的 23 日)而不是 27 日(我的想法是从 30 日开始并从那里开始每周)。当我更改 DAG 的时间表时,即开始日期从 27 日更改为 30 日。DAG 没有按照最晚开始日期进行选择,不知道为什么?当我删除 DAG(因为它是测试 DAG,我删除了它,在产品中我无法删除它)并创建了具有相同名称且最新开始日期(即 30 日)的新 DAG 时,它按照计划运行。

airflow airflow-scheduler

3
推荐指数
1
解决办法
3004
查看次数

多个流文件的单个通知邮件 Nifi

我正在尝试从数据库中复制数据并使用 nifi 将其放置在 S3 中。我能够从数据库中复制数据并将其放置在 S3 中。现在我正在尝试为此流程添加错误处理。我刚刚添加了用于错误通知的 PutEmail 处理器。我只是给了一个错误的存储桶名称来验证电子邮件。这个 PutEmail 处理器被每个流文件触发(因为有 100 个流文件,邮件触发了 100 次)。我只想在流程中出现错误时仅触发一次 PutEmail(notification)。请对此提出任何建议。

下面是流程:

在此处输入图片说明

任何关于更好(通用)错误处理的建议都会对我有所帮助。

error-handling apache-nifi

1
推荐指数
1
解决办法
1387
查看次数