spark-redshift需要花费大量时间才能写入redshift

Nip*_*pun 8 amazon-redshift apache-spark spark-streaming

我正在设置带有kinesis和redshift的火花流光.我每隔10秒就从kinesis读取数据,处理它并使用spark-redshift lib将其写入redshift.

问题是它只花了很多时间才能写出300行.

这就是它在控制台中显示的内容

[Stage 56:====================================================> (193 + 1) / 200]
Run Code Online (Sandbox Code Playgroud)

看着我的日志df.write.format正在这样做.

我在带有4 gb ram和2核心amazon EC2的机器上安装了火花,并使用--master local [*]模式运行.

这是我创建流的方式

kinesisStream = KinesisUtils.createStream(ssc, APPLICATION_NAME, STREAM_NAME, ENDPOINT, REGION_NAME, INITIAL_POS, CHECKPOINT_INTERVAL, awsAccessKeyId =AWSACCESSID, awsSecretKey=AWSSECRETKEY, storageLevel=STORAGE_LEVEL)    
CHECKPOINT_INTERVAL = 60
storageLevel = memory

kinesisStream.foreachRDD(writeTotable)
def WriteToTable(df, type):
    if type in REDSHIFT_PAGEVIEW_TBL:
        df = df.groupby([COL_STARTTIME, COL_ENDTIME, COL_CUSTOMERID, COL_PROJECTID, COL_FONTTYPE, COL_DOMAINNAME, COL_USERAGENT]).count()
        df = df.withColumnRenamed('count', COL_PAGEVIEWCOUNT)

        # Write back to a table

        url = ("jdbc:redshift://" + REDSHIFT_HOSTNAME + ":" + REDSHIFT_PORT + "/" +   REDSHIFT_DATABASE + "?user=" + REDSHIFT_USERNAME + "&password="+ REDSHIFT_PASSWORD)

        s3Dir = 's3n://' + AWSACCESSID + ':' + AWSSECRETKEY + '@' + BUCKET + '/' + FOLDER

        print 'Start writing to redshift'
        df.write.format("com.databricks.spark.redshift").option("url", url).option("dbtable", REDSHIFT_PAGEVIEW_TBL).option('tempdir', s3Dir).mode('Append').save()

        print 'Finished writing to redshift'
Run Code Online (Sandbox Code Playgroud)

请让我知道花这么多时间的原因

Dem*_*ots 10

我通过Spark和直接写入Redshift时遇到了类似的经历.spark-redshift将始终将数据写入S3,然后使用Redshift复制功能将数据写入目标表.这种方法是编写大量记录的最佳实践和最有效的方法.这种方法在写入时也会产生大量开销,特别是当每次写入的记录数量相对较小时.

查看上面的输出,看起来你有大量的分区(可能大约200个).这可能是因为spark.sql.shuffle.partitions默认情况下设置设置为200.您可以在Spark文档中找到更多详细信息.

组操作可能生成200个分区.这意味着您正在对S3执行200次单独的复制操作,每次操作都会在获取连接和完成写入时具有实质性的相关延迟.

正如我们在下面的评论和聊天中所讨论的那样,您可以将组的结果合并到更少的分区中,从而对上面的行进行以下更改:

df = df.coalesce(4).withColumnRenamed('count', COL_PAGEVIEWCOUNT)
Run Code Online (Sandbox Code Playgroud)

这样可以将分区数从200减少到4,将副本的开销减少几个数量级.您可以尝试分区数量以优化性能.您还可以更改spark.sql.shuffle.partitions设置,以根据您正在处理的数据大小和可用核心数减少分区数.