我有一个流处理过程,该过程从Kafka读取数据,使用Spark处理数据并将数据写入Cassandra。
这将在具有3-5个节点的群集上运行。我的计划是在集群的每个节点上部署spark,kafka和cassandra。
我想尽可能地强制执行数据局部性,这意味着每个Spark节点仅从该节点上的 Kafka读取数据,在本地进行处理(我的管道中没有改组转换),并在其中写入Cassandra该节点。
因此,我的问题如下:
1)为了将同一主题存储在多个节点上,是否需要对Kafka主题进行分区?
2)我是否需要同步(设置为相同)Kafka分区程序和Cassandra分区程序,以便确保在节点X上到达Kafka分区的数据一定会存储在同一节点的Cassandra中?
3)在Spark管道中还有其他需要特别注意的事情吗?我正在使用Spark-Cassandra连接器,该连接器应利用数据局部性(以便每个Spark任务读取存储在该特定节点上的数据)。
任何博客文章或文章解释了如何做到这一点都倍受赞赏。
问候,
Srdjan
cassandra apache-kafka apache-spark spark-streaming spark-cassandra-connector
我试图找出如何在where子句下设置blob列.任何的想法?
例如,如果我在cqlsh中放入以下查询,它就可以工作
select * from hello where id=0xc1c1795a0b;
Run Code Online (Sandbox Code Playgroud)
// id是cassandra中的blob列
我尝试了以下内容
JavaRDD<CassandraRow> cassandraRowsRDD = javaFunctions(sc).cassandraTable("test", "hello")
.select("range" )
.where("id=?", "0xc1c1795a0b");
Run Code Online (Sandbox Code Playgroud)
这给了我一个类型转换器例外
我试过这个
JavaRDD<CassandraRow> cassandraRowsRDD = javaFunctions(sc).cassandraTable("test", "hello")
.select("range" )
.where("id=?", "0xc1c1795a0b".getBytes());
Run Code Online (Sandbox Code Playgroud)
这没有给我任何错误,但它没有返回任何结果.我的cqlsh中的查询确实返回了一堆结果.所以我不确定在where子句中设置一个blob.我正在使用Java.有任何想法吗?
我正在使用Scala版本2.10.5 Cassandra 3.0和Spark 1.6.我想将数据插入cassandra所以我尝试了基本的例子
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
Run Code Online (Sandbox Code Playgroud)
哪些Works和能够将数据插入到Cassandra中.所以我有一个csv文件,我想通过匹配模式插入到Cassandra表中
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
Run Code Online (Sandbox Code Playgroud)
当我使用SaveToCassndra时,我得到saveToCassandra不是personSchemaRDD的一部分.所以教会以不同的方式尝试
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
Run Code Online (Sandbox Code Playgroud)
但是无法连接到ip上的cassandra:port.can任何人告诉我最好的方法.我需要定期从文件中将数据保存到cassandra.
我的问题是 cassandra 在插入 NULL 值时创建墓碑。
据我了解,cassandra 不支持 NULL,当插入 NULL 时,它只会删除相应的列。一方面,这非常节省空间,但另一方面,它会创建墓碑,从而降低读取性能。
这违背了 NoSql 哲学,因为 cassandra 节省了空间,但降低了读取性能。在 NoSql 世界中,空间很便宜,但性能很重要。我相信这是以非规范化形式保存表背后的哲学。
我希望 cassandra 使用与插入任何其他值相同的技术来插入 NULL - 使用时间戳并在压缩期间保留最新条目 - 即使该条目为 NULL (或者我们可以称之为“未设置”)。cassandra 配置中是否有任何调整或任何方法如何能够在没有墓碑的情况下实现带有空值的更新插入?
我遇到了这个问题,但它只允许忽略 NULL 值
我的用例:我有事件流,每个事件都由 CauseID 标识。我收到许多具有相同 CauseId 的事件,并且我只想存储相同 CauseID 的最新事件(使用 upsert)。事件的属性可能从 NULL 更改为特定值,也可能从特定值更改为 NULL。不幸的是,后一种情况会生成逻辑删除并降低读取性能。
更新
看来我没有办法避开墓碑了。您能否建议我如何最小化它们的技术(将 gc_grace_seconds 设置为非常低的值)。有哪些风险?当节点宕机时间超过 gc_grace_seconds 时该怎么办?
我正在尝试使用 spark cassandra 连接器从表中获取值。load() 产生 IncompatibleClassChangeError。我的开发环境定义如下,
Intellij:2019.3.2
Scala 版本:2.12.10
Scala 二进制版本:2.12
Spark 版本:2.4.4
Spark cassandra 连接器:2.4.2
Exception in thread "main" java.lang.IncompatibleClassChangeError: Inconsistent constant pool data in classfile for class com/datastax/spark/connector/cql/StructDef. Method 'java.lang.Object $anonfun$missingColumns$1$adapted(com.datastax.spark.connector.cql.StructDef, com.datastax.spark.connector.ColumnRef)' at index 182 is CONSTANT_MethodRef and should be CONSTANT_InterfaceMethodRef
哪些库会产生异常?
我正在尝试从 Spark 中名为 logtimestamp (类型为 TimeStampType)的列创建年份和月份列。数据源是cassandra。我正在使用 Sparkshell 来执行这些步骤,这是我编写的代码 -
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types._
var logsDF = spark.read.cassandraFormat("tableName", "cw").load()
var newlogs = logsDF.withColumn("year", year(col("logtimestamp")))
.withColumn("month", month(col("logtimestamp")))
newlogs.write.cassandraFormat("tableName_v2", "cw")
.mode("Append").save()
Run Code Online (Sandbox Code Playgroud)
但这些步骤没有成功,我最终出现以下错误
java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:205)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:166)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:327)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:325)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:426)
at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:34)
at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:21)
at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.$anonfun$getIterator$2(CassandraScanPartitionReaderFactory.scala:110)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at …Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个 Spring Boot (2.7.5) 项目,该项目写入 Cassandra (使用 Spring Data cassandra)并使用 Spark 从 Cassandra 读取。当我将 Spark Master 作为 local[*] 提供时,它工作正常,但当我尝试连接到 Spark Cluster Master URL 时,出现如下错误。
卡桑德拉版本 - 4.0.1
火花版本 - 3.1.3
2022-11-17 11:45:03 - Code generated in 2093.165617 ms
2022-11-17 11:45:03 - Starting job: show at HomeController.java:26
2022-11-17 11:45:03 - Got job 0 (show at HomeController.java:26) with 1 output partitions
2022-11-17 11:45:03 - Final stage: ResultStage 0 (show at HomeController.java:26)
2022-11-17 11:45:03 - Parents of final stage: List()
2022-11-17 11:45:03 - …Run Code Online (Sandbox Code Playgroud) cassandra apache-spark spring-data-cassandra spark-cassandra-connector
我是Spark和Cassandra的新手.在尝试提交spark作业时,我在连接到Cassandra时遇到错误.
细节:
版本:
Spark : 1.3.1 (build for hadoop 2.6 or later : spark-1.3.1-bin-hadoop2.6)
Cassandra : 2.0
Spark-Cassandra-Connector: 1.3.0-M1
scala : 2.10.5
Run Code Online (Sandbox Code Playgroud)
Spark和Cassandra在虚拟集群上集群详细信息:
Spark Master : 192.168.101.13
Spark Slaves : 192.168.101.11 and 192.168.101.12
Cassandra Nodes: 192.168.101.11 (seed node) and 192.168.101.12
Run Code Online (Sandbox Code Playgroud)
我试图通过我的客户机(笔记本电脑) - 172.16.0.6提交工作.在使用Google搜索此错误后,我确保可以从客户端计算机ping群集中的所有计算机:spark master/slaves和cassandra节点,并在所有计算机上禁用防火墙.但我仍然在努力解决这个错误.
Cassandra.yaml
listen_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
rpc_port: 9160
Run Code Online (Sandbox Code Playgroud)
我正在尝试运行最小的样本作业
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._
val rdd …Run Code Online (Sandbox Code Playgroud) database-connection cassandra apache-spark spark-cassandra-connector
我是火花和卡桑德拉的新手.我正在尝试使用spark-cassandra连接器插入到cassandra表中,如下所示:
import java.util.UUID
import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._
case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)
object SparkConnectorContext {
val conf = new SparkConf(true).setMaster("local")
.set("spark.cassandra.connection.host", "192.168.xxx.xxx")
val sc = new SparkContext(conf)
}
object TestRepo {
def insertList(list: List[TestEntity]) = {
SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
}
}
object TestApp extends App {
val start = System.currentTimeMillis()
TestRepo.insertList(Utility.generateRandomData())
val end = System.currentTimeMillis()
val timeDiff = end-start
println("Difference (in millis)= "+timeDiff)
}
Run Code Online (Sandbox Code Playgroud)
当我使用上面的方法(带有100个实体的列表)插入时,它需要300-1100 milliseconds.我尝试使用幻像库插入相同的数据.它只需要少于20-40 milliseconds.
任何人都可以告诉我为什么火花连接器花费这么多时间插入?我在代码中做错了什么或者不建议使用spark-cassandra连接器进行插入操作?
scala cassandra apache-spark phantom-dsl spark-cassandra-connector
我使用卡桑德拉3.0.3,1.6.0星火并试图通过组合从旧文档代码运行http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java和https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md中的新内容.
这是我的pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>muhrafifm</groupId>
<artifactId>spark-cass-twitterdw</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.6.0-M1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.6.0-M1</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.1</version>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
我所做的更改基本上是方法javaFunction,这是我根据新文档更改javaFunction后的方法之一.我也包括在内import …