Apu*_*rva 5 apache-spark spark-structured-streaming
来自spark结构化流媒体文档:"此检查点位置必须是HDFS兼容文件系统中的路径,并且可以DataStreamWriter
在启动查询时设置为选项."
当然,将检查点设置为s3路径会抛出:
17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook
Run Code Online (Sandbox Code Playgroud)
这里有几个问题:
是什么让FS HDFS"合规?" 它是一个文件系统,具有Hadoop FS规范中指定的行为.这里介绍了对象存储和FS之间的区别,关键点是"最终一致的对象存储没有附加或O(1)原子重命名不符合"
特别是对于S3
通过将所有内容保存到某个位置然后将其重命名为检查点目录来激活流式检查点.这使得检查点的时间与在S3中执行数据复制的时间成比例,即~6-10 MB/s.
流代码的当前位不适合s3
现在,做一个
如果您使用的是EMR,则可以为一致的发电机数据库支持的S3支付额外费用,从而为您提供更好的一致性.但复制时间仍然相同,因此检查点也会一样慢
这是一个众所周知的问题:https://issues.apache.org/jira/browse/SPARK-19407
应该在下一个版本中修复.您可以使用--conf spark.hadoop.fs.defaultFS=s3
解决方法将默认文件系统设置为s3 .
此问题已在https://issues.apache.org/jira/browse/SPARK-19407中修复。
然而,由于 S3 缺乏最终一致性,结构化流检查点在 S3 中效果不佳。使用 S3 进行检查点并不是一个好主意https://issues.apache.org/jira/browse/SPARK-19013。
Micheal Armburst表示这个问题不会在Spark中得到修复,解决方案是等待S3guard的实现。S3Guard 有一段时间不在。
编辑:自这篇文章发表以来有 2 个进展 a) 对 S3Guard 的支持已合并到 Spark 3.0 中。b) AWS 使 S3 立即保持一致。
归档时间: |
|
查看次数: |
2742 次 |
最近记录: |