标签: spark-cassandra-connector

Apache Spark 无法处理大型 Cassandra 列族

我正在尝试使用 Apache Spark 来处理我的大型(约 230k 个条目)cassandra 数据集,但我不断遇到不同类型的错误。但是,在大约 200 个条目的数据集上运行时,我可以成功运行应用程序。我有 3 个节点的 spark 设置,其中包含 1 个主节点和 2 个工作线程,并且 2 个工作线程还安装了一个 cassandra 集群,其中数据索引的复制因子为 2。我的 2 个 spark 工作线程在 Web 界面上显示 2.4 GB 和 2.8 GB 内存,并且我spark.executor.memory在运行应用程序时设置为 2409,以获得 4.7 GB 的组合内存。这是我的 WebUI 主页

在此处输入图片说明

其中一项任务的环境页面

环境

在这个阶段,我只是尝试使用 spark 处理存储在 cassandra 中的数据。这是我用来在 Java 中执行此操作的基本代码

SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);

CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

System.out.println(rdd.count());
Run Code Online (Sandbox Code Playgroud)

为了成功运行,在一个小数据集(200 个条目)上,事件界面看起来像这样

在此处输入图片说明

但是当我在大型数据集上运行同样的事情时(即我只更改CASSANDRA_COLUMN_FAMILY),作业永远不会在终端内终止,日志看起来像这样 …

java cassandra apache-spark apache-spark-sql spark-cassandra-connector

4
推荐指数
1
解决办法
2351
查看次数

使用spark cassandra连接器更新Cassandra表

我在更新键空间中的表时遇到了scala上的spark cassandra连接器问题

这是我的一段代码

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)
Run Code Online (Sandbox Code Playgroud)

当我执行此代码时,我收到这样的错误

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found
Run Code Online (Sandbox Code Playgroud)

知道为什么会这样吗?我怎样才能解决这个问题?

scala cassandra-2.0 apache-spark apache-spark-sql spark-cassandra-connector

4
推荐指数
2
解决办法
6478
查看次数

如何将cassandraRow转换为Row(apache spark)?

我试图从RDD创建一个Dataframe [cassandraRow] ..但我不能因为createDataframe(RDD [Row],schema:StructType)需要RDD [Row]而不是RDD [cassandraRow].

  • 我怎样才能做到这一点?

并且根据这个问题的答案 如何将rdd对象转换为spark中的dataframe

(其中一个答案)建议在RDD [Row]上使用toDF()从RDD获取Dataframe,这对我不起作用.我尝试在另一个例子中使用RDD [Row](尝试使用toDF()).

  • 对于我来说,我们怎么能用RDD(RDD [Row])的实例调用Dataframe(toDF())的方法呢?

我正在使用Scala. 在此输入图像描述

cassandra apache-spark spark-cassandra-connector

4
推荐指数
1
解决办法
4058
查看次数

Spark-sortWithInPartitions超过排序

以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。

Spark在4节点独立群集上运行。

初始数据集:

员工ID -----入职日期-----离职日期

1111111     2017-04-20  2017-09-14 
1111111     2017-11-02  null 
2222222     2017-09-26  2017-09-26 
2222222     2017-11-28  null 
3333333     2016-01-07  2016-01-20 
3333333     2017-10-25  null 
Run Code Online (Sandbox Code Playgroud)

之后的数据集df.sort(col(in_date).desc())

员工编号-in_date ----- out_date

1111111   2017-11-02   null 
1111111   2017-04-20   2017-09-14 
2222222   2017-09-26   2017-09-26 
2222222   2017-11-28   null 
3333333   2017-10-25   null 
3333333   2016-01-07   2016-01-20 
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):  
Run Code Online (Sandbox Code Playgroud)

输出

员工ID -----入职日期-----离职日期

1111111    2017-11-02    null 
2222222    2017-09-26    2017-09-26 
3333333    2016-01-07    2016-01-20 
Run Code Online (Sandbox Code Playgroud)

预期数据集:

员工ID -----入职日期-----离职日期

1111111    2017-11-02   null 
2222222    2017-11-28   null 
3333333    2017-10-25   null 
Run Code Online (Sandbox Code Playgroud)

但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。

附加信息: 当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。

apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset

4
推荐指数
1
解决办法
3399
查看次数

使用IN子句过滤Spark Cassandra连接器

我在使用针对Java的Spark cassandra连接器筛选时遇到了一些问题。Cassandra允许使用IN子句按分区键的最后一列进行过滤。例如

create table cf_text
(a varchar,b varchar,c varchar, primary key((a,b),c))

Query : select * from cf_text where a ='asdf' and b in ('af','sd');

sc.cassandraTable("test", "cf_text").where("a = ?", "af").toArray.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我如何指定在spark的CQL查询中使用的IN子句?如何也可以指定范围查询?

java cql cassandra apache-spark spark-cassandra-connector

3
推荐指数
1
解决办法
3348
查看次数

Apache Spark需要5到6分钟才能从Cassandra中简单计算1个亿行

我正在使用Spark Cassandra连接器.从Cassandra表获取数据需要5-6分钟.在Spark中,我在日志中看到了许多任务和Executor.原因可能是Spark在许多任务中划分了这个过程!

下面是我的代码示例:

public static void main(String[] args) {

    SparkConf conf = new SparkConf(true).setMaster("local[4]")
            .setAppName("App_Name")
            .set("spark.cassandra.connection.host", "127.0.0.1");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Demo_Bean> empRDD = javaFunctions(sc).cassandraTable("dev",
            "demo");
    System.out.println("Row Count"+empRDD.count());
}
Run Code Online (Sandbox Code Playgroud)

java cassandra apache-spark spark-cassandra-connector

3
推荐指数
2
解决办法
2860
查看次数

Cassandra火花连接器joinWithCassandraTable在具有不同名称的字段上

我正在寻找一个RDD和一个cassandra表的连接,它们对于相同的密钥ex(简化)具有不同的名称:

case class User(id : String, name : String)
Run Code Online (Sandbox Code Playgroud)

case class Home( address : String, user_id : String)
Run Code Online (Sandbox Code Playgroud)

如果想做:

rdd[Home].joinWithCassandraTable("testspark","user").on(SomeColumns("id"))
Run Code Online (Sandbox Code Playgroud)

如何确定要进行连接的字段的名称.而且我不想将rdd映射到只有正确的id,因为我想在joinWithCassandraTable之后加入所有值.

scala cassandra datastax-enterprise apache-spark spark-cassandra-connector

3
推荐指数
1
解决办法
3804
查看次数

将自定义编解码器添加到CassandraConnector

有没有办法在实例化时注册自定义编解码器CassandraConnector

我每次打电话时都在注册我的编解码器 cassandraConnector.withSessionDo

val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
...
...
.mapPartitions(partition => {
  cassandraConnector.withSessionDo(session => {
    // register custom codecs once for each partition so it isn't loaded as often for each data point
    if (partition.nonEmpty) {
      session.getCluster.getConfiguration.getCodecRegistry
        .register(new TimestampLongCodec)
        .register(new SummaryStatsBlobCodec)
        .register(new JavaHistogramBlobCodec)
    }
Run Code Online (Sandbox Code Playgroud)

这样做似乎有点像反模式.它也真的堵塞了我们的日志,因为我们有一个每30秒运行一次的火花流服务,并且它正在填充我们的日志:

16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it …
Run Code Online (Sandbox Code Playgroud)

scala cassandra apache-spark spark-streaming spark-cassandra-connector

3
推荐指数
1
解决办法
902
查看次数

Cassandra/Spark 显示不正确的大表条目数

我正在尝试使用 spark 来处理大型 cassandra 表(约 4.02 亿个条目和 84 列),但得到的结果不一致。最初的要求是将一些列从这个表复制到另一个表。复制数据后,我注意到新表中的某些条目丢失了。为了验证我是否对大型源表进行了计数,但每次都得到不同的值。我在一个较小的表(约 700 万条记录)上尝试了查询,结果很好。

最初,我尝试使用 pyspark 进行计数。这是我的 pyspark 脚本:

spark = SparkSession.builder.appName("Datacopy App").getOrCreate() 
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache() 
df.createOrReplaceTempView("data") 
query = ("select count(1) from data " ) 
vgDF = spark.sql(query) 
vgDF.show(10)
Run Code Online (Sandbox Code Playgroud)

Spark提交命令如下:

~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py
Run Code Online (Sandbox Code Playgroud)

上述火花提交过程需要大约 90 分钟才能完成。我跑了三遍,这是我得到的计数:

  • Spark 迭代 1:402273852
  • Spark 迭代 2:402273884
  • Spark 迭代 3:402274209

Spark 在整个过程中没有显示任何错误或异常。我在 cqlsh 中运行了三次相同的查询,再次得到不同的结果:

  • Cqlsh 迭代 1:402273598
  • Cqlsh 迭代 2:402273499
  • Cqlsh 迭代 …

cassandra apache-spark pyspark spark-cassandra-connector

3
推荐指数
1
解决办法
1480
查看次数

将信息保存到 Cassandra 不会保持顺序

我正在使用 Scala 并尝试将我的日历信息从 Spark 保存到 Cassandra。

我开始使用 Cassandra 创建相同的架构:

session.execute("CREATE TABLE calendar (DateNum int, Date text, YearMonthNum int, ..., PRIMARY KEY (datenum,date))")

然后将我的数据从 Spark 导入到 Cassandra:

        .write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "calendar", "keyspace" -> "ks"))
        .mode(SaveMode.Append)
        .save()
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试读取从 Cassandra 上的 Spark 检索的数据时,行看起来非常混乱,而我想保持日历的顺序相同。

我有一个行的例子:

20090111 | 2009 年 1 月 11 日 | 200901 |...

选择/订购似乎也不能解决问题。

scala cassandra apache-spark spark-cassandra-connector

2
推荐指数
1
解决办法
73
查看次数