Spark版本:1.4.0 Cassandra版本:2.1.8
我使用数据交换Spark Cassandra连接器来连接Spark和Cassandra.我在Spark中有6个节点,运行着6个不同的工作者.我有2个Cassandra节点来协助这个.
我尝试了一个示例应用程序来执行列族中行数的计数(CassandraUtil.javaFunctions(sc).cassandraTable("keyspace","columnfamily").count()).
现在,当我将此单个作业分派给主服务器时,作业在Spark Cluster中的2个工作节点中运行(来自事件时间轴).
问题
编辑
我添加了跑步的图片.我只有10个不同的分区.这是一个简单的计数操作.
我的猜测仍然是我的问题.
如果你看到提供的附件,我想你会得到一个想法.这是为了向我的火花大师提交一份工作.想知道它是如何在两个不同的执行器中运行的.两个执行程序都返回相同的字节数.因此,这表明两者都从cassandra获取了所有10个分区.如果这是它发生的方式,那么火花对我来说是什么?或者,我是否必须以其他方式获取它,以便由两个不同的工作者提取十个分区?
cassandra cassandra-2.0 apache-spark spark-cassandra-connector
我不想在键空间的任何表中有任何数据.所以我决定删除键空间(如果存在)并立即创建它.我使用下面的代码来实现相同的目标.
CassandraConnector(conf).withSessionDo { session =>
session.execute(s"DROP KEYSPACE if EXISTS $keyspace")
session.execute("""CREATE KEYSPACE if NOT EXISTS %s
WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};""".format(keyspace)
Run Code Online (Sandbox Code Playgroud)
)}
但它未能创建密钥空间.从日志我只能看到一个警告说明
Received a DROPPED notification for table test.table_tracker, but this keyspace is unknown in our metadata.
Run Code Online (Sandbox Code Playgroud)
我也尝试过使用python cassandra驱动程序.但结果是一样的.我相信有一些竞争条件,并且丢弃键空间发生异步(如果我错了,请纠正我).
如何同步删除和创建键空间?
我有带方括号的日期[2014-11-08 06:27:00.0],并且想要删除它。
预期输出为2014-11-08 06:27:00.0
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "127.0.0.1").setAppName("CasteDate").setMaster("local[*]")
.set("spark.cassandra.connection.port", "9042")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val csc=new CassandraSQLContext(sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
var input: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
input.setTimeZone(TimeZone.getTimeZone("GMT"))
var dia: SimpleDateFormat = new SimpleDateFormat("dd")
var mes: SimpleDateFormat = new SimpleDateFormat("MM")
var ano: SimpleDateFormat = new SimpleDateFormat("yyyy")
var horas: SimpleDateFormat = new SimpleDateFormat("HH")
var minutos: SimpleDateFormat = new SimpleDateFormat("mm")
val data=csc.sql("SELECT timecol …Run Code Online (Sandbox Code Playgroud) 在Spark-2.0中,创建Spark会话的最佳方法是什么.因为在Spark-2.0和Cassandra中,API都经过了重新设计,基本上不赞成使用SqlContext(以及CassandraSqlContext).因此,为了执行SQL,我要么创建一个Cassandra会话(com.datastax.driver.core.Session) and use execute( " ").或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)方法.
我不知道两者的SQL限制 - 有人可以解释一下.
此外,如果我必须创建SparkSession - 我该怎么做 - 找不到任何合适的例子.随着API的重新设计,旧示例不起作用.我正在通过这个代码示例 - DataFrames - 不清楚这里使用的是什么sql上下文(是正确的方法继续.)(由于某些原因,弃用的API甚至没有编译 - 需要检查我的eclipse设置)
谢谢
java apache-spark apache-spark-sql spark-cassandra-connector apache-spark-2.0
我有 1 个主节点,其中 3 个工作节点与主节点通信。
作为灾难恢复,我们创建了 2 个 Master,让 Zookeeper 选举 Master。我正在使用 datastax 的 spark Cassandra 连接器。有没有办法传递多个 Spark Master URL 以连续尝试成功。
new SparkConf(true)
.set("spark.cassandra.connection.host", "10.3.2.1")
.set("spark.cassandra.auth.username","cassandra")
.set("spark.cassandra.auth.password",cassandra"))
.set("spark.master", "spark://1.1.2.2:7077") // Can I give multiple Urls here?
.set("spark.app.name","Sample App");
Run Code Online (Sandbox Code Playgroud) 给定 Cassandra 表:
CREATE TABLE data_storage.stack_overflow_test_table (
id int,
text_id text,
clustering date,
some_other text,
PRIMARY KEY (( id, text_id ), clustering)
)
Run Code Online (Sandbox Code Playgroud)
以下查询是有效查询:
select * from data_storage.test_table_filtering where id=4 and text_id='2';
Run Code Online (Sandbox Code Playgroud)
因为我包含了从分区键到查询的所有列。
考虑以下代码:
val ds = session.
read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "stack_overflow_test_table", "keyspace" -> "data_storage"))
.load()
.where(col("id") === 4 &&
col("text_id") === "2").show(10)
Run Code Online (Sandbox Code Playgroud)
由于 spark-cassandra 连接器将谓词推送到 Cassandra,我希望 Spark 将发送 Cassandra 的查询类似于
SELECT "id", "text_id", "clustering", "some_other" FROM "data_storage"."stack_overflow_test_table" WHERE "id" = ? AND "text_id" = ?
Run Code Online (Sandbox Code Playgroud)
但是,我可以在日志中看到
18/04/09 15:38:09 …
cassandra apache-spark spark-cassandra-connector spark-dataframe
对于我的生活,我无法理解为什么这不可序列化。我在 spark-shell(粘贴模式)下运行。我在 Spark 1.3.1、Cassandra 2.1.6、Scala 2.10 上运行
import org.apache.spark._
import com.datastax.spark.connector._
val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
.set("spark.driver.port", driverPort.toString)
.set("spark.driver.host", driverHost)
.set("spark.logConf", "true")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)
case class Test(id: String, creationdate: String) extends Serializable
sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
.saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))
sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()
Run Code Online (Sandbox Code Playgroud)
我用这个开始了 spark-shell:
./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true
Run Code Online (Sandbox Code Playgroud)
没有看到包含 -Dsun.io.serialization.extendedDebugInfo=true 属性的任何区别。
完整错误(已编辑):
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
at …Run Code Online (Sandbox Code Playgroud) 我是Scala的新手,我正在尝试建立一个Spark工作.我已经构建了一个包含DataStax连接器的ajob并将其组装成一个胖罐.当我尝试执行它时失败了java.lang.NoSuchMethodError.我已经破解了JAR并且可以看到包含DataStax库.我错过了一些明显的东西吗 有关这个过程的好教程吗?
谢谢
console $ spark-submit --class org.bobbrez.CasCountJob ./target/scala-2.11/bobbrez-spark-assembly-0.0.1.jar ks tn ...线程"main"中的异常java.lang.NoSuchMethodError:scala .runtime.ObjectRef.zero()Lscala /运行/ ObjectRef; at com.datastax.spark.connector.cql.CassandraConnector $ .com $ datastax $ spark $ connector $ cql $ CassandraConnector $$ createSession(CassandraConnector.scala)at com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ 2.申请(CassandraConnector.scala:148)......
build.sbt
name := "soofa-spark"
version := "0.0.1"
scalaVersion := "2.11.7"
// additional libraries
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"
mergeStrategy in assembly <<= (mergeStrategy in …Run Code Online (Sandbox Code Playgroud) 在之前的Spark版本1.6.1中,我正在使用spark Context创建Cassandra Context,
import org.apache.spark.{ Logging, SparkContext, SparkConf }
//config
val conf: org.apache.spark.SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", CassandraHost)
.setAppName(getClass.getSimpleName)
lazy val sc = new SparkContext(conf)
val cassandraSqlCtx: org.apache.spark.sql.cassandra.CassandraSQLContext = new CassandraSQLContext(sc)
//Query using Cassandra context
cassandraSqlCtx.sql("select id from table ")
Run Code Online (Sandbox Code Playgroud)
但在Spark 2.0中,Spark Context被Spark会话取代,我如何使用cassandra上下文?
cassandra apache-spark apache-spark-sql spark-cassandra-connector
我前段时间发布了这个问题,但结果发现我使用的是本地资源而不是远程资源。
我有一台配置了spark : 2.1.1、cassandra : 3.0.9和的远程计算机scala : 2.11.6。
Cassandra 的配置位置为localhost:9032,spark master 的配置位置为localhost:7077。
Spark master 设置为127.0.0.1,其端口设置为7077。
我可以远程连接到 cassandra,但无法使用 Spark 执行相同的操作。
连接到远程 Spark Master 时,出现以下错误:
错误 TransportRequestHandler:为单向消息调用 RpcHandler#receive() 时出错。
这是我通过代码进行的设置
val configuration = new SparkConf(true)
.setAppName("myApp")
.setMaster("spark://xx.xxx.xxx.xxx:7077")
.set("spark.cassandra.connection.host", "xx.xxx.xxx.xxx")
.set("spark.cassandra.connection.port", 9042)
.set("spark.cassandra.input.consistency.level","ONE")
.set("spark.driver.allowMultipleContexts", "true")
val sparkSession = SparkSession
.builder()
.appName("myAppEx")
.config(configuration)
.enableHiveSupport()
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
我不明白为什么 cassandra 工作得很好而 Spark 却不行。
这是什么原因造成的?我该如何解决?