小编Mig*_*lvo的帖子

Spark - "sbt package" - "value $不是StringContext的成员" - 缺少Scala插件?

从命令行为小型Spark Scala应用程序运行"sbt package"时,我在以下代码行中得到"value $不是StringContext的成员"编译错误:

val joined = ordered.join(empLogins, $"login" === $"username", "inner")
  .orderBy($"count".desc)
  .select("login", "count")
Run Code Online (Sandbox Code Playgroud)

Intellij 13.1给了我同样的错误信息.在Eclipse 4.4.2中编译相同的.scala源代码没有任何问题.而且它也可以在命令行的单独maven项目中与maven一起使用.

看起来sbt无法识别$符号,因为我在项目/ plugins.sbt文件中缺少一些插件或者我的build.sbt文件中的某些设置.

你熟悉这个问题吗?任何指针将不胜感激.如果需要,我可以提供build.sbt和/或project/plugins.sbt.

scala intellij-idea sbt apache-spark apache-spark-sql

19
推荐指数
2
解决办法
9830
查看次数

Spark Streaming 1.6.0中Checkpointing/WAL的可靠性问题

描述

我们在Scala中有一个Spark Streaming 1.5.2应用程序,它从Kinesis Stream中读取JSON事件,执行一些转换/聚合并将结果写入不同的S3前缀.当前批处理间隔为60秒.我们有3000-7000事件/秒.我们正在使用检查点来保护我们免于丢失聚合.

它已经运行了一段时间,从异常甚至群集重启中恢复.我们最近重新编译了Spark Streaming 1.6.0的代码,只更改了build.sbt文件中的库依赖.在Spark 1.6.0群集中运行代码几个小时之后,我们注意到以下内容:

  1. "投入率"和"处理时间"波动率在1.6.0中大幅增加(见下面的截图).
  2. 每隔几个小时,在写记录时会抛出一个'Exception:BlockAdditionEvent ...到WriteAheadLog.java.util.concurrent.TimeoutException:[5000毫秒]之后的期货超时"异常(参见下面的完整堆栈跟踪)与特定批次(分钟)的下降到0事件/秒一致.

在做了一些挖掘之后,我认为第二个问题看起来与此Pull Request有关.PR的最初目标:"当使用S3作为WALs的目录时,写入时间太长.当多个接收器将AddBlock事件发送到ReceiverTracker时,驱动程序很容易受到瓶颈.此PR在ReceivedBlockTracker中添加事件批处理,以便接收器不会被驱动程序阻塞太长时间."

我们在Spark 1.5.2中的S3中检查点,并且没有性能/可靠性问题.我们在S3和本地NAS中测试了Spark 1.6.0中的检查点,在这两种情况下我们都收到了这个例外.看起来当检查点批次需要超过5秒时,会出现此异常,并且我们已检查该批次的事件是否永远丢失.

问题

  • Spark Streaming 1.6.0中预计"输入速率"和"处理时间"波动的增加是否有任何已知的改进方法?

  • 你知道除了这2个以外的任何解决方法吗?:

    1)保证检查点接收器写入所有文件所需的时间少于5秒.根据我的经验,即使是小批量,也无法保证使用S3.对于本地NAS,它取决于谁负责基础设施(云提供商很难).

    2)增加spark.streaming.driver.writeAheadLog.batchingTimeout属性值.

  • 您是否期望在描述的场景中丢失任何事件?我认为如果批量检查点失败,则不会增加分片/接收器序列号,并且将在稍后重试.

Spark 1.5.2统计 - 截图

在此输入图像描述

Spark 1.6.0统计 - 截图

在此输入图像描述

全栈跟踪

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark amazon-kinesis spark-streaming checkpointing

13
推荐指数
1
解决办法
1835
查看次数

有关故障转移过程如何在纱线群集模式下为Spark驱动程序(及其YARN容器)工作的资源/文档

我正在尝试了解在为群集模式部署Yarn时,Spark Driver是否是单点故障.因此,我希望在此上下文中更好地了解有关Spark驱动程序的YARN容器的故障转移过程的内部结构.

我知道Spark Driver将在Yarn Container内的Spark Application Master中运行.如果需要,Spark Application Master将向YARN资源管理器请求资源.但是,如果Spark Application Master(和Spark驱动程序)的YARN容器失败,我无法找到有关故障转移过程详细信息的文档.

我正在尝试找出一些可以让我回答与以下场景相关的问题的详细资源:如果运行Spark Application Master/Spark Driver的YARN Container的主机丢失网络连接1小时:

  1. YARN资源管理器是否使用另一个Spark Application Master/Spark驱动程序生成一个新的YARN容器?

  2. 在那种情况下(产生一个新的YARN容器),如果至少1个执行程序中的1个阶段已经完成并且在失败之前通知原始驱动程序,它是否从头开始启动Spark驱动程序?persist()中使用的选项是否会产生影响?新的Spark Driver会不会知道执行者已经完成了1个阶段?Tachyon会在这种情况下帮忙吗?

  3. 如果在原始Spark Application Master的YARN Container主机中恢复网络连接,是否会触发故障恢复过程?我猜这种行为可以从YARN控制,但我不知道在集群模式下部署SPARK时的默认值.

如果你能指出一些文档/网页,我会非常感激,在这些文档/网页中详细探讨了纱线集群模式中的Spark架构和故障转移过程.

hadoop hadoop-yarn apache-spark alluxio

6
推荐指数
1
解决办法
1056
查看次数

Redshift-用户“ xyz”不能删除,因为该用户拥有某些对象

尝试在Redshift数据库中删除用户“ xyz”时,出现错误消息:

用户'xyz'无法删除,因为用户拥有某些对象。

根据文档

如果用户拥有对象,请先删除对象或将其所有权更改为其他用户,然后再删除原始用户

我如何知道用户拥有哪些对象(方案,表,视图,UDF ??)?

amazon-redshift

4
推荐指数
1
解决办法
2639
查看次数