Spark:PySpark + Cassandra查询性能

TMi*_*hel 5 cassandra apache-spark pyspark

我已在本地计算机上设置了Spark 2.0和Cassandra 3.0(8核,16gb ram)用于测试目的,编辑spark-defaults.conf如下:

spark.python.worker.memory 1g
spark.executor.cores 4
spark.executor.instances 4
spark.sql.shuffle.partitions 4
Run Code Online (Sandbox Code Playgroud)

接下来我在Cassandra中导入了150万行:

test(
    tid int,
    cid int,
    pid int,
    ev list<double>,
    primary key (tid)
)
Run Code Online (Sandbox Code Playgroud)

test.ev 是包含数值的列表,即 [2240,2081,159,304,1189,1125,1779,693,2187,1738,546,496,382,1761,680]

现在在代码中,测试我刚刚创建的一个东西SparkSession,连接到Cassandra并进行简单的选择计数:

cassandra = spark.read.format("org.apache.spark.sql.cassandra")
df = cassandra.load(keyspace="testks",table="test")
df.select().count()
Run Code Online (Sandbox Code Playgroud)

此时,Spark输出count并需要大约28秒才能完成Job,分布在13 Tasks(在Spark UI,任务的总输入为331.6MB)

问题:

  • 这是预期的表现吗?如果没有,我错过了什么?
  • Theory说DataFrame的分区数决定了Spark将分配作业的任务数.如果我设置spark.sql.shuffle.partitions为4,为什么要创建13个任务?(还确保调用rdd.getNumPartitions()我的DataFrame 的分区数)

更新

我想测试这个数据的常见操作:

  • 查询大型数据集,例如,从100,000~N行分组 pid
  • 选择ev,alist<double>
  • 对每个成员执行平均值,假设现在每个列表具有相同的长度,即 df.groupBy('pid').agg(avg(df['ev'][1]))

正如@ zero323建议的那样,我为Cassandra部署了一台外部机器(2Gb RAM,4核,SSD),仅用于此测试,并加载了相同的数据集.df.select().count()与我之前的测试相比,结果是预期更大的延迟和整体性能更差(完成时间大约需要70秒Job).

编辑:我误解了他的建议.@ zero323旨在让卡桑德拉执行,而不是使用SQL星火计数,如解释在这里

另外我想要指出的是,我知道list<double>为这种类型的数据设置一个宽行的固有反模式,但我现在关注的是更多花在检索大数据集而不是实际数据上的时间.平均计算时间.

zer*_*323 5

这是预期的表现吗?如果没有,我错过了什么?

它看起来很慢,但并不完全出乎意料.一般count表示为

SELECT 1 FROM table
Run Code Online (Sandbox Code Playgroud)

接着是Spark方面的总结.因此,虽然它被优化,但它仍然相当低效,因为你从外部源获取N个长整数只是为了在本地求和.

正如文档所解释的那样,Cassandra支持RDD(非Datasets)提供cassandraCount执行服务器端计数的优化方法.

Theory说DataFrame的分区数决定了Spark将分配作业的任务数.如果我设置spark.sql.shuffle.partitions为(...),为什么要创建(...)任务?

因为spark.sql.shuffle.partitions这里没有使用.此属性用于确定shuffle的分区数(当数据由某些键集合聚合时),而不是用于Dataset创建或全局聚合count(*)(总是使用1个分区进行最终聚合).

如果您有兴趣控制初始分区的数量,您应该看看spark.cassandra.input.split.size_in_mb哪个定义:

要提取到Spark分区的大约数据量.生成的Spark分区的最小数量为1 + 2*SparkContext.defaultParallelism

正如你可以看到的另一个因素是,spark.default.parallelism但它并不是一个微妙的配置,因此一般来说取决于它不是一个最佳选择.