我正在尝试使用 Apache Spark 来处理我的大型(约 230k 个条目)cassandra 数据集,但我不断遇到不同类型的错误。但是,在大约 200 个条目的数据集上运行时,我可以成功运行应用程序。我有 3 个节点的 spark 设置,其中包含 1 个主节点和 2 个工作线程,并且 2 个工作线程还安装了一个 cassandra 集群,其中数据索引的复制因子为 2。我的 2 个 spark 工作线程在 Web 界面上显示 2.4 GB 和 2.8 GB 内存,并且我spark.executor.memory在运行应用程序时设置为 2409,以获得 4.7 GB 的组合内存。这是我的 WebUI 主页
其中一项任务的环境页面
在这个阶段,我只是尝试使用 spark 处理存储在 cassandra 中的数据。这是我用来在 Java 中执行此操作的基本代码
SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", CASSANDRA_HOST)
.setJars(jars);
SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
System.out.println(rdd.count());
Run Code Online (Sandbox Code Playgroud)
为了成功运行,在一个小数据集(200 个条目)上,事件界面看起来像这样
但是当我在大型数据集上运行同样的事情时(即我只更改CASSANDRA_COLUMN_FAMILY),作业永远不会在终端内终止,日志看起来像这样 …
java cassandra apache-spark apache-spark-sql spark-cassandra-connector
我在更新键空间中的表时遇到了scala上的spark cassandra连接器问题
这是我的一段代码
val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
" SET a= a + " + b + " WHERE x=" +
x + " AND y=" + y +
" AND z=" + x
println(query)
val KeySpace = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)
hourUniqueKeySpace.sql(query)
Run Code Online (Sandbox Code Playgroud)
当我执行此代码时,我收到这样的错误
Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found
Run Code Online (Sandbox Code Playgroud)
知道为什么会这样吗?我怎样才能解决这个问题?
scala cassandra-2.0 apache-spark apache-spark-sql spark-cassandra-connector
我试图从RDD创建一个Dataframe [cassandraRow] ..但我不能因为createDataframe(RDD [Row],schema:StructType)需要RDD [Row]而不是RDD [cassandraRow].
并且根据这个问题的答案 如何将rdd对象转换为spark中的dataframe
(其中一个答案)建议在RDD [Row]上使用toDF()从RDD获取Dataframe,这对我不起作用.我尝试在另一个例子中使用RDD [Row](尝试使用toDF()).
以下是代表员工in_date和out_date的样本数据集。我必须获取所有员工的最后in_time。
Spark在4节点独立群集上运行。
初始数据集:
员工ID -----入职日期-----离职日期
1111111 2017-04-20 2017-09-14
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2016-01-07 2016-01-20
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
之后的数据集df.sort(col(in_date).desc()):
员工编号-in_date ----- out_date
1111111 2017-11-02 null
1111111 2017-04-20 2017-09-14
2222222 2017-09-26 2017-09-26
2222222 2017-11-28 null
3333333 2017-10-25 null
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
df.dropDup(EmployeeID):
Run Code Online (Sandbox Code Playgroud)
输出:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-09-26 2017-09-26
3333333 2016-01-07 2016-01-20
Run Code Online (Sandbox Code Playgroud)
预期数据集:
员工ID -----入职日期-----离职日期
1111111 2017-11-02 null
2222222 2017-11-28 null
3333333 2017-10-25 null
Run Code Online (Sandbox Code Playgroud)
但是,当我使用进行初始数据集排序sortWithInPartitions并进行重复数据删除时,我得到了预期的数据集。我在这里错过了大大小小的东西吗?任何帮助表示赞赏。
附加信息:
当在本地模式下用Spark执行df.sort时,实现了上述预期输出。
我没有做任何分区,重新分区。初始数据集是从基础Cassandra数据库获得的。
apache-spark apache-spark-sql spark-cassandra-connector apache-spark-dataset
我在使用针对Java的Spark cassandra连接器筛选时遇到了一些问题。Cassandra允许使用IN子句按分区键的最后一列进行过滤。例如
create table cf_text
(a varchar,b varchar,c varchar, primary key((a,b),c))
Query : select * from cf_text where a ='asdf' and b in ('af','sd');
sc.cassandraTable("test", "cf_text").where("a = ?", "af").toArray.foreach(println)
Run Code Online (Sandbox Code Playgroud)
我如何指定在spark的CQL查询中使用的IN子句?如何也可以指定范围查询?
我正在使用Spark Cassandra连接器.从Cassandra表获取数据需要5-6分钟.在Spark中,我在日志中看到了许多任务和Executor.原因可能是Spark在许多任务中划分了这个过程!
下面是我的代码示例:
public static void main(String[] args) {
SparkConf conf = new SparkConf(true).setMaster("local[4]")
.setAppName("App_Name")
.set("spark.cassandra.connection.host", "127.0.0.1");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Demo_Bean> empRDD = javaFunctions(sc).cassandraTable("dev",
"demo");
System.out.println("Row Count"+empRDD.count());
}
Run Code Online (Sandbox Code Playgroud) 我正在寻找一个RDD和一个cassandra表的连接,它们对于相同的密钥ex(简化)具有不同的名称:
case class User(id : String, name : String)
Run Code Online (Sandbox Code Playgroud)
和
case class Home( address : String, user_id : String)
Run Code Online (Sandbox Code Playgroud)
如果想做:
rdd[Home].joinWithCassandraTable("testspark","user").on(SomeColumns("id"))
Run Code Online (Sandbox Code Playgroud)
如何确定要进行连接的字段的名称.而且我不想将rdd映射到只有正确的id,因为我想在joinWithCassandraTable之后加入所有值.
scala cassandra datastax-enterprise apache-spark spark-cassandra-connector
有没有办法在实例化时注册自定义编解码器CassandraConnector?
我每次打电话时都在注册我的编解码器 cassandraConnector.withSessionDo
val cassandraConnector = CassandraConnector(ssc.sparkContext.getConf)
...
...
.mapPartitions(partition => {
cassandraConnector.withSessionDo(session => {
// register custom codecs once for each partition so it isn't loaded as often for each data point
if (partition.nonEmpty) {
session.getCluster.getConfiguration.getCodecRegistry
.register(new TimestampLongCodec)
.register(new SummaryStatsBlobCodec)
.register(new JavaHistogramBlobCodec)
}
Run Code Online (Sandbox Code Playgroud)
这样做似乎有点像反模式.它也真的堵塞了我们的日志,因为我们有一个每30秒运行一次的火花流服务,并且它正在填充我们的日志:
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec SummaryStatsBlobCodec [blob <-> SummaryStats] because it collides with previously registered codec SummaryStatsBlobCodec [blob <-> SummaryStats]
16/11/01 14:14:44 WARN CodecRegistry: Ignoring codec JavaHistogramBlobCodec [blob <-> Histogram] because it …Run Code Online (Sandbox Code Playgroud) scala cassandra apache-spark spark-streaming spark-cassandra-connector
我正在尝试使用 spark 来处理大型 cassandra 表(约 4.02 亿个条目和 84 列),但得到的结果不一致。最初的要求是将一些列从这个表复制到另一个表。复制数据后,我注意到新表中的某些条目丢失了。为了验证我是否对大型源表进行了计数,但每次都得到不同的值。我在一个较小的表(约 700 万条记录)上尝试了查询,结果很好。
最初,我尝试使用 pyspark 进行计数。这是我的 pyspark 脚本:
spark = SparkSession.builder.appName("Datacopy App").getOrCreate()
df = spark.read.format("org.apache.spark.sql.cassandra").options(table=sourcetable, keyspace=sourcekeyspace).load().cache()
df.createOrReplaceTempView("data")
query = ("select count(1) from data " )
vgDF = spark.sql(query)
vgDF.show(10)
Run Code Online (Sandbox Code Playgroud)
Spark提交命令如下:
~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077 --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3" --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/ --executor-memory 10G --num-executors=6 --executor-cores=2 --total-executor-cores 18 pyspark_script.py
Run Code Online (Sandbox Code Playgroud)
上述火花提交过程需要大约 90 分钟才能完成。我跑了三遍,这是我得到的计数:
Spark 在整个过程中没有显示任何错误或异常。我在 cqlsh 中运行了三次相同的查询,再次得到不同的结果:
我正在使用 Scala 并尝试将我的日历信息从 Spark 保存到 Cassandra。
我开始使用 Cassandra 创建相同的架构:
session.execute("CREATE TABLE calendar (DateNum int, Date text, YearMonthNum int, ..., PRIMARY KEY (datenum,date))")
然后将我的数据从 Spark 导入到 Cassandra:
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "calendar", "keyspace" -> "ks"))
.mode(SaveMode.Append)
.save()
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试读取从 Cassandra 上的 Spark 检索的数据时,行看起来非常混乱,而我想保持日历的顺序相同。
我有一个行的例子:
20090111 | 2009 年 1 月 11 日 | 200901 |...
选择/订购似乎也不能解决问题。