我正在尝试使用Spark对cassandra执行查询.系统获取特定值,我想在DB中找到包含该值的其他元素.但是,该值是用户生成的,它取决于查询(即,它不是预定义的,系统每次都会收到一个新的).因此它将在方法中的某处定义为:
val id = "ID"
Run Code Online (Sandbox Code Playgroud)
我将要查询的colmn被DB视为TEXT,但实际上它是一个json输出,我必须在json字段中找到一个特定的值.因此,在spark命令中,我也在转换Json中每行的内容并遍历.
所以查询看起来像这样:
sc.cassandraTable("keyspace","table").where("some_restriction=random")
.filter(x=> (Json.parse(x.get[String]("content"))\"id")
.toString.contains(id))
Run Code Online (Sandbox Code Playgroud)
这会引发错误:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.filter(RDD.scala:340)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:86)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:88)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:90)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:92)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:94)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:96)
at $iwC$$iwC$$iwC.<init>(<console>:98)
at $iwC$$iwC.<init>(<console>:100)
at $iwC.<init>(<console>:102)
at <init>(<console>:104)
at .<init>(<console>:108) …Run Code Online (Sandbox Code Playgroud) 我正在使用Maven Shade插件来构建Uber jar,以将其作为作业提交给google dataproc集群。Google已在其集群上安装了Apache Spark 2.0.2 Apache Hadoop 2.7.3。
Apache spark 2.0.2使用com.google.guava的14.0.1和apache hadoop 2.7.3使用11.0.2,这两个都应该已经在类路径中。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!--
<artifactSet>
<includes>
<include>com.google.guava:guava:jar:19.0</include>
</includes>
</artifactSet>
-->
<artifactSet>
<excludes>
<exclude>com.google.guava:guava:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
Run Code Online (Sandbox Code Playgroud)
当我在阴影插件中包含番石榴16.0.1 jar时,我得到了这个Eexception:
Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.148.0.3}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:163)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:322)
at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:342)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:50)
at …Run Code Online (Sandbox Code Playgroud) hadoop apache-spark spark-cassandra-connector google-cloud-dataproc
我想将旧的cassandra集群数据迁移到新集群,并考虑编写一些spark作业来实现。有什么方法可以与来自同一SparkContext的多个cassandra集群进行交互。这样我就可以使用同一sparkJob中的saveToCassandra函数从一个群集读取数据并写入另一个群集。
val products = sc.cassandraTable("first_cluster","products").cache()
products.saveToCassandra("diff_cluster","products2")
Run Code Online (Sandbox Code Playgroud)
我们可以将数据保存到其他群集中吗?
我可以将整个Cassandra表加载为如下数据帧
val tableDf = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> table, "keyspace" -> keyspace))
.load()
Run Code Online (Sandbox Code Playgroud)
但我找不到通过主键获取行的方法,比如
select * from table where key = ''
Run Code Online (Sandbox Code Playgroud)
有没有办法做到这一点?
我使用DataStax Spark 连接器来填充 Cassandra 集群并处理不同作业中的数据(由于 Spark 不支持流处理的一些操作,例如双重聚合)。所以我想将不同作业的数据存储在同一个表中。假设第一个流作业在此表中插入一行(使用 foreach 编写器,因为连接器尚不支持流式写入)。
INSERT INTO keyspace_name.table_name (id, col1, col2) VALUES ("test", 1, null);
Run Code Online (Sandbox Code Playgroud)
如果我附加(更新插入)其中包含空列的数据集,而 Cassandra 中该行已经有非空值,该怎么办?
// One row of the dataset = "test", null, 2
dataset.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace", keyspace)
.option("table", table)
.mode(SaveMode.Append)
.save()
Run Code Online (Sandbox Code Playgroud)
如果我正确理解文档,以前的非空值将被新的空值覆盖?如果是这样,有没有办法保留现有的非空值?或者我是否必须将每个作业的数据存储在单独的表中?
scala insert-update cassandra apache-spark spark-cassandra-connector