附上截图:
Config.scala
object Config {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext = {
this.sc
}
def getSqlContext(): HiveContext = {
this.sqlContext
}
}
Run Code Online (Sandbox Code Playgroud)
S3Basin.scala
object S3Basin {
def main(args: Array[String]): Unit = {
Kinesis.startStreaming(s3basinFunction _)
}
def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={ …
Run Code Online (Sandbox Code Playgroud) amazon-s3 apache-spark amazon-kinesis spark-streaming checkpointing
我在一些代码示例中看到,人们使用 .pwf 作为模型文件保存格式。但在 PyTorch 文档中,推荐使用 .pt 和 .pth。我使用 .pwf 并且在小型 1->16->16 卷积网络中工作得很好。
我的问题是这些格式之间有什么区别?为什么在 PyTorch 文档中甚至不推荐 .pwf 扩展名,为什么人们仍然使用它?
在Spark Streaming中,可以(并且必须使用有状态操作)将StreamingContext
检查点设置为(AND)的可靠数据存储(S3,HDFS,...):
DStream
血统如上所述这里,设置输出数据存储需要调用yourSparkStreamingCtx.checkpoint(datastoreURL)
另一方面,可以DataStream
通过调用checkpoint(timeInterval)
它们来为每个设置谱系检查点间隔.实际上,建议将谱系检查点间隔设置为DataStream
滑动间隔的5到10倍:
dstream.checkpoint(checkpointInterval).通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的设置.
我的问题是:
当流上下文设置为执行检查点并且没有ds.checkpoint(interval)
被调用时,是否为所有数据流启用了谱系检查点,默认值checkpointInterval
等于batchInterval
?或者,相反,只有元数据检查点启用了什么?
我们在Scala中有一个Spark Streaming 1.5.2应用程序,它从Kinesis Stream中读取JSON事件,执行一些转换/聚合并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们免于丢失聚合.
它已经运行了一段时间,从异常甚至群集重启中恢复.我们最近重新编译了Spark Streaming 1.6.0的代码,只更改了build.sbt文件中的库依赖项.在Spark 1.6.0群集中运行代码几个小时之后,我们注意到以下内容:
在做了一些挖掘之后,我认为第二个问题看起来与此Pull Request有关.PR的最初目标:"当使用S3作为WALs的目录时,写入时间太长.当多个接收器将AddBlock事件发送到ReceiverTracker时,驱动程序很容易受到瓶颈.此PR在ReceivedBlockTracker中添加事件批处理,以便接收器不会被驱动程序阻塞太长时间."
我们在Spark 1.5.2中的S3中检查点,并且没有性能/可靠性问题.我们在S3和本地NAS中测试了Spark 1.6.0中的检查点,在这两种情况下我们都收到了这个例外.看起来当检查点批次需要超过5秒时,会出现此异常,并且我们已检查该批次的事件是否永远丢失.
Spark Streaming 1.6.0中预计"输入速率"和"处理时间"波动的增加是否有任何已知的改进方法?
你知道除了这2个以外的任何解决方法吗?:
1)保证检查点接收器写入所有文件所需的时间少于5秒.根据我的经验,即使是小批量,也无法保证使用S3.对于本地NAS,它取决于谁负责基础设施(云提供商很难).
2)增加spark.streaming.driver.writeAheadLog.batchingTimeout属性值.
您是否期望在描述的场景中丢失任何事件?我认为如果批量检查点失败,则不会增加分片/接收器序列号,并且将在稍后重试.
16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at …
Run Code Online (Sandbox Code Playgroud) scala apache-spark amazon-kinesis spark-streaming checkpointing
计算科学中的典型情况是有一个连续运行数天/数周/数月的程序。由于硬件/操作系统故障是不可避免的,因此通常使用检查点,即不时保存程序的状态。如果失败,则从最新的检查点重新启动。
实现检查点的pythonic方法是什么?
例如,可以直接转储函数的变量。
或者,我正在考虑将此类函数转换为一个类(见下文)。函数的参数将成为构造函数的参数。构成算法状态的中间数据将成为类属性。和pickle
模块将帮助(缩小)序列化。
import pickle
# The file with checkpointing data
chkpt_fname = 'pickle.checkpoint'
class Factorial:
def __init__(self, n):
# Arguments of the algorithm
self.n = n
# Intermediate data (state of the algorithm)
self.prod = 1
self.begin = 0
def get(self, need_restart):
# Last time the function crashed. Need to restore the state.
if need_restart:
with open(chkpt_fname, 'rb') as f:
self = pickle.load(f)
for i in range(self.begin, self.n):
# Some computations
self.prod *= (i + …
Run Code Online (Sandbox Code Playgroud) 我正在从HDFS检查点恢复流(例如,ConstantInputDSTream),但我一直在努力SparkException: <X> has not been initialized
.
从checkpointing恢复时,我需要做些什么吗?
我可以看到,它要DStream.zeroTime
设置,但在恢复流时zeroTime
是null
.由于它是私人成员IDK,因此无法恢复.我可以看到StreamingContext
恢复的流引用的确有一个值zeroTime
.
initialize
是一种私有方法,可以被调用StreamingContext.graph.start
但不是被调用StreamingContext.graph.restart
,大概是因为它预期zeroTime
会被持久化.
有人有一个从检查点恢复并具有非空值的流的示例zeroTime
吗?
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(checkpointDir)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Run Code Online (Sandbox Code Playgroud) 我将运行一些较大的模型,并尝试中间结果。
因此,我尝试在每个时期之后使用检查点来保存最佳模型。
这是我的代码:
model = Sequential()
model.add(LSTM(700, input_shape=(X_modified.shape[1], X_modified.shape[2]), return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(700, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(700))
model.add(Dropout(0.2))
model.add(Dense(Y_modified.shape[1], activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# Save the checkpoint in the /output folder
filepath = "output/text-gen-best.hdf5"
# Keep only a single checkpoint, the best over test accuracy.
checkpoint = ModelCheckpoint(filepath,
monitor='val_acc',
verbose=1,
save_best_only=True,
mode='max')
model.fit(X_modified, Y_modified, epochs=100, batch_size=50, callbacks=[checkpoint])
Run Code Online (Sandbox Code Playgroud)
但是在第一个时期之后,我仍然收到警告:
/usr/local/lib/python3.6/site-packages/keras/callbacks.py:432: RuntimeWarning: Can save best model only with val_acc available, skipping.
'skipping.' % (self.monitor), RuntimeWarning)
Run Code Online (Sandbox Code Playgroud)
要添加metrics=['accuracy']
到模型中还存在其他SO问题(例如,在使用预训练的VGG16模型时无法节省重量)的解决方案,但此处仍然存在错误。
当从检查点重新启动火花流时,我得到了这个例外.因为它与我生成的任何代码无关,所以我不知道是什么原因导致了这个问题.
任何的想法?
Exception in thread "streaming-start" java.lang.StackOverflowError
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
...
...
a lot a line that don't satisfy stack overflow . com
...
...
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) …
Run Code Online (Sandbox Code Playgroud) 我们正在从一个号码接收事件。独立的数据源,因此,到达我们的 Flink 拓扑(通过 Kafka)的数据将是无序的。
我们正在 Flink 拓扑中创建 1 分钟的事件时间窗口,并在源操作符处生成事件时间水印作为(当前事件时间 - 某个阈值(30 秒))。
如果在设置的阈值之后有几个事件到达,这些事件将被简单地忽略(在我们的例子中是可以的,因为属于那一分钟的大多数事件已经到达并在相应的窗口中得到处理)。
现在,问题是,如果程序崩溃(无论出于何种原因)然后从最后一个成功的检查点再次恢复,到达事件的无序将触发过去(已处理)窗口的执行(只有极少数事件在该窗口)覆盖上一个结果。该窗口的计算。
如果 Flink 有检查点事件时间水印,这个问题就不会发生。
所以,我想知道是否有办法在 Flink 中强制执行事件时间水印的检查点......
在我的 pytorch 模型中,我正在像这样初始化我的模型和优化器。
model = MyModelClass(config, shape, x_tr_mean, x_tr,std)
optimizer = optim.SGD(model.parameters(), lr=config.learning_rate)
Run Code Online (Sandbox Code Playgroud)
这是我的检查点文件的路径。
checkpoint_file = os.path.join(config.save_dir, "checkpoint.pth")
为了加载这个检查点文件,我检查并查看检查点文件是否存在,然后加载它以及模型和优化器。
if os.path.exists(checkpoint_file):
if config.resume:
torch.load(checkpoint_file)
model.load_state_dict(torch.load(checkpoint_file))
optimizer.load_state_dict(torch.load(checkpoint_file))
Run Code Online (Sandbox Code Playgroud)
另外,这是我保存模型和优化器的方式。
torch.save({'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'iter_idx': iter_idx, 'best_va_acc': best_va_acc}, checkpoint_file)
Run Code Online (Sandbox Code Playgroud)
出于某种原因,每当我运行此代码时,我都会收到一个奇怪的错误。
model.load_state_dict(torch.load(checkpoint_file))
File "/home/Josh/.local/lib/python3.6/site-packages/torch/nn/modules/module.py", line 769, in load_state_dict
self.__class__.__name__, "\n\t".join(error_msgs)))
RuntimeError: Error(s) in loading state_dict for MyModelClass:
Missing key(s) in state_dict: "mean", "std", "attribute.weight", "attribute.bias".
Unexpected key(s) in state_dict: "model", "optimizer", "iter_idx", "best_va_acc"
Run Code Online (Sandbox Code Playgroud)
有谁知道我为什么会收到这个错误?
checkpointing ×10
apache-spark ×5
python ×3
python-3.x ×2
pytorch ×2
amazon-s3 ×1
apache-flink ×1
keras ×1
pickle ×1
scala ×1