标签: checkpointing

火花流检查点恢复非常非常缓慢

  • 目标:从Kinesis读取并通过火花流将数据以Parquet格式存储到S3.
  • 情况:应用程序最初运行正常,运行1小时的批次,平均处理时间少于30分钟.出于某种原因,我们可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远,而不是前进.我们尝试以1分钟的批处理间隔测试相同的东西,处理运行良好,批次完成需要1.2分钟.当我们从检查点恢复时,每批需要大约15分钟.
  • 注意:我们使用s3作为检查点使用1个执行器,每个执行器有19g内存和3个内核

附上截图:

首次运行 - 检查点恢复之前 在检查点之前 - 流媒体页面

在检查点之前 - 工作页面

在检查点之前 - 乔布斯Page2

试图从检查点恢复: 检查点之后 -  Streaming Page 检查点后 - 工作页面

Config.scala

object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}
Run Code Online (Sandbox Code Playgroud)

S3Basin.scala

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing

18
推荐指数
1
解决办法
1500
查看次数

PyTorch 中的 .pt、.pth 和 .pwf 扩展名有什么区别?

我在一些代码示例中看到,人们使用 .pwf 作为模型文件保存格式。但在 PyTorch 文档中,推荐使用 .pt 和 .pth。我使用 .pwf 并且在小型 1->16->16 卷积网络中工作得很好。

我的问题是这些格式之间有什么区别?为什么在 PyTorch 文档中甚至不推荐 .pwf 扩展名,为什么人们仍然使用它?

python serialization deep-learning checkpointing pytorch

17
推荐指数
3
解决办法
2万
查看次数

针对DStream的Spark流检查点

在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext检查点设置为(AND)的可靠数据存储(S3,HDFS,...):

  • 元数据
  • DStream 血统

如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)

另一方面,可以DataStream通过调用checkpoint(timeInterval)它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream滑动间隔的5到10倍:

dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.

我的问题是:

当流上下文设置为执行检查点并且没有ds.checkpoint(interval)被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval等于batchInterval?或者,相反,只有元数据检查点启用了什么?

apache-spark spark-streaming checkpointing

13
推荐指数
1
解决办法
8221
查看次数

Spark Streaming 1.6.0中Checkpointing/WAL的可靠性问题

描述

我们在Scala中有一个Spark Streaming 1.5.2应用程序,它从Kinesis Stream中读取JSON事件,执行一些转换/聚合并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们免于丢失聚合.

它已经运行了一段时间,从异常甚至群集重启中恢复.我们最近重新编译了Spark Streaming 1.6.0的代码,只更改了build.sbt文件中的库依赖.在Spark 1.6.0群集中运行代码几个小时之后,我们注意到以下内容:

  1. "投入率"和"处理时间"波动率在1.6.0中大幅增加(见下面的截图).
  2. 每隔几个小时,在写记录时会抛出一个'Exception:BlockAdditionEvent ...到WriteAheadLog.java.util.concurrent.TimeoutException:[5000毫秒]之后的期货超时"异常(参见下面的完整堆栈跟踪)与特定批次(分钟)的下降到0事件/秒一致.

在做了一些挖掘之后,我认为第二个问题看起来与此Pull Request有关.PR的最初目标:"当使用S3作为WALs的目录时,写入时间太长.当多个接收器将AddBlock事件发送到ReceiverTracker时,驱动程序很容易受到瓶颈.此PR在ReceivedBlockTracker中添加事件批处理,以便接收器不会被驱动程序阻塞太长时间."

我们在Spark 1.5.2中的S3中检查点,并且没有性能/可靠性问题.我们在S3和本地NAS中测试了Spark 1.6.0中的检查点,在这两种情况下我们都收到了这个例外.看起来当检查点批次需要超过5秒时,会出现此异常,并且我们已检查该批次的事件是否永远丢失.

问题

  • Spark Streaming 1.6.0中预计"输入速率"和"处理时间"波动的增加是否有任何已知的改进方法?

  • 你知道除了这2个以外的任何解决方法吗?:

    1)保证检查点接收器写入所有文件所需的时间少于5秒.根据我的经验,即使是小批量,也无法保证使用S3.对于本地NAS,它取决于谁负责基础设施(云提供商很难).

    2)增加spark.streaming.driver.writeAheadLog.batchingTimeout属性值.

  • 您是否期望在描述的场景中丢失任何事件?我认为如果批量检查点失败,则不会增加分片/接收器序列号,并且将在稍后重试.

Spark 1.5.2统计 - 截图

在此输入图像描述

Spark 1.6.0统计 - 截图

在此输入图像描述

全栈跟踪

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark amazon-kinesis spark-streaming checkpointing

13
推荐指数
1
解决办法
1835
查看次数

如何以python方式检查长时间运行的函数?

计算科学中的典型情况是有一个连续运行数天/数周/数月的程序。由于硬件/操作系统故障是不可避免的,因此通常使用检查点,即不时保存程序的状态。如果失败,则从最新的检查点重新启动。

实现检查点的pythonic方法是什么?

例如,可以直接转储函数的变量。

或者,我正在考虑将此类函数转换为一个类(见下文)。函数的参数将成为构造函数的参数。构成算法状态的中间数据将成为类属性。和pickle模块将帮助(缩小)序列化。

import pickle

# The file with checkpointing data
chkpt_fname = 'pickle.checkpoint'

class Factorial:
    def __init__(self, n):
        # Arguments of the algorithm
        self.n = n

        # Intermediate data (state of the algorithm)
        self.prod = 1
        self.begin = 0

    def get(self, need_restart):
        # Last time the function crashed. Need to restore the state.
        if need_restart:
            with open(chkpt_fname, 'rb') as f:
                self = pickle.load(f)

        for i in range(self.begin, self.n):
            # Some computations
            self.prod *= (i + …
Run Code Online (Sandbox Code Playgroud)

python pickle checkpointing

11
推荐指数
1
解决办法
6323
查看次数

为什么Spark从检查点恢复时抛出"SparkException:DStream尚未初始化"?

我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized.

从checkpointing恢复时,我需要做些什么吗?

我可以看到,它要DStream.zeroTime设置,但在恢复流时zeroTimenull.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext恢复的流引用的确有一个值zeroTime.

initialize是一种私有方法,可以被调用StreamingContext.graph.start但不是被调用StreamingContext.graph.restart,大概是因为它预期zeroTime会被持久化.

有人有一个从检查点恢复并具有非空值的流的示例zeroTime吗?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming checkpointing

7
推荐指数
1
解决办法
2657
查看次数

Keras回调继续跳过保存检查点,声称缺少val_acc

我将运行一些较大的模型,并尝试中间结果。

因此,我尝试在每个时期之后使用检查点来保存最佳模型。

这是我的代码:

model = Sequential()
model.add(LSTM(700, input_shape=(X_modified.shape[1], X_modified.shape[2]), return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(700, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(700))
model.add(Dropout(0.2))
model.add(Dense(Y_modified.shape[1], activation='softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Save the checkpoint in the /output folder
filepath = "output/text-gen-best.hdf5"

# Keep only a single checkpoint, the best over test accuracy.
checkpoint = ModelCheckpoint(filepath,
                            monitor='val_acc',
                            verbose=1,
                            save_best_only=True,
                            mode='max')
model.fit(X_modified, Y_modified, epochs=100, batch_size=50, callbacks=[checkpoint])
Run Code Online (Sandbox Code Playgroud)

但是在第一个时期之后,我仍然收到警告:

/usr/local/lib/python3.6/site-packages/keras/callbacks.py:432: RuntimeWarning: Can save best model only with val_acc available, skipping.
  'skipping.' % (self.monitor), RuntimeWarning)
Run Code Online (Sandbox Code Playgroud)

要添加metrics=['accuracy']到模型中还存在其他SO问题(例如,在使用预训练的VGG16模型时无法节省重量)的解决方案,但此处仍然存在错误。

python-3.x keras checkpointing

6
推荐指数
1
解决办法
3206
查看次数

从检查点启动火花流时堆栈溢出

当从检查点重新启动火花流时,我得到了这个例外.因为它与我生成的任何代码无关,所以我不知道是什么原因导致了这个问题.

任何的想法?

Exception in thread "streaming-start" java.lang.StackOverflowError
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
...
...
a lot a line that don't satisfy stack overflow . com
...
...
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) …
Run Code Online (Sandbox Code Playgroud)

stack-overflow apache-spark spark-streaming checkpointing

5
推荐指数
0
解决办法
344
查看次数

Flink 中的检查点事件时间水印

我们正在从一个号码接收事件。独立的数据源,因此,到达我们的 Flink 拓扑(通过 Kafka)的数据将是无序的。

我们正在 Flink 拓扑中创建 1 分钟的事件时间窗口,并在源操作符处生成事件时间水印作为(当前事件时间 - 某个阈值(30 秒))。

如果在设置的阈值之后有几个事件到达,这些事件将被简单地忽略(在我们的例子中是可以的,因为属于那一分钟的大多数事件已经到达并在相应的窗口中得到处理)。

现在,问题是,如果程序崩溃(无论出于何种原因)然后从最后一个成功的检查点再次恢复,到达事件的无序将触发过去(已处理)窗口的执行(只有极少数事件在该窗口)覆盖上一个结果。该窗口的计算。

如果 Flink 有检查点事件时间水印,这个问题就不会发生。

所以,我想知道是否有办法在 Flink 中强制执行事件时间水印的检查点......

apache-flink checkpointing flink-streaming

5
推荐指数
1
解决办法
680
查看次数

如何在pytorch模型中加载检查点文件?

在我的 pytorch 模型中,我正在像这样初始化我的模型和优化器。

model = MyModelClass(config, shape, x_tr_mean, x_tr,std)
optimizer = optim.SGD(model.parameters(), lr=config.learning_rate)
Run Code Online (Sandbox Code Playgroud)

这是我的检查点文件的路径。

checkpoint_file = os.path.join(config.save_dir, "checkpoint.pth")

为了加载这个检查点文件,我检查并查看检查点文件是否存在,然后加载它以及模型和优化器。

if os.path.exists(checkpoint_file):
    if config.resume:
        torch.load(checkpoint_file)
        model.load_state_dict(torch.load(checkpoint_file))
        optimizer.load_state_dict(torch.load(checkpoint_file))
Run Code Online (Sandbox Code Playgroud)

另外,这是我保存模型和优化器的方式。

 torch.save({'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'iter_idx': iter_idx, 'best_va_acc': best_va_acc}, checkpoint_file)
Run Code Online (Sandbox Code Playgroud)

出于某种原因,每当我运行此代码时,我都会收到一个奇怪的错误。

model.load_state_dict(torch.load(checkpoint_file))
File "/home/Josh/.local/lib/python3.6/site-packages/torch/nn/modules/module.py", line 769, in load_state_dict
self.__class__.__name__, "\n\t".join(error_msgs)))
RuntimeError: Error(s) in loading state_dict for MyModelClass:
        Missing key(s) in state_dict: "mean", "std", "attribute.weight", "attribute.bias".
        Unexpected key(s) in state_dict: "model", "optimizer", "iter_idx", "best_va_acc"
Run Code Online (Sandbox Code Playgroud)

有谁知道我为什么会收到这个错误?

python python-3.x checkpointing pytorch

5
推荐指数
1
解决办法
9299
查看次数