Bud*_*ide 9 python parallel-processing apache-spark apache-spark-sql
我是Spark的新手,并尝试用Python理解Spark概念.在使用Python为Spark开发应用程序时,我对以并行方式处理数据的方式感到困惑.
1.每个人都说我不需要担心在处理封装在RDD变量中的数据时会调用哪个节点和多少个节点.因此,基于我的最佳理解,我相信Spark集群会对下面的代码做什么:
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
c = b.collect()
Run Code Online (Sandbox Code Playgroud)
可以描述为以下步骤:
(1)变量a将被保存为包含预期txt文件内容的RDD变量
(2)不同的RDD块a将被广播到集群中的不同节点,并且当不同节点
(3)中的每个块时将进行过滤方法.调用集合操作,结果将从不同节点返回到主节点并保存为局部变量c.
我的描述是对的吗?如果没有,该程序究竟是什么?如果我是对的,那么并行化方法有什么意义呢?以下代码是否与上面列出的代码相同?
a = sc.textFile(filename).collect()
b = sc.parallelize(a).filter(lambda x: len(x)>0 and x.split("\t").count("9999-12-31"))
c = b.collect()
Run Code Online (Sandbox Code Playgroud)
2.对于以下代码,是否可以通过将定义的表划分为多个分区来并行处理SQL查询语法?
a = sc.textFile(filename)
b = a.filter(lambda x: len(x) > 0 and x.split("\t").count("9999-12-31") == 0)
parts = b.map(lambda x: x.split("\t"))
records = parts.map(Row(r0 = str(x[0]), r1 = x[1], r2 = x[2]))
rTable = sqlContext.createDataFrame(records)
rTable.registerTempTable("rTable")
result = sqlContext.sql("select substr(r0,1,2), case when r1=1 then r1*100 else r1*10 end, r2 from rTable").collect()
Run Code Online (Sandbox Code Playgroud)
您的第一步描述是正确的。但第二步和第三步还有更多内容。
根据 Spark 文档:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
textFile 方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中默认块为 64MB),但您也可以通过传递更大的值来请求更多数量的分区。
如果将文件放入 HDFS 并将其路径作为textFile参数传递,则 RDD 的分区a将基于 HDFS 块创建。因此,在这种情况下,腭化量取决于 HDFS 块的数量。此外,数据已经分区并通过 HDFS 移动到集群机器。
如果您使用本地文件系统上的路径(在所有节点上可用)并且不指定minPartitions默认并行度(取决于集群中的核心数量),则会选择默认并行度。在这种情况下,您必须在每个工作人员上复制文件或将其放入每个工作人员都可以使用的共享存储中。
在每种情况下,Spark 都会避免广播任何数据,而是尝试使用每台机器中的现有块。所以你的第二步并不完全正确。
根据 Spark 文档:
collect(): Array[T] 返回一个包含此 RDD 中所有元素的数组
在此步骤中,您的 RDDb将被洗牌/收集到您的驱动程序/节点中。