如何有效地从Cassandra读取数百万行?

Thi*_*ira 5 cassandra apache-spark akka-stream spark-streaming phantom-dsl

我有一项艰巨的任务,就是从Cassandra表中读取数百万行.实际上这个表包含40~50百万行.数据实际上是我们系统的内部URL,我们需要解雇所有这些URL.为了解雇它,我们正在使用Akka Streams并且它一直工作得非常好,根据需要做一些背压.但我们仍然没有找到有效阅读所有内容的方法.

到目前为止我们尝试了什么:

  • 使用Akka Stream将数据读取为Stream.我们正在使用为特定表提供发布者的phantom-dsl.但它不会读取所有内容,只会读取一小部分内容.实际上它在第一百万之后停止阅读.

  • 在特定日期使用Spark阅读.我们的表格被建模为时间序列表,包括年,月,日,分......列.现在我们正在选择白天,所以Spark不会获取很多要处理的东西,但是选择那些日子是很痛苦的.

代码如下:

val cassandraRdd =
      sc
        .cassandraTable("keyspace", "my_table")
        .select("id", "url")
        .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
Run Code Online (Sandbox Code Playgroud)

不幸的是我不能遍历分区以获得更少的数据,我必须使用一个收集因为它抱怨该actor不可序列化.

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async

val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)

cassandraRdd.collect().foreach { row =>
  ref ! row
}
Run Code Online (Sandbox Code Playgroud)

我想知道你们是否有过阅读数百万行的经验,以便做出与聚合等不同的事情.

此外,我曾想过阅读所有内容并发送到Kafka主题,我将使用Streaming(spark或Akka)接收,但问题是相同的,如何有效地加载所有这些数据?

编辑

现在,我正在一个具有合理内存量100GB的集群上运行并进行收集和迭代.

此外,这与使用spark获取bigdata并使用reduceByKey,aggregateByKey等等来分析它有很大的不同.

我需要通过HTTP获取和发送所有内容= /

到目前为止,它的工作方式与我一样,但我担心这些数据变得越来越大,以至于将所有东西都记入内存是没有意义的.

流式传输这些数据将是最好的解决方案,以块为单位,但我还没有找到一个好的方法.

最后,我正在考虑使用Spark获取所有这些数据,生成一个CSV文件并使用Akka Stream IO进行处理,这样我就可以逐步将大量内容保存在内存中,因为它需要花费数小时来处理每个百万.

Thi*_*ira 5

好吧,花了一些时间阅读,与其他人交谈并进行测试后,结果可以通过以下代码示例实现:

val sc = new SparkContext(sparkConf)

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
  .select("key", "value")
  .as((key: String, value: String) => (key, value))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache()

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    partition.foreach { row =>

      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

      val source = Source.fromIterator { () => row._2.toIterator }
      source
        .map { str =>
          myActor ! Count
          str
        }
        .to(Sink.actorRef(myActor, Finish))
        .run()
    }
  }

sc.stop()


class MyActor(system: ActorSystem) extends Actor {

  var count = 0

  def receive = {

    case Count =>
      count = count + 1

    case Finish =>
      println(s"total: $count")
      system.shutdown()

  }
}

case object Count
case object Finish
Run Code Online (Sandbox Code Playgroud)

我正在做的是以下内容:

  • 尝试使用partitionBy和groupBy方法实现大量的分区和分区程序
  • 使用Cache来阻止Data Shuffle,使你的Spark使用高IO等跨节点移动大数据.
  • 使用它的依赖项以及foreachPartition方法中的Stream创建整个actor系统.这是一个权衡,你可以只有一个ActorSystem,但你必须在我问题中写的那么糟糕地使用.collect.无论如何在内部创建,您仍然可以在集群中分布的内部运行内容.
  • 使用Sink.actorRef完成迭代器末尾的每个actor系统,并使用要杀死的消息(Finish)

也许这段代码可以进一步改进,但到目前为止,我很高兴不再使用.collect,只在Spark内部工作.