Chi*_*iMo 1 distinct cassandra apache-spark
我需要一个查询列出spark内部唯一的复合分区键.
CASSANDRA中的查询SELECT DISTINCT key1, key2, key3 FROM schema.table;非常快,但是在RDD中使用相同类型的数据过滤器或者spark.sql在比较中检索结果的速度非常慢.
例如
---- SPARK ----
var t1 = sc.cassandraTable("schema","table").select("key1", "key2", "key3").distinct()
var t2 = spark.sql("SELECT DISTINCT key1, key2, key3 FROM schema.table")
t1.count // takes 20 minutes
t2.count // takes 20 minutes
---- CASSANDRA ----
// takes < 1 minute while also printing out all results
SELECT DISTINCT key1, key2, key3 FROM schema.table;
Run Code Online (Sandbox Code Playgroud)
表格格式如下:
CREATE TABLE schema.table (
key1 text,
key2 text,
key3 text,
ckey1 text,
ckey2 text,
v1 int,
PRIMARY KEY ((key1, key2, key3), ckey1, ckey2)
);
Run Code Online (Sandbox Code Playgroud)
不会在其查询中引发使用cassandra优化吗?
如何有效地检索这些信息?
不会在其查询中引发使用cassandra优化吗?
是.但是SparkSQL只有列修剪和谓词下推.在RDD中它是手动的.
如何有效地检索这些信息?
由于您的请求返回得足够快,我将直接使用Java驱动程序来获取此结果集.
虽然Spark SQL可以提供一些基于C*的优化,但这些优化通常仅限于使用DataFrame接口时的谓词下推.这是因为框架仅向数据源提供有限的信息.我们可以通过对您编写的查询进行解释来看到这一点.
scala> spark.sql("SELECT DISTINCT key1, key2, key3 FROM test.tab").explain
== Physical Plan ==
*HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- Exchange hashpartitioning(key1#30, key2#31, key3#32, 200)
+- *HashAggregate(keys=[key1#30, key2#31, key3#32], functions=[])
+- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation test.tab[key1#30,key2#31,key3#32] ReadSchema: struct<key1:string,key2:string,key3:string>
Run Code Online (Sandbox Code Playgroud)
因此,您的Spark示例实际上将分为几个步骤.
为什么没有任何一个被推到C*?这是因为没有给出Datasource(本例中的CassandraSourceRelation)有关查询的Distinct部分的信息.这只是Spark目前工作方式的一部分.什么是可推的文件
使用RDDS,我们为Spark提供了一套直接的指令.这意味着如果你想推下一些东西,必须手动指定.让我们看一下RDD请求的调试输出
scala> sc.cassandraTable("test","tab").distinct.toDebugString
res2: String =
(13) MapPartitionsRDD[7] at distinct at <console>:45 []
| ShuffledRDD[6] at distinct at <console>:45 []
+-(13) MapPartitionsRDD[5] at distinct at <console>:45 []
| CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:19 []
Run Code Online (Sandbox Code Playgroud)
这里的问题是你的"不同"调用是RDD上的泛型操作,而不是特定于Cassandra.由于RDD要求所有优化都是显式的(你输入的内容是什么)Cassandra从不听说"Distinct"的这种需求,我们得到的计划几乎与我们的Spark SQL版本相同.进行全面扫描,将所有数据从Cassandra序列化为Spark.做一个Shuffle然后返回结果.
使用SparkSQL,如果不向Catalyst(SparkSQL/Dataframes Optimizer)添加新规则,就可以让它知道Cassandra可以在服务器级别处理一些不同的调用.然后需要为CassandraRDD子类实现它.
对于RDDS我们需要增加一个功能,像已经存在的where,select以及limit,调用卡桑德拉RDD.Distinct可以在此处添加新呼叫,但只有在特定情况下才允许.这个功能目前在SCC中不存在,但可以相对容易地添加,因为它所做的一切都是DISTINCT在请求之前添加,并且可能添加一些检查以确保它是DISTINCT有意义的.
由于我们知道我们想要的确切CQL请求,因此我们可以始终直接使用Cassandra驱动程序来获取此信息.Spark Cassandra连接器提供了我们可以使用的驱动程序池,或者我们可以本身使用Java驱动程序.要使用游泳池,我们会做类似的事情
import com.datastax.spark.connector.cql.CassandraConnector
CassandraConnector(sc.getConf).withSessionDo{ session =>
session.execute("SELECT DISTINCT key1, key2, key3 FROM test.tab;").all()
}
Run Code Online (Sandbox Code Playgroud)
然后,如果需要进一步的Spark工作,则将结果并行化.如果我们真的想要分发它,那么有必要将函数添加到Spark Cassandra Connector中,如上所述.
| 归档时间: |
|
| 查看次数: |
595 次 |
| 最近记录: |