I am running Spark Structured Streaming in a test environment. Sometimes the job fails because a certain checkpoint file cannot be found.
One possible cause is that the Kafka topic has a short retention period. But I have already added .option("failOnDataLoss", "false") to the Kafka source reader, as in the sketch below.
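For reference, a minimal sketch of where this option goes — on the Kafka source reader, not on the SparkSession itself (broker address and topic name are placeholders):

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "my-topic")                   // placeholder
  .option("failOnDataLoss", "false")                 // Kafka source option
  .load()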
I have a basic understanding of Spark checkpointing. My assumption was that if I deleted the checkpoint directory, the job should be able to recover. But as I tested, once this error occurs, deleting the directory does not help; I have to switch to a different checkpoint directory to fix it.
Is there any reason why deleting the checkpoint directory does not work? Or is there a method/option that can help avoid this error?
diagnostics: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 14, master-testspark.runspark.com, executor 2): java.lang.IllegalStateException: Error reading delta file consumer-cp3/state/0/4/1.delta of HDFSStateStoreProvider[id = (op=0, part=4), dir = consumer-cp3/state/0/4]: consumer-cp3/state/0/4/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:265)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:200)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1225)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1538)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:332)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:340)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:786)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:407)
... 27 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2025)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1996)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1909)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
at org.apache.hadoop.ipc.Client.call(Client.java:1498)
at org.apache.hadoop.ipc.Client.call(Client.java:1398)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1238)
... 39 more
Checkpointing is the process of truncating an RDD lineage graph and saving it to a reliable distributed file system (such as HDFS) or a local file system.
From the Apache Spark documentation, there are two types of checkpointing:
Metadata checkpointing
Saving the information that defines the streaming computation to fault-tolerant storage such as HDFS. This is used to recover from failure of the node running the driver of the streaming application.
Data checkpointing
This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep growing over time. To avoid such unbounded increases in recovery time (proportional to the dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (such as HDFS) to cut off the dependency chains.
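This is exactly where the 1.delta file in your stack trace comes from: in Structured Streaming, stateful operators keep their per-partition state in a state store persisted under <checkpoint>/state/<operatorId>/<partitionId>/<version>.delta — the same layout as consumer-cp3/state/0/4/1.delta above. A minimal sketch of a query that creates such state, reusing the df from the reader sketch (column names are placeholders):

import org.apache.spark.sql.functions.col

// A streaming aggregation is a stateful transformation: Spark keeps
// the running counts in a per-partition state store, written out as
// versioned .delta files under the checkpoint directory.
val counts = df
  .selectExpr("CAST(value AS STRING) AS word")
  .groupBy(col("word"))
  .count()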
If you want to look at how this works internally, check out Tathagata's Spark blog post on fault tolerance using checkpointing.
Spark uses the checkpoint to recover from failure; it does not recreate a deleted directory. That is why you are getting this exception:
java.io.FileNotFoundException: File does not exist: /user/spark/consumer-cp3/state/0/4/1.delta
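Since Spark cannot resume from a checkpoint whose offset/state files have been (partially) removed, the workaround you already found is the expected one: restart the query with a brand-new, empty checkpoint directory and let it reprocess from whatever offsets the source still retains. A minimal sketch, assuming the counts query above (the path is a placeholder):

val query = counts
  .writeStream
  .outputMode("complete")
  .format("console")
  // Must be a new, empty directory: resuming from a checkpoint with
  // missing .delta state files raises the FileNotFoundException above.
  .option("checkpointLocation", "/user/spark/consumer-cp4")
  .start()

query.awaitTermination()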