Mee*_*hew 17 python multithreading apache-spark pyspark
从Spark文档中可以了解到应用程序中的调度:
在给定的Spark应用程序(SparkContext实例)中,如果从单独的线程提交多个并行作业,则它们可以同时运行.通过"作业",在本节中,我们指的是Spark动作(例如,保存,收集)以及需要运行以评估该动作的任何任务.Spark的调度程序是完全线程安全的,并支持此用例,以支持提供多个请求的应用程序(例如,查询多个用户)."
我在Scala和Java中找到了相同的示例代码.有人可以举例说明如何使用PySpark实现这一点吗?
spa*_*oob 13
我遇到了同样的问题,所以我创建了一个很小的自包含示例.我使用python的线程模块创建多个线程,并同时提交多个spark作业.
请注意,默认情况下,spark将以先进先出(FIFO)运行作业:http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application.在下面的示例中,我将其更改为FAIR调度
# Prereqs:
# set
# spark.dynamicAllocation.enabled true
# spark.shuffle.service.enabled true
spark.scheduler.mode FAIR
# in spark-defaults.conf
import threading
from pyspark import SparkContext, SparkConf
def task(sc, i):
print sc.parallelize(range(i*10000)).count()
def run_multiple_jobs():
conf = SparkConf().setMaster('local[*]').setAppName('appname')
# Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext(conf=conf)
for i in range(4):
t = threading.Thread(target=task, args=(sc, i))
t.start()
print 'spark task', i, 'has started'
run_multiple_jobs()
Run Code Online (Sandbox Code Playgroud)
输出:
spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0
10000
20000
Run Code Online (Sandbox Code Playgroud)
今天,我也在问同样的问题。多处理模块提供了一个ThreadPool,它为您生成几个线程,因此并行运行作业。首先实例化函数,然后创建池,然后map在您想要迭代的范围内创建它。
就我而言,我正在为不同数量的中心(超参数调整)计算这些 WSSSE 数字,以获得“良好”的 k 均值聚类……就像MLSpark 文档中概述的那样。无需进一步解释,以下是我的 IPython 工作表中的一些单元格:
from pyspark.mllib.clustering import KMeans
import numpy as np
Run Code Online (Sandbox Code Playgroud)
c_points 是 12dim 数组:
>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]),
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])]
Run Code Online (Sandbox Code Playgroud)
在下面,对于每个i我计算这个 WSSSE 值并将其作为元组返回:
def error(point, clusters):
center = clusters.centers[clusters.predict(point)]
return np.linalg.norm(point - center)
def calc_wssse(i):
clusters = KMeans.train(c_points, i, maxIterations=20,
runs=20, initializationMode="random")
WSSSE = c_points\
.map(lambda point: error(point, clusters))\
.reduce(lambda x, y: x + y)
return (i, WSSSE)
Run Code Online (Sandbox Code Playgroud)
下面开始有趣的部分:
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)
Run Code Online (Sandbox Code Playgroud)
运行:
wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points
Run Code Online (Sandbox Code Playgroud)
给出:
[(1, 195318509740785.66),
(2, 77539612257334.33),
(3, 78254073754531.1),
...
]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13804 次 |
| 最近记录: |