PySpark:设置执行器/内核和内存本地机器

pyt*_*ist 9 python json apache-spark-sql pyspark jupyter

所以我看了一堆关于 Pyspark、Jupyter 和设置内存/内核/执行器(以及相关内存)的帖子。

但我似乎被卡住了——

问题 1:我没有看到我的机器使用内核或内存。为什么?我可以对执行器/内核/内存进行一些调整以优化读取文件的速度吗?问题 2:还有什么方法可以让我看到进度条显示导入了多少文件 ahs(spark-monitor 似乎没有这样做)。

我正在将一个 33.5gb 的文件导入 pyspark。

机器有 112 GB 或 RAM 8 核/16 虚拟核。

from pyspark.sql import SparkSession
Run Code Online (Sandbox Code Playgroud)
spark = SparkSession \
    .builder \
    .appName("Summaries") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), 

('spark.app.name', 'Spark Updated Conf'), 
('spark.driver.cores', '4'), ('spark.executor.cores', '16'), 
('spark.driver.memory','90g')])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")

Run Code Online (Sandbox Code Playgroud)

我认为 pyspark 即使在读取文件时也在发挥它的魔力(所以我应该看到大量的核心/内存利用率)。但我没有看到它。救命!

更新:使用较小的 zip 文件 (89 MB) 进行测试

Pyspark 需要 72 秒 Pandas 需要 10.6 秒 使用的代码:

start = time.time()
df = spark.read.json("../Data/small.json.gz")
end = time.time()
print(end - start)

start = time.time()
df = pa.read_json('../Data/small.json.gz',compression='gzip', lines = True)
end = time.time()
print(end - start)
Run Code Online (Sandbox Code Playgroud)

Cro*_*ull 6

尽管您的问题的答案仅在于以下问题之一,但让我重写您的示例以解释正在发生的事情。

设置您的配置

首先,您不需要启动和停止上下文来设置您的配置。从 spark 2.0 开始,您可以创建 spark 会话,然后设置配置选项。

from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("yourAwesomeApp").getOrCreate())
spark.conf.set("spark.executor.memory", "40g")
spark.conf.set("spark.executor.cores", "2")
Run Code Online (Sandbox Code Playgroud)

读取您的数据

Spark 会懒惰地评估 DAG。您在剪辑中测量的时间不是数据加载到数据帧中的时间,而只是 JSON 文件的模式推断。模式推断很昂贵,您应该通过设置数据的模式来尽量避免它。您将看到以下性能的巨大差异:

df = spark.read.json("../data/a_very_large_json.json.gz")
Run Code Online (Sandbox Code Playgroud)

from pyspark.sql.types import (
    StructType, 
    StringType, 
    StructField,
)
json_schema = schema = StructType([
    StructField('data', StructType([
        StructField("field1", StringType(), nullable=False),
        StructField("field2", StringType(), nullable=False),
        StructField("field3", StringType(), nullable=True),
        StructField("field4", StringType(), nullable=True),
        StructField("field5", LongType(), nullable=False),
    ])),
])
df = spark.read.json("../data/a_very_large_json.json.gz", schema=json_schema)
Run Code Online (Sandbox Code Playgroud)

如果您提供架构,这条指令应该几乎立即生效。正如另一位用户已经提到的,要执行任务,您需要有一个活动,例如 show、head、collect、persist 等。

df.show()
Run Code Online (Sandbox Code Playgroud)

您可以在配置上设置执行程序实例和内核的数量,但这些实例的实际使用还取决于您的输入数据和您执行的转换/操作。根据您的描述,我假设您在独立模式下工作,因此默认情况下有一个 executor 实例(使用所有内核),您应该将 executor 内存设置为使用可用的内存。据我所知,当您在独立模式下工作时,会spark.executor.instances被忽略,执行器的实际数量取决于可用内核的数量和spark.executor.cores

与熊猫的比较

如果您只使用一个节点,将数据加载到数据框中,那么 Spark 和 Pandas 之间的比较是不公平的。Spark 总是会有更高的开销。当您的数据集无法容纳在一台机器的内存中并且您有多个节点来执行计算工作时,Sparks 将大放异彩。如果您对Pandas感到满意,我想您可能会对Databricks 的koalas感兴趣。

推荐

我更喜欢在应用程序之外设置执行细节(例如使用 spark-submit 参数)。在极少数情况下,为了提高性能,您需要将其中的一些设置到代码中,但是对于每个新版本的 Spark,这种情况不太频繁。如果您能做到这一点,您的应用程序将更加面向未来,并且易于扩展。

  • Spark.conf.set("spark.executor.cores", "2") 给出错误: AnalysisException:无法修改 Spark 配置的值:spark.executor.cores; (2认同)
  • @renjiege 你使用的是spark 3.x吗?如果是这种情况,您将需要使用 `spark.conf.set("spark.sql.legacy.setCommandRejectsSparkCoreConfs","false")` 才能更改配置。 (2认同)

Kar*_*rma 2

spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.json("../Data/inasnelylargefile.json.gz")
Run Code Online (Sandbox Code Playgroud)

添加这个:

df.show() 
##OR
df.persist()
Run Code Online (Sandbox Code Playgroud)

您所做的比较不是同类比较,spark 执行惰性评估,这意味着如果您不对操作调用操作,它只会编译并为您准备好 DAG,而不会执行任何操作。

Spark中,有两个概念,

  1. 转换:惰性评估
  2. 操作:(如collect()、take()、show()、persist())立即评估。

在您的情况下, read() 只是一个转换,添加一个操作应该触发计算。

有关操作与转换的更多信息: https: //training.databricks.com/visualapi.pdf