Nip*_*pun 8 amazon-redshift apache-spark spark-streaming
我正在设置带有kinesis和redshift的火花流光.我每隔10秒就从kinesis读取数据,处理它并使用spark-redshift lib将其写入redshift.
问题是它只花了很多时间才能写出300行.
这就是它在控制台中显示的内容
[Stage 56:====================================================> (193 + 1) / 200]
Run Code Online (Sandbox Code Playgroud)
看着我的日志df.write.format正在这样做.
我在带有4 gb ram和2核心amazon EC2的机器上安装了火花,并使用--master local [*]模式运行.
这是我创建流的方式
kinesisStream = KinesisUtils.createStream(ssc, APPLICATION_NAME, STREAM_NAME, ENDPOINT, REGION_NAME, INITIAL_POS, CHECKPOINT_INTERVAL, awsAccessKeyId =AWSACCESSID, awsSecretKey=AWSSECRETKEY, storageLevel=STORAGE_LEVEL)
CHECKPOINT_INTERVAL = 60
storageLevel = memory
kinesisStream.foreachRDD(writeTotable)
def WriteToTable(df, type):
if type in REDSHIFT_PAGEVIEW_TBL:
df = df.groupby([COL_STARTTIME, COL_ENDTIME, COL_CUSTOMERID, COL_PROJECTID, COL_FONTTYPE, COL_DOMAINNAME, COL_USERAGENT]).count()
df = df.withColumnRenamed('count', COL_PAGEVIEWCOUNT)
# Write back to a table
url = ("jdbc:redshift://" + REDSHIFT_HOSTNAME + ":" + REDSHIFT_PORT + "/" + REDSHIFT_DATABASE + "?user=" + REDSHIFT_USERNAME + "&password="+ REDSHIFT_PASSWORD)
s3Dir = 's3n://' + AWSACCESSID + ':' + AWSSECRETKEY + '@' + BUCKET + '/' + FOLDER
print 'Start writing to redshift'
df.write.format("com.databricks.spark.redshift").option("url", url).option("dbtable", REDSHIFT_PAGEVIEW_TBL).option('tempdir', s3Dir).mode('Append').save()
print 'Finished writing to redshift'
Run Code Online (Sandbox Code Playgroud)
请让我知道花这么多时间的原因
Dem*_*ots 10
我通过Spark和直接写入Redshift时遇到了类似的经历.spark-redshift将始终将数据写入S3,然后使用Redshift复制功能将数据写入目标表.这种方法是编写大量记录的最佳实践和最有效的方法.这种方法在写入时也会产生大量开销,特别是当每次写入的记录数量相对较小时.
查看上面的输出,看起来你有大量的分区(可能大约200个).这可能是因为spark.sql.shuffle.partitions默认情况下设置设置为200.您可以在Spark文档中找到更多详细信息.
组操作可能生成200个分区.这意味着您正在对S3执行200次单独的复制操作,每次操作都会在获取连接和完成写入时具有实质性的相关延迟.
正如我们在下面的评论和聊天中所讨论的那样,您可以将组的结果合并到更少的分区中,从而对上面的行进行以下更改:
df = df.coalesce(4).withColumnRenamed('count', COL_PAGEVIEWCOUNT)
Run Code Online (Sandbox Code Playgroud)
这样可以将分区数从200减少到4,将副本的开销减少几个数量级.您可以尝试分区数量以优化性能.您还可以更改spark.sql.shuffle.partitions设置,以根据您正在处理的数据大小和可用核心数减少分区数.
| 归档时间: |
|
| 查看次数: |
1918 次 |
| 最近记录: |