Kni*_*t71 5 scala cassandra datastax-enterprise apache-spark
我正在使用cassandra-connector将数据从cassandra分区读取到spark.我尝试了以下解决方案来读取partitions.I尝试通过尽可能多地创建rdds来并行化任务,但解决方案ONE和解决方案TWO都具有相同的性能.
在解决方案ONE中,我可以立即看到spark UI中的各个阶段.我试图在解决方案TWO中避免一个for循环.
在解决方案TWO中,阶段在相当长的时间之后出现.随着用户ID的数量增加,在阶段出现在解决方案TWO的火花UI中之前,时间显着增加.
Version
spark - 1.1
Dse - 4.6
cassandra-connector -1.1
Setup
3 - Nodes with spark cassandra
Each node has 1 core dedicated to this task.
512MB ram for the executor memory.
Run Code Online (Sandbox Code Playgroud)
我的cassandra表架构,
CREATE TABLE test (
user text,
userid bigint,
period timestamp,
ip text,
data blob,
PRIMARY KEY((user,userid,period),ip)
);
Run Code Online (Sandbox Code Playgroud)
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val partitions = users.flatMap(x => period.map(y => (x,y))))
val userids = 1 to 10
for (userid <- userids){
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,userid)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
}
Run Code Online (Sandbox Code Playgroud)
val users = List("u1","u2","u3")
val period = List("2000-05-01","2000-05-01")
val userids = 1 to 10
val partitions = users.flatMap(x => period.flatMap(
y => userids.map(z => (x,y,z))))
val rdds = partitions.map(x => sc.cassandraTable("test_keyspace","table1")
.select("data")
.where("user=?", x._1)
.where("period=?",x._2)
.where("userid=?,x._3)
)
val combinedRdd = sc.union(rdds)
val result = combinedRdd.map(getDataFromColumns)
.coalesce(4)
.reduceByKey((x,y) => x+y)
.collect()
result.foreach(prinltn)
Run Code Online (Sandbox Code Playgroud)
为什么解决方案TWO不比解决方案ONE快?
我的理解是,由于所有分区都在一段查询,而数据分布在节点上,因此应该更快.如果我错了,请纠正我.
首先,您应该查看joinWithCassandraTable,它应该是您正在做的更容易的api(授予您足够的分区以使其值得).这个api接受分区键的RDD和palatalizes并从C*分发它们的检索.
这会让你做类似的事情
sc.parallelize(partitions).joinWithCassandraTable("keyspace","table")
Run Code Online (Sandbox Code Playgroud)
如果你想要你也可以做一个,repartitionByCassandraReplica但这很可能对非常小的请求没有好处.您必须对数据进行基准测试才能确定.
如果您只想做原始驱动程序命令,您可以执行类似的操作
val cc = CassandraConnector(sc.getConf)
partitions.mapPartitions{ it =>
cc.withSessionDo{ session =>
session.execute( Some query )
}
}
Run Code Online (Sandbox Code Playgroud)
现在让我们快速浏览您的代码示例首先,我们只检索60个C*分区.对于这个用例,与从C*检索分区所花费的时间相比,我们很可能会在设置和删除任务所需的时间内占主导地位.
在这两种解决方案中,由于Spark的懒惰评估,你基本上都在做同样的事情.驱动程序创建一个图表,首先创建60个RDD,每个RDD都有一个惰性指令,用于从C*中检索单个分区.(每个分区1个RDD是坏的,RDD用于存储大量数据,因此最终会产生相当大的开销).虽然60 RDD是用不同的模式制作的,但这并不重要,因为在你打电话收集之前它们的实际评估不会发生.驱动程序继续设置新的RDD和转换.
在我们点击收集之前,绝对没有做任何事情来从C*中检索数据,因为我们在上面发布的两个解决方案中使用基本相同的依赖关系图点击收集,在两种情况下都会发生确切(或非常相似)的事情.所有60个RDD将在依赖图指定时解析.这将并行发生,但同样会非常开销.
为了避免这种情况,请查看我上面使用单个RDD获取所有信息的示例.
| 归档时间: |
|
| 查看次数: |
2956 次 |
| 最近记录: |