TMi*_*hel 5 cassandra apache-spark pyspark
我已在本地计算机上设置了Spark 2.0和Cassandra 3.0(8核,16gb ram)用于测试目的,编辑spark-defaults.conf如下:
spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Run Code Online (Sandbox Code Playgroud)
接下来我在Cassandra中导入了150万行:
test(
tid int,
cid int,
pid int,
ev list<double>,
primary key (tid)
)
Run Code Online (Sandbox Code Playgroud)
test.ev 是包含数值的列表,即 [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]
现在在代码中,测试我刚刚创建的一个东西SparkSession,连接到Cassandra并进行简单的选择计数:
cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
Run Code Online (Sandbox Code Playgroud)
此时,Spark输出count并需要大约28秒才能完成Job,分布在13 Tasks(在Spark UI,任务的总输入为331.6MB)
问题:
spark.sql.shuffle.partitions为4,为什么要创建13个任务?(还确保调用rdd.getNumPartitions()我的DataFrame 的分区数)更新
我想测试这个数据的常见操作:
pidev,alist<double>df.groupBy('pid').agg(avg(df['ev'][1]))正如@ zero323建议的那样,我为Cassandra部署了一台外部机器(2Gb RAM,4核,SSD),仅用于此测试,并加载了相同的数据集.df.select().count()与我之前的测试相比,结果是预期更大的延迟和整体性能更差(完成时间大约需要70秒Job).
编辑:我误解了他的建议.@ zero323旨在让卡桑德拉执行,而不是使用SQL星火计数,如解释在这里
另外我想要指出的是,我知道list<double>为这种类型的数据设置一个宽行的固有反模式,但我现在关注的是更多花在检索大数据集而不是实际数据上的时间.平均计算时间.
这是预期的表现吗?如果没有,我错过了什么?
它看起来很慢,但并不完全出乎意料.一般count表示为
SELECT 1 FROM table
Run Code Online (Sandbox Code Playgroud)
接着是Spark方面的总结.因此,虽然它被优化,但它仍然相当低效,因为你从外部源获取N个长整数只是为了在本地求和.
正如文档所解释的那样,Cassandra支持RDD(非Datasets)提供cassandraCount执行服务器端计数的优化方法.
Theory说DataFrame的分区数决定了Spark将分配作业的任务数.如果我设置
spark.sql.shuffle.partitions为(...),为什么要创建(...)任务?
因为spark.sql.shuffle.partitions这里没有使用.此属性用于确定shuffle的分区数(当数据由某些键集合聚合时),而不是用于Dataset创建或全局聚合count(*)(总是使用1个分区进行最终聚合).
如果您有兴趣控制初始分区的数量,您应该看看spark.cassandra.input.split.size_in_mb哪个定义:
要提取到Spark分区的大约数据量.生成的Spark分区的最小数量为1 + 2*SparkContext.defaultParallelism
正如你可以看到的另一个因素是,spark.default.parallelism但它并不是一个微妙的配置,因此一般来说取决于它不是一个最佳选择.
| 归档时间: |
|
| 查看次数: |
584 次 |
| 最近记录: |