如何在PySpark中的不同线程中在一个Sparkcontext中运行多个作业?

Mee*_*hew 17 python multithreading apache-spark pyspark

从Spark文档中可以了解到应用程序中的调度:

在给定的Spark应用程序(SparkContext实例)中,如果从单独的线程提交多个并行作业,则它们可以同时运行.通过"作业",在本节中,我们指的是Spark动作(例如,保存,收集)以及需要运行以评估该动作的任何任务.Spark的调度程序是完全线程安全的,并支持此用例,以支持提供多个请求的应用程序(例如,查询多个用户)."

我在Scala和Java中找到了相同的示例代码.有人可以举例说明如何使用PySpark实现这一点吗?

spa*_*oob 13

我遇到了同样的问题,所以我创建了一个很小的自包含示例.我使用python的线程模块创建多个线程,并同时提交多个spark作业.

请注意,默认情况下,spark将以先进先出(FIFO)运行作业:http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application.在下面的示例中,我将其更改为FAIR调度

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
  spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print sc.parallelize(range(i*10000)).count()

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print 'spark task', i, 'has started'


run_multiple_jobs()
Run Code Online (Sandbox Code Playgroud)

输出:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000
Run Code Online (Sandbox Code Playgroud)


Har*_*lly 7

今天,我也在问同样的问题。多处理模块提供了一个ThreadPool,它为您生成几个线程,因此并行运行作业。首先实例化函数,然后创建池,然后map在您想要迭代的范围内创建它。

就我而言,我正在为不同数量的中心(超参数调整)计算这些 WSSSE 数字,以获得“良好”的 k 均值聚类……就像MLSpark 文档中概述的那样。无需进一步解释,以下是我的 IPython 工作表中的一些单元格:

from pyspark.mllib.clustering import KMeans
import numpy as np
Run Code Online (Sandbox Code Playgroud)

c_points 是 12dim 数组:

>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([-2,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([ 7, -1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0])]
Run Code Online (Sandbox Code Playgroud)

在下面,对于每个i我计算这个 WSSSE 值并将其作为元组返回:

def error(point, clusters):
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    clusters = KMeans.train(c_points, i, maxIterations=20,
        runs=20, initializationMode="random")
    WSSSE = c_points\
        .map(lambda point: error(point, clusters))\
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)
Run Code Online (Sandbox Code Playgroud)

下面开始有趣的部分:

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)
Run Code Online (Sandbox Code Playgroud)

运行:

wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points
Run Code Online (Sandbox Code Playgroud)

给出:

[(1, 195318509740785.66),
 (2, 77539612257334.33),
 (3, 78254073754531.1),
 ...
]
Run Code Online (Sandbox Code Playgroud)