tay*_*ssy 5 scala hdfs apache-spark
我目前正在处理 Spark 结构化流作业,似乎在每个批处理间隔中我都会收到警告:
WARN HDFSBackedStateStoreProvider: The state for version N doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
- 每批次 N 都会递增。
我在本地模式(禁用检查点)和在 YARN (EMR) 上运行时看到日志中充斥着这种情况。
问题是:可以安全地忽略这一点吗?打开调试日志记录 HDFSBackedStateStoreProvider 表明需要花时间读取快照和增量文件,因此我有些担心。
这是我看似最小的 SparkConf
val sparkConf: SparkConf = {
val conf = new SparkConf()
.setAppName("Structured Streaming")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set("spark.speculation", "false")
if (App.isLocal)
conf
.set("spark.cassandra.output.consistency.level", "LOCAL_ONE")
.setMaster("local[*]")
else
conf
.set("spark.cassandra.connection.host", PropertyLoader.getProperty("cassandra.contactPoints"))
.set("spark.cassandra.connection.local_dc", PropertyLoader.getProperty("cassandra.localDC"))
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.connection.ssl.trustStore.path", PropertyLoader.truststorePath)
.set("spark.cassandra.connection.ssl.trustStore.password", PropertyLoader.getProperty("cassandra.truststorePassword"))
.set("spark.cassandra.auth.username", PropertyLoader.getProperty("cassandra.username"))
.set("spark.cassandra.auth.password", PropertyLoader.getProperty("cassandra.password"))
.set("spark.executor.logs.rolling.maxRetainedFiles", "20")
.set("spark.executor.logs.rolling.maxSize", "524288000")
.set("spark.executor.logs.rolling.strategy", "size")
.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
.set("spark.sql.streaming.metricsEnabled", "true")
.setJars(Array[String](SparkContext.jarOfClass(getClass).get))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1172 次 |
最近记录: |