标签: spark-cassandra-connector

Spark:由于自定义字符串比较,在Filter命令期间任务不可序列化

我正在尝试使用Spark对cassandra执行查询.系统获取特定值,我想在DB中找到包含该值的其他元素.但是,该值是用户生成的,它取决于查询(即,它不是预定义的,系统每次都会收到一个新的).因此它将在方法中的某处定义为:

val id = "ID"
Run Code Online (Sandbox Code Playgroud)

我将要查询的colmn被DB视为TEXT,但实际上它是一个json输出,我必须在json字段中找到一个特定的值.因此,在spark命令中,我也在转换Json中每行的内容并遍历.

所以查询看起来像这样:

    sc.cassandraTable("keyspace","table").where("some_restriction=random")
      .filter(x=> (Json.parse(x.get[String]("content"))\"id")
      .toString.contains(id))
Run Code Online (Sandbox Code Playgroud)

这会引发错误:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:341)
    at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:340)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.filter(RDD.scala:340)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:86)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:88)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:90)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:92)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:94)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:96)
    at $iwC$$iwC$$iwC.<init>(<console>:98)
    at $iwC$$iwC.<init>(<console>:100)
    at $iwC.<init>(<console>:102)
    at <init>(<console>:104)
    at .<init>(<console>:108) …
Run Code Online (Sandbox Code Playgroud)

scala cassandra apache-spark spark-cassandra-connector

1
推荐指数
1
解决办法
1036
查看次数

将Uber Jar提交给Google Dataproc时如何解决Guava依赖问题

我正在使用Maven Shade插件来构建Uber jar,以将其作为作业提交给google dataproc集群。Google已在其集群上安装了Apache Spark 2.0.2 Apache Hadoop 2.7.3。

Apache spark 2.0.2使用com.google.guava的14.0.1和apache hadoop 2.7.3使用11.0.2,这两个都应该已经在类路径中。

<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                    <!--  
                        <artifactSet>
                            <includes>
                                <include>com.google.guava:guava:jar:19.0</include>
                            </includes>
                        </artifactSet>
                    -->
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.guava:guava:*</exclude>                                 
                            </excludes>
                        </artifactSet>
                    </configuration>
                </execution>
            </executions>
        </plugin>
Run Code Online (Sandbox Code Playgroud)

当我在阴影插件中包含番石榴16.0.1 jar时,我得到了这个Eexception:

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.148.0.3}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:163)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:121)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:322)
at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:342)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:50)
at …
Run Code Online (Sandbox Code Playgroud)

hadoop apache-spark spark-cassandra-connector google-cloud-dataproc

1
推荐指数
1
解决办法
1511
查看次数

如何从同一Spark上下文中与不同的Cassandra群集进行交互

我想将旧的cassandra集群数据迁移到新集群,并考虑编写一些spark作业来实现。有什么方法可以与来自同一SparkContext的多个cassandra集群进行交互。这样我就可以使用同一sparkJob中的saveToCassandra函数从一个群集读取数据并写入另一个群集。

val products = sc.cassandraTable("first_cluster","products").cache()
products.saveToCassandra("diff_cluster","products2")
Run Code Online (Sandbox Code Playgroud)

我们可以将数据保存到其他群集中吗?

cassandra apache-spark spark-cassandra-connector

1
推荐指数
1
解决办法
599
查看次数

如何从Cassandra表中加载行作为Spark中的Dataframe?

我可以将整个Cassandra表加载为如下数据帧

val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> table, "keyspace" -> keyspace))
      .load()
Run Code Online (Sandbox Code Playgroud)

但我找不到通过主键获取行的方法,比如

select * from table where key = ''
Run Code Online (Sandbox Code Playgroud)

有没有办法做到这一点?

scala cassandra apache-spark spark-cassandra-connector

0
推荐指数
1
解决办法
3490
查看次数

Spark Cassandra 将数据集追加到具有空值的表中

我使用DataStax Spark 连接器来填充 Cassandra 集群并处理不同作业中的数据(由于 Spark 不支持流处理的一些操作,例如双重聚合)。所以我想将不同作业的数据存储在同一个表中。假设第一个流作业在此表中插入一行(使用 foreach 编写器,因为连接器尚不支持流式写入)。

INSERT INTO keyspace_name.table_name (id, col1, col2) VALUES ("test", 1, null);
Run Code Online (Sandbox Code Playgroud)

如果我附加(更新插入)其中包含空列的数据集,而 Cassandra 中该行已经有非空值,该怎么办?

// One row of the dataset = "test", null, 2
dataset.write
  .format("org.apache.spark.sql.cassandra")
    .option("keyspace", keyspace)
  .option("table", table)
  .mode(SaveMode.Append)
  .save()
Run Code Online (Sandbox Code Playgroud)

如果我正确理解文档,以前的非空值将被新的空值覆盖?如果是这样,有没有办法保留现有的非空值?或者我是否必须将每个作业的数据存储在单独的表中?

scala insert-update cassandra apache-spark spark-cassandra-connector

0
推荐指数
1
解决办法
723
查看次数