标签: spark-cassandra-connector

如何在Spark中分配工作

Spark版本:1.4.0 Cassandra版本:2.1.8

我使用数据交换Spark Cassandra连接器来连接Spark和Cassandra.我在Spark中有6个节点,运行着6个不同的工作者.我有2个Cassandra节点来协助这个.

我尝试了一个示例应用程序来执行列族中行数的计数(CassandraUtil.javaFunctions(sc).cassandraTable("keyspace","columnfamily").count()).

现在,当我将此单个作业分派给主服务器时,作业在Spark Cluster中的2个工作节点中运行(来自事件时间轴).

问题

  1. 我派了一份工作.为什么这是由两名工人完成的?是不是像一个工人在这里像大师一样?
  2. 我发现一个工人的反序列化时间非常长.其他工人很快完成了工作(1个用了40秒,2个用了1秒).你能对此有所了解吗?
  3. 这两名工人似乎已经与卡桑德拉建立了联系并且已经返回了结果.因此,在我看来,两者都在做同样的工作.你能对此有所了解吗?
  4. 我仍然想知道RDD的实现在哪里适合Cassandra这个分布式领域.有人可以对此有所了解吗?多个工作人员如何知道他们必须处理哪个Cassandra分区,如果它可以说,在6个工作人员之间拆分10k个分区?是这样的,抓取是由一个工人完成的,处理由其中的6个完成?即使在这种情况下,执行逻辑在所有工作者中保持不变(从Cassandra和进程中获取).Spark如何做到这一点?
  5. 想知道使用Spark和Cassandra的真正优势.它是在内存管理级别还是具有其他一些优势?

编辑

在此输入图像描述

我添加了跑步的图片.我只有10个不同的分区.这是一个简单的计数操作.

我的猜测仍然是我的问题.

如果你看到提供的附件,我想你会得到一个想法.这是为了向我的火花大师提交一份工作.想知道它是如何在两个不同的执行器中运行的.两个执行程序都返回相同的字节数.因此,这表明两者都从cassandra获取了所有10个分区.如果这是它发生的方式,那么火花对我来说是什么?或者,我是否必须以其他方式获取它,以便由两个不同的工作者提取十个分区?

cassandra cassandra-2.0 apache-spark spark-cassandra-connector

2
推荐指数
1
解决办法
1574
查看次数

如何在cassandra中执行同步丢弃并创建键空间?

我不想在键空间的任何表中有任何数据.所以我决定删除键空间(如果存在)并立即创建它.我使用下面的代码来实现相同的目标.

 CassandraConnector(conf).withSessionDo { session =>
  session.execute(s"DROP KEYSPACE if EXISTS $keyspace")
  session.execute("""CREATE KEYSPACE if NOT EXISTS %s
  WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};""".format(keyspace)
Run Code Online (Sandbox Code Playgroud)

)}

但它未能创建密钥空间.从日志我只能看到一个警告说明

Received a DROPPED notification for table test.table_tracker, but this keyspace is unknown in our metadata.
Run Code Online (Sandbox Code Playgroud)

我也尝试过使用python cassandra驱动程序.但结果是一样的.我相信有一些竞争条件,并且丢弃键空间发生异步(如果我错了,请纠正我).

如何同步删除和创建键空间?

cassandra spark-cassandra-connector

2
推荐指数
1
解决办法
924
查看次数

从scala中的Array中删除方括号[]

我有带方括号的日期[2014-11-08 06:27:00.0],并且想要删除它。

预期输出为2014-11-08 06:27:00.0

val conf = new SparkConf(true)
   .set("spark.cassandra.connection.host", "127.0.0.1").setAppName("CasteDate").setMaster("local[*]")
   .set("spark.cassandra.connection.port", "9042")
   .set("spark.driver.allowMultipleContexts", "true")
   .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(1))
val csc=new CassandraSQLContext(sc)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

var input: SimpleDateFormat   = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S")
input.setTimeZone(TimeZone.getTimeZone("GMT"))
var dia: SimpleDateFormat  = new SimpleDateFormat("dd")
var mes: SimpleDateFormat = new SimpleDateFormat("MM")
var ano: SimpleDateFormat = new SimpleDateFormat("yyyy")
var horas: SimpleDateFormat = new SimpleDateFormat("HH")
var minutos: SimpleDateFormat  = new SimpleDateFormat("mm")

val data=csc.sql("SELECT timecol …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark-sql spark-cassandra-connector

2
推荐指数
1
解决办法
8261
查看次数

用于Cassandra的Spark2会话,sql查询

在Spark-2.0中,创建Spark会话的最佳方法是什么.因为在Spark-2.0和Cassandra中,API都经过了重新设计,基本上不赞成使用SqlContext(以及CassandraSqlContext).因此,为了执行SQL,我要么创建一个Cassandra会话(com.datastax.driver.core.Session) and use execute( " ").或者我必须创建一个SparkSession (org.apache.spark.sql.SparkSession) and execute sql(String sqlText)方法.

我不知道两者的SQL限制 - 有人可以解释一下.

此外,如果我必须创建SparkSession - 我该怎么做 - 找不到任何合适的例子.随着API的重新设计,旧示例不起作用.我正在通过这个代码示例 - DataFrames - 不清楚这里使用的是什么sql上下文(是正确的方法继续.)(由于某些原因,弃用的API甚至没有编译 - 需要检查我的eclipse设置)

谢谢

java apache-spark apache-spark-sql spark-cassandra-connector apache-spark-2.0

2
推荐指数
1
解决办法
4434
查看次数

如何指定多个 Spark Standalone masters(对于 spark.master 属性)?

我有 1 个主节点,其中 3 个工作节点与主节点通信。

作为灾难​​恢复,我们创建了 2 个 Master,让 Zookeeper 选举 Master。我正在使用 datastax 的 spark Cassandra 连接器。有没有办法传递多个 Spark Master URL 以连续尝试成功。

new SparkConf(true)
        .set("spark.cassandra.connection.host", "10.3.2.1")  
        .set("spark.cassandra.auth.username","cassandra")
        .set("spark.cassandra.auth.password",cassandra"))
        .set("spark.master", "spark://1.1.2.2:7077") // Can I give multiple Urls here?
        .set("spark.app.name","Sample App");
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-cassandra-connector

2
推荐指数
1
解决办法
1271
查看次数

为什么即使使用 DataFrame API 按分区键查询表,Spark Cassandra 连接器也允许过滤?

给定 Cassandra 表:

CREATE TABLE data_storage.stack_overflow_test_table (
    id int,
    text_id text,
    clustering date,
    some_other text,
    PRIMARY KEY (( id, text_id ), clustering)
)
Run Code Online (Sandbox Code Playgroud)

以下查询是有效查询:

select * from data_storage.test_table_filtering where id=4 and text_id='2';
Run Code Online (Sandbox Code Playgroud)

因为我包含了从分区键到查询的所有列。

考虑以下代码:

val ds = session.
  read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "stack_overflow_test_table", "keyspace" -> "data_storage"))
  .load()
  .where(col("id") === 4 &&
  col("text_id") === "2").show(10)
Run Code Online (Sandbox Code Playgroud)

由于 spark-cassandra 连接器将谓词推送到 Cassandra,我希望 Spark 将发送 Cassandra 的查询类似于

SELECT "id", "text_id", "clustering", "some_other" FROM "data_storage"."stack_overflow_test_table" WHERE "id" = ? AND "text_id" = ? 
Run Code Online (Sandbox Code Playgroud)

但是,我可以在日志中看到

18/04/09 15:38:09 …

cassandra apache-spark spark-cassandra-connector spark-dataframe

2
推荐指数
1
解决办法
2671
查看次数

为什么在 Spark shell 中使用自定义 case 类会导致序列化错误?

对于我的生活,我无法理解为什么这不可序列化。我在 spark-shell(粘贴模式)下运行。我在 Spark 1.3.1、Cassandra 2.1.6、Scala 2.10 上运行

import org.apache.spark._
import com.datastax.spark.connector._

val driverPort = 7077
val driverHost = "localhost"
val conf = new SparkConf(true)
  .set("spark.driver.port", driverPort.toString)
  .set("spark.driver.host", driverHost)
  .set("spark.logConf", "true")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext("local[*]", "test", conf)
case class Test(id: String, creationdate: String) extends Serializable

sc.parallelize(Seq(Test("98429740-2933-11e5-8e68-f7cca436f8bf", "2015-07-13T07:48:47.924Z")))
  .saveToCassandra("testks", "test", SomeColumns("id", "creationdate"))

sc.cassandraTable[Test]("testks", "test").toArray
sc.stop()
Run Code Online (Sandbox Code Playgroud)

我用这个开始了 spark-shell:

./spark-shell -Ddriver-class-path=/usr/local/spark/libs/* -Dsun.io.serialization.extendedDebugInfo=true
Run Code Online (Sandbox Code Playgroud)

没有看到包含 -Dsun.io.serialization.extendedDebugInfo=true 属性的任何区别。

完整错误(已编辑):

java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
        at …
Run Code Online (Sandbox Code Playgroud)

scala cassandra apache-spark spark-cassandra-connector

1
推荐指数
1
解决办法
1229
查看次数

带有组装jar的spark-cassandra-connector的NoSuchMethodError

我是Scala的新手,我正在尝试建立一个Spark工作.我已经构建了一个包含DataStax连接器的ajob并将其组装成一个胖罐.当我尝试执行它时失败了java.lang.NoSuchMethodError.我已经破解了JAR并且可以看到包含DataStax库.我错过了一些明显的东西吗 有关这个过程的好教程吗?

谢谢

console $ spark-submit --class org.bobbrez.CasCountJob ./target/scala-2.11/bobbrez-spark-assembly-0.0.1.jar ks tn ...线程"main"中的异常java.lang.NoSuchMethodError:scala .runtime.ObjectRef.zero()Lscala /运行/ ObjectRef; at com.datastax.spark.connector.cql.CassandraConnector $ .com $ datastax $ spark $ connector $ cql $ CassandraConnector $$ createSession(CassandraConnector.scala)at com.datastax.spark.connector.cql.CassandraConnector $$ anonfun $ 2.申请(CassandraConnector.scala:148)......

build.sbt

name := "soofa-spark"

version := "0.0.1"

scalaVersion := "2.11.7"

// additional libraries
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" %     "provided"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M3"
libraryDependencies += "com.typesafe" % "config" % "1.3.0"

mergeStrategy in assembly <<= (mergeStrategy in …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark spark-cassandra-connector

1
推荐指数
1
解决办法
817
查看次数

如何在spark 2.0中使用Cassandra Context

在之前的Spark版本1.6.1中,我正在使用spark Context创建Cassandra Context,

import org.apache.spark.{ Logging, SparkContext, SparkConf }
//config
val conf: org.apache.spark.SparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", CassandraHost)
.setAppName(getClass.getSimpleName)
 lazy val sc = new SparkContext(conf)
 val cassandraSqlCtx: org.apache.spark.sql.cassandra.CassandraSQLContext = new CassandraSQLContext(sc)
//Query using Cassandra context
  cassandraSqlCtx.sql("select id from table ")
Run Code Online (Sandbox Code Playgroud)

但在Spark 2.0中,Spark Context被Spark会话取代,我如何使用cassandra上下文?

cassandra apache-spark apache-spark-sql spark-cassandra-connector

1
推荐指数
1
解决办法
2430
查看次数

无法连接到远程 Spark Master - ROR TransportRequestHandler:为单向消息调用 RpcHandler#receive() 时出错

我前段时间发布了这个问题,但结果发现我使用的是本地资源而不是远程资源。

我有一台配置了spark : 2.1.1cassandra : 3.0.9和的远程计算机scala : 2.11.6

Cassandra 的配置位置为localhost:9032,spark master 的配置位置为localhost:7077

Spark master 设置为127.0.0.1,其端口设置为7077

我可以远程连接到 cassandra,但无法使用 Spark 执行相同的操作。

连接到远程 Spark Master 时,出现以下错误:

错误 TransportRequestHandler:为单向消息调用 RpcHandler#receive() 时出错。

这是我通过代码进行的设置

val configuration = new SparkConf(true)
                        .setAppName("myApp")
                        .setMaster("spark://xx.xxx.xxx.xxx:7077")
                        .set("spark.cassandra.connection.host", "xx.xxx.xxx.xxx")
                        .set("spark.cassandra.connection.port", 9042)
                        .set("spark.cassandra.input.consistency.level","ONE")
                        .set("spark.driver.allowMultipleContexts", "true")
val sparkSession = SparkSession
                   .builder()
                   .appName("myAppEx")
                   .config(configuration)
                   .enableHiveSupport()
                   .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

我不明白为什么 cassandra 工作得很好而 Spark 却不行。

这是什么原因造成的?我该如何解决?

scala cassandra apache-spark spark-cassandra-connector

1
推荐指数
1
解决办法
2万
查看次数