在Spark中选择DISTINCT Cassandra

Chi*_*iMo 1 distinct cassandra apache-spark

我需要一个查询列出spark内部唯一的复合分区键.
CASSANDRA中的查询SELECT DISTINCT key1, key2, key3 FROM schema.table;非常快,但是在RDD中使用相同类型的数据过滤器或者spark.sql在比较中检索结果的速度非常慢.

例如

---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")

t1.count // takes 20 minutes
t2.count // takes 20 minutes

---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table; 
Run Code Online (Sandbox Code Playgroud)

表格格式如下:

CREATE TABLE schema.table (
    key1 text,
    key2 text,
    key3 text,
    ckey1 text,
    ckey2 text,
    v1 int,
    PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
);
Run Code Online (Sandbox Code Playgroud)

不会在其查询中引发使用cassandra优化吗?
如何有效地检索这些信息?

Rus*_*ssS 6

快速回答

不会在其查询中引发使用cassandra优化吗?

是.但是SparkSQL只有列修剪和谓词下推.在RDD中它是手动的.

如何有效地检索这些信息?

由于您的请求返回得足够快,我将直接使用Java驱动程序来获取此结果集.


长答案

虽然Spark SQL可以提供一些基于C*的优化,但这些优化通常仅限于使用DataFrame接口时的谓词下推.这是因为框架仅向数据源提供有限的信息.我们可以通过对您编写的查询进行解释来看到这一点.

让我们从SparkSQL示例开始

scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
   +- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
      +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>
Run Code Online (Sandbox Code Playgroud)

因此,您的Spark示例实际上将分为几个步骤.

  1. 扫描:读取此表中的所有数据.这意味着将每个值从C机器序列化到Spark Executor JVM,换句话说就是大量的工作.
  2. *HashAggregate/Exchange/Hash Aggregate:从每个执行器获取值,在本地散列它们然后在机器之间交换数据并再次散列以确保唯一性.在外行人的术语中,这意味着创建大型哈希结构,序列化它们,运行复杂的分布式sortmerge,然后再次运行哈希.(昂贵)

为什么没有任何一个被推到C*?这是因为没有给出Datasource(本例中的CassandraSourceRelation)有关查询的Distinct部分的信息.这只是Spark目前工作方式的一部分.什么是可推的文件

那么RDD版本呢?

使用RDDS,我们为Spark提供了一套直接的指令.这意味着如果你想推下一些东西,必须手动指定.让我们看一下RDD请求的调试输出

scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
 |   ShuffledRDD[6] at distinct at <console>:45 []
 +-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
    |   CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []
Run Code Online (Sandbox Code Playgroud)

这里的问题是你的"不同"调用是RDD上的泛型操作,而不是特定于Cassandra.由于RDD要求所有优化都是显式的(你输入的内容是什么)Cassandra从不听说"Distinct"的这种需求,我们得到的计划几乎与我们的Spark SQL版本相同.进行全面扫描,将所有数据从Cassandra序列化为Spark.做一个Shuffle然后返回结果.

那么我们能做些什么呢?

使用SparkSQL,如果不向Catalyst(SparkSQL/Dataframes Optimizer)添加新规则,就可以让它知道Cassandra可以在服务器级别处理一些不同的调用.然后需要为CassandraRDD子类实现它.

对于RDDS我们需要增加一个功能,像已经存在的where,select以及limit,调用卡桑德拉RDD.Distinct可以在此处添加新呼叫,但只有在特定情况下才允许.这个功能目前在SCC中不存在,但可以相对容易地添加,因为它所做的一切都是DISTINCT请求之前添加,并且可能添加一些检查以确保它是DISTINCT有意义的.

今天我们可以做什么而不修改底层连接器?

由于我们知道我们想要的确切CQL请求,因此我们可以始终直接使用Cassandra驱动程序来获取此信息.Spark Cassandra连接器提供了我们可以使用的驱动程序池,或者我们可以本身使用Java驱动程序.要使用游泳池,我们会做类似的事情

import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session => 
  session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}
Run Code Online (Sandbox Code Playgroud)

然后,如果需要进一步的Spark工作,则将结果并行化.如果我们真的想要分发它,那么有必要将函数添加到Spark Cassandra Connector中,如上所述.