标签: spark-cassandra-connector

如何使用Spark DataFrames查询JSON数据列?

我有一个Cassandra表,为简单起见,看起来像:

key: text
jsonData: text
blobData: blob
Run Code Online (Sandbox Code Playgroud)

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()
Run Code Online (Sandbox Code Playgroud)

我正在努力将JSON数据扩展到其底层结构中.我最终希望能够根据json字符串中的属性进行过滤并返回blob数据.像jsonData.foo ="bar"之类的东西并返回blobData.这目前可能吗?

scala dataframe apache-spark apache-spark-sql spark-cassandra-connector

37
推荐指数
2
解决办法
4万
查看次数

如何列出所有cassandra表

cassandra数据库中有许多表,其中包含标题为user_id的列.值user_id被引用到存储在表用户中的用户.由于某些用户被删除,我想删除包含标题为user_id的列的所有表中的孤立记录.

有没有办法使用CassandraSQLContext或任何其他内置方法或自定义过程列出所有表,以避免显式定义表列表?

谢谢.

scala cassandra apache-spark spark-cassandra-connector

18
推荐指数
5
解决办法
3万
查看次数

scala.ScalaReflectionException:<none>不是一个术语

我在Spark中有以下代码:

rdd
  .map(processFunction(_))
  .saveToCassandra("keyspace", "tableName")
Run Code Online (Sandbox Code Playgroud)

哪里

def processFunction(src: String): Seq[Any] =
  src match {
   case "a" => List(A("a", 123112, "b"), A("b", 142342, "c"))
   case "b" => List(B("d", 12312, "e", "f"), B("g", 12312, "h", "i"))
  }
Run Code Online (Sandbox Code Playgroud)

哪里:

case class A(entity: String, time: Long, value: String)
case class B(entity: String, time: Long, value1: String, value2: String)
Run Code Online (Sandbox Code Playgroud)

saveToCassandra期望一组对象并使用Seq[Any]作为返回类型来包含两者Seq[A]和具有异常的Seq[B]中断saveToCassandra- scala.ScalaReflectionException: <none>不是一个术语.这种行为可能是什么原因?

scala apache-spark spark-cassandra-connector

10
推荐指数
1
解决办法
3051
查看次数

番石榴版同时使用火花壳

我试图通过数据采集器上的spark-shell使用spark-cassandra-connector,但是我无法连接到我的集群.似乎版本不匹配,因为类路径包含来自其他地方的更古老的番石榴版本,即使我在启动时指定了正确的版本.我怀疑这可能是由默认情况下放入类路径的所有Hadoop依赖项引起的.

反正有没有火花壳只使用适当版本的番石榴,而没有摆脱所有与Hadoop相关的数据包包括罐子?

相关数据:

启动spark-shell,显示它具有适当版本的Guava: $ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3

:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
        found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
        found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
        found io.netty#netty-handler;4.0.27.Final in central
        found io.netty#netty-buffer;4.0.27.Final in central
        found io.netty#netty-common;4.0.27.Final in central
        found io.netty#netty-transport;4.0.27.Final in central
        found io.netty#netty-codec;4.0.27.Final in central
        found com.codahale.metrics#metrics-core;3.0.2 in central
        found org.slf4j#slf4j-api;1.7.5 in central
        found org.apache.commons#commons-lang3;3.3.2 in central
        found com.google.guava#guava;16.0.1 in central
        found org.joda#joda-convert;1.2 in central
        found …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-cassandra-connector google-cloud-dataproc

10
推荐指数
1
解决办法
4668
查看次数

如何检索从Spark UI写入的输出大小和记录等指标?

如何在任务或作业完成后立即在控制台(Spark Shell或Spark提交作业)上收集这些指标.

我们使用Spark将数据从Mysql加载到Cassandra并且它非常庞大(例如:~200 GB和600M行).当任务完成后,我们想验证火花过程究竟完成了多少行?我们可以从Spark UI获取数字,但是如何从spark shell或spark-submit作业中检索该数字("Output Records Written").

示例命令从Mysql加载到Cassandra.

val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()

pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
Run Code Online (Sandbox Code Playgroud)

我想在上面的任务中检索所有Spark UI指标,主要是输出大小和记录写入.

请帮忙.

谢谢你的时间!

apache-spark codahale-metrics apache-spark-sql spark-cassandra-connector spark-dataframe

10
推荐指数
1
解决办法
2200
查看次数

Datastax Cassandra Driver抛出CodecNotFoundException

确切的例外情况如下

com.datastax.driver.core.exceptions.CodecNotFoundException:找不到请求的操作的编解码器:[varchar < - > java.math.BigDecimal]

这些是我使用Spark 1.5 Datastax-cassandra 3.2.1 CDH 5.5.1的软件版本

我试图执行的代码是使用java api的Spark程序,它基本上从hdfs读取数据(csv)并将其加载到cassandra表中.我正在使用spark-cassandra-connector.最初我有很多关于google s guava库冲突的问题,我可以通过对guava库进行着色并构建一个包含所有依赖项的快照jar来解决这个问题.

但是我能够为某些文件加载​​数据但是对于某些文件我得到了Codec异常.当我研究这个问题时,我在同一个问题上得到了以下线程.

https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk

https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk

经过这些讨论之后,我理解的是它是我正在使用的cassandra-driver的错误版本.或者仍然存在与番石榴库相关的类路径问题,因为cassandra 3.0及更高版本使用guava 16.0.1,上面的讨论表明类路径中可能存在较低版本的番石榴.

这是pom.xml文件

 <dependencies>
 <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version> 
</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>3.8.1</version>
  <scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-clientutil</artifactId>
<version>3.2.1</version>
</dependency>

</dependencies>
  <build>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                 <filters>
    <filter>
        <artifact>*:*</artifact>
        <excludes>
            <exclude>META-INF/*.SF</exclude>
            <exclude>META-INF/*.DSA</exclude>
            <exclude>META-INF/*.RSA</exclude>
        </excludes>
    </filter>
</filters>
                    <relocations>
                        <relocation>
                            <pattern>com.google</pattern>
                            <shadedPattern>com.pointcross.shaded.google</shadedPattern>
                        </relocation>

                    </relocations>
                    <minimizeJar>false</minimizeJar>
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                </configuration>
            </execution>
        </executions> …
Run Code Online (Sandbox Code Playgroud)

guava datastax-java-driver datastax cloudera-cdh spark-cassandra-connector

9
推荐指数
1
解决办法
2万
查看次数

如何修复java.lang.ClassCastException:无法将scala.collection.immutable.List的实例分配给字段类型scala.collection.Seq?

这个错误是最难追查的.我不确定发生了什么.我正在我的位置机器上运行Spark集群.所以整个火花集群都在一个主机下127.0.0.1,我在独立模式下运行

JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
   .select("rowkey", "col1", "col2", "col3",  )
   .spanBy(new Function<CassandraRow, byte[]>() {
        @Override
        public byte[] call(CassandraRow v1) {
            return v1.getBytes("rowkey").array();
        }
    }, byte[].class);

Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
    System.out.println("************START************");
    System.out.println(new String(partitionKey));
    System.out.println("************END************");
}
Run Code Online (Sandbox Code Playgroud)

这个错误是最难追查的.它显然发生在cassandraRowsRDD.collect(),我不知道为什么?

16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ …
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-cassandra-connector

9
推荐指数
3
解决办法
1万
查看次数

java.lang.NoClassDefFoundError:org/apache/spark/Logging

我总是收到以下错误.有人可以帮我吗?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at com.datastax.spark.connector.japi.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:24)
    at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:55)
    at SparkStream.main(SparkStream.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 20 more
Run Code Online (Sandbox Code Playgroud)

当我编译以下代码时.我在网上搜索过但没有找到解决方案.我添加了saveToCassandra时出错了.

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext; …
Run Code Online (Sandbox Code Playgroud)

java cassandra maven apache-spark spark-cassandra-connector

9
推荐指数
3
解决办法
3万
查看次数

如何将数据集写入Cassandra?

所以我有一个Python Stream-sourced DataFrame df,它包含我想要放入带有spark-cassandra-connector的Cassandra表的所有数据.我试过两种方式:

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

但是我继续分别得到这个错误:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;
Run Code Online (Sandbox Code Playgroud)

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.
Run Code Online (Sandbox Code Playgroud)

无论如何我可以将我的Streaming DataFrame发送到我的Cassandra表中吗?

apache-spark pyspark spark-cassandra-connector spark-structured-streaming

8
推荐指数
2
解决办法
1768
查看次数

如何使用多节点Cassandra集群设置Spark?

首先,我没有使用DSE Cassandra.我正在构建这个并使用Microsoft Azure来托管服务器.

我有一个2节点的Cassandra集群,我已经设法在单个节点上设置Spark,但我找不到任何关于在多节点集群上设置它的在线资源.

这不是如何设置spark Cassandra多节点集群的重复

要在单个节点上进行设置,我已经按照本教程" 使用Cassandra Connector设置Spark "进行了操作.

cassandra apache-spark spark-cassandra-connector

8
推荐指数
1
解决办法
855
查看次数