I have a Cassandra table that, for simplicity, looks like:
key: text
jsonData: text
blobData: blob
I can create a basic data frame for this using Spark and the spark-cassandra-connector:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
I'm struggling with expanding the JSON data into its underlying structure. Ultimately I want to be able to filter based on attributes inside the JSON string and return the blob data, something like jsonData.foo = "bar", returning blobData. Is this currently possible?
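For reference, a minimal sketch of one way this could work, assuming Spark 1.6+ where get_json_object is available; the attribute name foo and the column names come from the schema above:

import org.apache.spark.sql.functions.get_json_object

// Pull the "foo" attribute out of the JSON text column, filter on it,
// and keep only the blob column.
val result = df
  .withColumn("foo", get_json_object(df("jsonData"), "$.foo"))
  .where("foo = 'bar'")
  .select("blobData")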
scala dataframe apache-spark apache-spark-sql spark-cassandra-connector
Many tables in the Cassandra database contain a column titled user_id. The user_id values refer to users stored in the users table. Since some users have been deleted, I would like to delete the orphaned records from all of the tables that contain a column titled user_id.

Is there a way to list all the tables, using CassandraSQLContext or any other built-in method or custom procedure, so as to avoid defining the list of tables explicitly?

Thanks.
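One possible approach, sketched below assuming Cassandra 3.x (where column metadata lives in system_schema.columns; on 2.x it is system.schema_columns) and a spark-shell sc, is to query the schema tables directly through the connector's session rather than through CassandraSQLContext:

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Ask Cassandra itself which (keyspace, table) pairs have a user_id column.
val connector = CassandraConnector(sc.getConf)
val tablesWithUserId = connector.withSessionDo { session =>
  val rs = session.execute(
    "SELECT keyspace_name, table_name FROM system_schema.columns " +
    "WHERE column_name = 'user_id' ALLOW FILTERING")
  rs.all().asScala.map(row => (row.getString("keyspace_name"), row.getString("table_name"))).toList
}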
I have the following code in Spark:
rdd
.map(processFunction(_))
.saveToCassandra("keyspace", "tableName")
where
def processFunction(src: String): Seq[Any] =
  src match {
    case "a" => List(A("a", 123112, "b"), A("b", 142342, "c"))
    case "b" => List(B("d", 12312, "e", "f"), B("g", 12312, "h", "i"))
  }
where:
case class A(entity: String, time: Long, value: String)
case class B(entity: String, time: Long, value1: String, value2: String)
saveToCassandra expects a collection of objects, and using Seq[Any] as the return type, to cover both Seq[A] and Seq[B], breaks saveToCassandra with the exception scala.ScalaReflectionException: <none> is not a term. What could be the reason for this behavior?
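A possible workaround sketch (the table names below are hypothetical): saveToCassandra appears to resolve a row writer from the RDD's static element type via reflection, and Any gives it nothing concrete to reflect on. Flattening and then splitting by concrete type gives each save call a usable type:

import com.datastax.spark.connector._

// Flatten to individual objects, then split by concrete type before saving.
val expanded = rdd.flatMap(processFunction(_)).cache()
expanded.collect { case a: A => a }.saveToCassandra("keyspace", "table_a") // hypothetical table
expanded.collect { case b: B => b }.saveToCassandra("keyspace", "table_b") // hypothetical table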
I'm trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I cannot connect to my cluster. It appears there is a version mismatch, since the classpath contains a much older Guava version pulled in from somewhere else, even though I specified the proper version at startup. I suspect this is caused by all the Hadoop dependencies put into the classpath by default.

Is there any way to get spark-shell to use only the proper version of Guava, without getting rid of all the Hadoop-related jars that Dataproc includes?
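One thing that might be worth trying (a sketch, not verified on Dataproc) is telling Spark to prefer the user-supplied jars over the cluster's bundled ones; both flags exist in Spark 1.5, though they are marked experimental:

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true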
Relevant data:
Starting spark-shell, showing that it resolves the proper version of Guava:

$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3
:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
found io.netty#netty-handler;4.0.27.Final in central
found io.netty#netty-buffer;4.0.27.Final in central
found io.netty#netty-common;4.0.27.Final in central
found io.netty#netty-transport;4.0.27.Final in central
found io.netty#netty-codec;4.0.27.Final in central
found com.codahale.metrics#metrics-core;3.0.2 in central
found org.slf4j#slf4j-api;1.7.5 in central
found org.apache.commons#commons-lang3;3.3.2 in central
found com.google.guava#guava;16.0.1 in central
found org.joda#joda-convert;1.2 in central
found …

apache-spark spark-cassandra-connector google-cloud-dataproc
How can I collect these metrics on the console (from the Spark shell or a spark-submit job) right after a task or job completes?

We use Spark to load data from MySQL to Cassandra, and it is quite large (e.g. ~200 GB and 600M rows). When the task has completed, we want to verify exactly how many rows the Spark process wrote. We can get the number from the Spark UI, but how can we retrieve that number ("Output Records Written") from the Spark shell or from a spark-submit job?

Sample command to load from MySQL to Cassandra:
val pt = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://...:3306/...")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "payment_types")
  .option("user", "hadoop")
  .option("password", "...")
  .load()
pt.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite, options = Map("table" -> "payment_types", "keyspace" -> "test"))
I would like to retrieve all the Spark UI metrics for the above task, mainly the output size and records written.

Please help.

Thanks for your time!
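A sketch of one way to capture this programmatically, assuming Spark 2.x where TaskMetrics.outputMetrics is non-optional (on 1.x it is an Option and needs unwrapping): register a SparkListener and accumulate the per-task output metrics, which are the same numbers the Spark UI aggregates.

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

val recordsWritten = new AtomicLong(0L)
val bytesWritten = new AtomicLong(0L)

sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for tasks that failed before reporting metrics
    Option(taskEnd.taskMetrics).foreach { m =>
      recordsWritten.addAndGet(m.outputMetrics.recordsWritten)
      bytesWritten.addAndGet(m.outputMetrics.bytesWritten)
    }
  }
})

// ... run the load and save, then:
println(s"Output records written: ${recordsWritten.get}, bytes written: ${bytesWritten.get}")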
apache-spark codahale-metrics apache-spark-sql spark-cassandra-connector spark-dataframe
The exact exception is as follows:

com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.math.BigDecimal]

These are the software versions I'm using: Spark 1.5, Datastax Cassandra 3.2.1, CDH 5.5.1.

The code I'm trying to execute is a Spark program using the Java API that basically reads data (CSV) from HDFS and loads it into a Cassandra table. I'm using the spark-cassandra-connector. Initially I had a lot of issues with the Google Guava library conflict, which I was able to solve by shading the Guava library and building a snapshot jar with all the dependencies.

I was able to load data for some files, but for some files I got the Codec exception. When I researched this issue, I found the following thread on the same problem:
https://groups.google.com/a/lists.datastax.com/forum/#!topic/java-driver-user/yZyaOQ-wazk
After going through these discussions, my understanding is that either I'm using the wrong version of the cassandra-driver, or there is still a classpath issue related to the Guava library, since Cassandra 3.0 and later uses Guava 16.0.1, and the discussions above suggest that a lower version of Guava may be present on the classpath.
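For what it's worth, the exception itself says the driver was asked to write a java.math.BigDecimal into a varchar column, so independent of the Guava question, one hypothetical way to sidestep the codec lookup is to make the DataFrame column types match the Cassandra schema before saving (the df and amount names below are made up for illustration):

import org.apache.spark.sql.functions.col

// Cast the decimal-inferred CSV field to a string so it matches the varchar column.
val fixed = df.withColumn("amount", col("amount").cast("string"))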
Here is the pom.xml file:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.5.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-clientutil</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>com.pointcross.shaded.google</shadedPattern>
</relocation>
</relocations>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
</configuration>
</execution>
</executions> …

guava datastax-java-driver datastax cloudera-cdh spark-cassandra-connector
This error has been the hardest one to track down. I'm not sure what is happening. I'm running a Spark cluster on my local machine, so the entire cluster is under a single host, 127.0.0.1, and I'm running in standalone mode.
JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
.select("rowkey", "col1", "col2", "col3", )
.spanBy(new Function<CassandraRow, byte[]>() {
@Override
public byte[] call(CassandraRow v1) {
return v1.getBytes("rowkey").array();
}
}, byte[].class);
Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
System.out.println("************START************");
System.out.println(new String(partitionKey));
System.out.println("************END************");
}
This error has been the hardest to track down. It clearly happens at cassandraRowsRDD.collect(), and I don't know why.
16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ …
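For context, this particular ClassCastException (List$SerializationProxy into RDD.dependencies) is commonly reported when the executors deserialize classes with a different classloader than the driver, often because the application jar was never shipped to the standalone workers. A hedged sketch of one commonly suggested fix, with a hypothetical jar path, expressed in Scala for brevity:

import org.apache.spark.SparkConf

// Ship the assembled application jar to the executors explicitly.
val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("cassandra-span-by")
  .setJars(Seq("target/my-app-assembly.jar")) // hypothetical path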
I always get the following error. Can someone help me?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.datastax.spark.connector.japi.DStreamJavaFunctions.<init>(DStreamJavaFunctions.java:24)
at com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions(CassandraStreamingJavaUtil.java:55)
at SparkStream.main(SparkStream.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
This happens when I compile the following code. I've searched online but haven't found a solution. The error appeared when I added saveToCassandra:
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext; …
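A note on the likely cause: org.apache.spark.Logging was removed from Spark's public classpath in Spark 2.0, so a spark-cassandra-connector built against Spark 1.x throws exactly this NoClassDefFoundError on a 2.x runtime. The usual fix is to align the connector version with the Spark version, e.g. (a hypothetical sbt-style coordinate, assuming Spark 2.0.x; the equivalent Maven coordinate works too):

// In the build definition: use a connector release built for the Spark major version in use.
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0"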
So I have a Python streaming-sourced DataFrame df that contains all the data I want to put into a Cassandra table with the spark-cassandra-connector. I've tried it two ways:

df.write \
.format("org.apache.spark.sql.cassandra") \
.mode('append') \
.options(table="myTable",keyspace="myKeySpace") \
.save()
query = df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode('append') \
.options(table="myTable",keyspace="myKeySpace") \
.start()
query.awaitTermination()
But I keep getting these errors, respectively:
pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;
and
java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.
Is there any way I can send my streaming DataFrame into my Cassandra table?
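One workaround sketch, assuming Spark 2.4+ where DataStreamWriter.foreachBatch is available (in PySpark as well, though shown here in Scala to match the rest of the examples): write each micro-batch with the batch Cassandra source, since the source does not support streamed writes directly.

import org.apache.spark.sql.DataFrame

// Reuse the batch writer for every micro-batch of the stream.
val query = df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .mode("append")
      .options(Map("table" -> "myTable", "keyspace" -> "myKeySpace"))
      .save()
  }
  .start()

query.awaitTermination()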
apache-spark pyspark spark-cassandra-connector spark-structured-streaming
First off, I'm not using DSE Cassandra. I'm building this myself and using Microsoft Azure to host the servers.

I have a 2-node Cassandra cluster, and I've managed to set up Spark on a single node, but I couldn't find any online resources about setting it up on a multi-node cluster.

This is not a duplicate of "How to set up a Spark Cassandra multi-node cluster?".

To set it up on a single node, I followed the tutorial "Setting up Spark with the Cassandra Connector".
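For what it's worth, the connector itself doesn't need per-node setup beyond what the single-node install does: each Spark worker just needs the connector jar on its classpath and network access to the Cassandra nodes, and the driver-side configuration stays the same. A minimal sketch, with placeholder hosts:

import org.apache.spark.SparkConf

// The same configuration works whether the Spark cluster has one worker or many;
// the connector discovers the remaining Cassandra nodes from the contact point.
val conf = new SparkConf()
  .setMaster("spark://<spark-master-host>:7077")
  .set("spark.cassandra.connection.host", "<any-cassandra-node-ip>")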