Amo*_*mol 3 apache-spark gcloud pyspark
我想使用 pyspark 在 for 循环中进行并行处理。
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
data = [a,b,c]
for i in data:
try:
df = spark.read.parquet('gs://'+i+'-data')
df.createOrReplaceTempView("people")
df2=spark.sql("""select * from people """)
df.show()
except Exception as e:
print(e)
continue
Run Code Online (Sandbox Code Playgroud)
上面提到的脚本工作正常,但我想在 pyspark 中进行并行处理,这在 scala 中是可能的
Spark 本身并行运行作业,但如果您仍然希望在代码中并行执行,您可以使用简单的 python 代码进行并行处理(这已在 DataBricks Only链接上进行了测试)。
data = ["a","b","c"]
from multiprocessing.pool import ThreadPool
pool = ThreadPool(10)
def fun(x):
try:
df = sqlContext.createDataFrame([(1,2, x), (2,5, "b"), (5,6, "c"), (8,19, "d")], ("st","end", "ani"))
df.show()
except Exception as e:
print(e)
pool.map( fun,data)
Run Code Online (Sandbox Code Playgroud)
我对您的代码进行了一些更改,但这基本上就是您运行并行任务的方式,如果您有一些想要并行运行的平面文件,只需创建一个包含其名称的列表并将其传递到pool.map( fun,data)。
改变功能根据需要fun 。
有关多处理模块的更多详细信息,请查看文档。
同样,如果您想在 Scala 中执行此操作,您将需要以下模块
import scala.concurrent.{Future, Await}
Run Code Online (Sandbox Code Playgroud)
如需更详细的了解,请查看此内容。该代码适用于 Databricks,但经过一些更改,它将适用于您的环境。
| 归档时间: |
|
| 查看次数: |
21522 次 |
| 最近记录: |