Spark Cassandra Connector keyBy和shuffling

Sha*_*hai 4 grouping shuffle connector cassandra apache-spark

我试图通过尽可能避免改组来优化我的火花工作.

我正在使用cassandraTable来创建RDD.

列族的列名是动态的,因此定义如下:

CREATE TABLE "Profile" (
  key text,
  column1 text,
  value blob,
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='ALL' AND
  ...
Run Code Online (Sandbox Code Playgroud)

此定义以下列格式生成CassandraRow RDD元素:

CassandraRow <key, column1, value>
Run Code Online (Sandbox Code Playgroud)
  • key - RowKey
  • column1 - column1的值是动态列的名称
  • value - 动态列的值

因此,如果我有RK ='profile1',列名为''George'且age ='34',则生成的RDD将为:

CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
Run Code Online (Sandbox Code Playgroud)

然后我需要将共享相同键的元素组合在一起以获得PairRdd:

PairRdd<String, Iterable<CassandraRow>>
Run Code Online (Sandbox Code Playgroud)

重要的是,我需要分组的所有元素都在同一个Cassandra节点(共享相同的行键),所以我希望连接器保持数据的位置.

问题是使用groupBy或groupByKey会导致混乱.我宁愿在本地对它们进行分组,因为所有数据都在同一个节点上:

JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .groupBy(new Function<ColumnFamilyModel, String>() {
            @Override
            public String call(ColumnFamilyModel arg0) throws Exception {
                return arg0.getKey();
            }
        })
Run Code Online (Sandbox Code Playgroud)

我的问题是:

  1. 在RDD上使用keyBy会导致混乱,还是会在本地保存数据?
  2. 有没有办法按键对元素进行分组而不进行改组?我读过有关mapPartitions的内容,但并不十分了解它的用法.

谢谢,

夏嘉曦

maa*_*asg 5

我认为你正在寻找spanByKey一种cassandra-connector特定的操作,它利用cassandra提供的排序,允许对元素进行分组而不会产生洗牌阶段.

在你的情况下,它应该看起来像:

sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => (row.getString("key")))
  .spanByKey
Run Code Online (Sandbox Code Playgroud)

阅读更多文档:https:
//github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key