Pau*_*vis 5 python multiprocessing apache-spark pyspark
我正在尝试使用多重处理并行读取 100 个 csv 文件(然后分别并行处理它们)。以下是我在 AWS 中的 EMR 主节点上托管的 Jupyter 中运行的代码。(最终将是 100k csv 文件,因此需要分布式读取)。
import findspark
import boto3
from multiprocessing.pool import ThreadPool
import logging
import sys
findspark.init()
from pyspark import SparkContext, SparkConf, sql
conf = SparkConf().setMaster("local[*]")
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext.getOrCreate(conf)
spark = sql.SparkSession.builder.master("local[*]").appName("ETL").getOrCreate()
s3 = boto3.resource(...)
bucket = ''
bucketObj = s3.Bucket(bucket)
numNodes = 64
def processTest(key):
logger.info(key + ' ---- Start\n')
fLog = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv(buildS3Path(bucket) + key)
logger.info(key + ' ---- Finish Read\n')
fLog = renameColumns(NAME_MAP, fLog)
logger.info(key + ' ---- Finish Rename\n')
(landLog, flags) = validate(fLog)
logger.info(key + ' ---- Finish Validation\n')
files = list(bucketObj.objects.filter(Prefix=subfolder))
keys = list(map(lambda obj: obj.key, files))
keys = keys
# files = s3C.list_objects(Bucket=bucket, Prefix=subfolder)['Contents']
p = ThreadPool(numNodes)
p.map(processTest, keys)
Run Code Online (Sandbox Code Playgroud)
蓝线是我的主节点上的 CPU 使用率。所有日志都显示我在一台机器上运行:
INFO:pyspark:172.31.29.33
Run Code Online (Sandbox Code Playgroud)
如何让 Spark 将池分配给工人?
仔细阅读 SparkSession.Builder API 文档,传递给的字符串SparkSession.builder.master('xxxx')
是通过以下方式连接到主节点的主机:spark://xxxx:7077。就像 user8371915 所说,我不需要使用独立的本地主机。相反,这个修复就像一个魅力:
SparkSession.builder.master('yarn')
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4887 次 |
最近记录: |