Cassandra使用spark-cassandra连接器插入性能

Yad*_*nan 4 scala cassandra apache-spark phantom-dsl spark-cassandra-connector

我是火花和卡桑德拉的新手.我正在尝试使用spark-cassandra连接器插入到cassandra表中,如下所示:

import java.util.UUID

import org.apache.spark.{SparkContext, SparkConf}
import org.joda.time.DateTime
import com.datastax.spark.connector._

case class TestEntity(id:UUID, category:String, name:String,value:Double, createDate:DateTime, tag:Long)

object SparkConnectorContext {
  val conf = new SparkConf(true).setMaster("local")
    .set("spark.cassandra.connection.host", "192.168.xxx.xxx")
  val sc = new SparkContext(conf)
}
object TestRepo {
  def insertList(list: List[TestEntity]) = {
    SparkConnectorContext.sc.parallelize(list).saveToCassandra("testKeySpace", "testColumnFamily")
  }
}
object TestApp extends App {
  val start = System.currentTimeMillis()
  TestRepo.insertList(Utility.generateRandomData())
  val end = System.currentTimeMillis()
  val timeDiff = end-start
  println("Difference (in millis)= "+timeDiff)
}
Run Code Online (Sandbox Code Playgroud)

当我使用上面的方法(带有100个实体的列表)插入时,它需要300-1100 milliseconds.我尝试使用幻像库插入相同的数据.它只需要少于20-40 milliseconds.

任何人都可以告诉我为什么火花连接器花费这么多时间插入?我在代码中做错了什么或者不建议使用spark-cassandra连接器进行插入操作?

Jim*_*yer 5

看起来你在你的计时中包含并行化操作.此外,由于您的spark工作程序在与Cassandra不同的计算机上运行,​​因此saveToCassandra操作将通过网络进行写入.

尝试配置系统以在Cassandra节点上运行spark worker.然后在单独的步骤中创建RDD并在其上调用count()之类的操作以将数据加载到内存中.此外,您可能希望持久化()或缓存()RDD以确保它保留在内存中以进行测试.

然后只计算缓存的RDD的saveToCassandra.

您可能还想查看Cassandra连接器提供的repartitionByCassandraReplica方法.这将根据写入需要去的Cassandra节点对RDD中的数据进行分区.通过这种方式,您可以利用数据局部性,并且通常避免在网络上进行写入和随机播放.