持久Spark Streaming输出

oha*_*llc 7 hadoop apache-kafka spark-streaming

我正在收集消息应用程序中的数据,我目前正在使用Flume,它每天发送大约5000万条记录

我希望使用Kafka,使用Spark Streaming从Kafka消费并将其持久化为hadoop并使用impala进行查询

对于我尝试过的每种方法,我都遇到了问题.

方法1 - 将rdd保存为镶木地板,将外部蜂巢木地板桌指向镶木地板目录

// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)
Run Code Online (Sandbox Code Playgroud)

问题是finalParquet.saveAsParquetFile输出一个巨大的数字.对于文件,从Kafka收到的Dstream输出超过200个文件,批量大小为1分钟.它输出许多文件的原因是因为计算是按照另一篇文章中的说明分配的 - 如何使saveAsTextFile NOT分割输出到多个文件中? 所提出的解决方案对我来说似乎不是最佳选择,例如,如同一个用户所述 - 如果您的数据非常少,那么只有一个输出文件是个好主意.

方法2 - 使用Hivecontext.将rdd数据直接插入到hive表中

# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);
Run Code Online (Sandbox Code Playgroud)

这样可以正常工作,它可以直接插入到镶木地板表中,但由于处理时间超过了批处理间隔时间,因此批次会有计划延迟.消费者无法跟上生产的最新情况,并且要处理的批次开始排队.

它似乎写入蜂巢很慢.香港专业教育学院尝试调整批量intervla大小,运行更多的消费者实例

综上所述

考虑到多个文件存在问题以及写入hive时潜在的延迟,从Spark Streaming中保留大数据的最佳方法是什么?其他人在做什么?

这里也提出了类似的问题,但他有一个问题,目录与太多文件相关 如何让Spark Streaming写出它的输出,以便Impala可以读取它?

非常感谢您的帮助

ece*_*ena 0

在解决方案#2中,创建的文件数量可以通过每个RDD的分区数量来控制。

看这个例子:

// create a Hive table (assume it's already existing)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create a RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
 StructField("id", IntegerType, nullable = false),
 StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")
Run Code Online (Sandbox Code Playgroud)

现在,我想您可以考虑从 Kafka 提取数据的频率,以及每个 RDD 的分区数量(默认情况下,您的 Kafka 主题的分区,您可以通过重新分区来减少)。

df.write.mode("append").saveAsTable("test")我使用的是 CDH 5.5.1 中的 Spark 1.5,并且使用您的 SQL 字符串得到相同的结果。