小编mat*_*ieu的帖子

AWS Kinesis Firehose不在Redshift中插入数据

我尝试让一个Kinesis Firehose在Redshift表中推送数据.

firehose流正在工作并将数据放入S3.

但没有任何东西到达Redshift的目的地表.

  • 在指标DeliveryToRedshift成功为0(DeliveryToRedshift记录为空)
  • 加载日志(redshift Web控制台)和STL_LOAD_ERRORS表为空.
  • 我检查过Firehose能够连接到Redshift(我看到STL_CONNECTION_LOG中的连接)

我该如何解决这个问题?

amazon-web-services amazon-redshift amazon-kinesis-firehose

15
推荐指数
3
解决办法
8437
查看次数

Spark Listener EventLoggingListener引发了异常/ ConcurrentModificationException

在我们的应用程序(Spark 2.0.1)中,我们经常出现此异常.我找不到任何关于此事的内容.可能是什么原因 ?

16/10/27 11:18:24 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
    at java.util.ArrayList$Itr.next(ArrayList.java:851)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.to(TraversableLike.scala:590)
    at scala.collection.AbstractTraversable.to(Traversable.scala:104)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
    at scala.collection.AbstractTraversable.toList(Traversable.scala:104)
    at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:314)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson$5.apply(JsonProtocol.scala:291)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:291)
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
    at org.apache.spark.util.JsonProtocol$$anonfun$taskInfoToJson$12.apply(JsonProtocol.scala:283)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:283)
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:145)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:137)
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:157)
    at …
Run Code Online (Sandbox Code Playgroud)

apache-spark

11
推荐指数
1
解决办法
4861
查看次数

WPF渲染冻结

我遇到了一个GUI冻结问题,与硬件与软件UI渲染有关.

上下文:在双显示器XP嵌入式计算机(DirectX 9.0c)上运行的2个WPF应用程序(.NET 3.5 SP1).

在某些时候,其中一个应用程序冻结.但只有渲染:GUI保持活动状态(消息泵处于活动状态,应用程序响应),但渲染未完成.冻结可以持续数秒或永久.第二个应用程序保持正常运行 该错误仅在双显示配置中发生,第二个应用程序是一种催化剂(更多内存和CPU使用...)

使用Perforator,我发现视频内存使用率持续上升和下降(参见:截图).在此期间,一个本机线程消耗100%的CPU(堆栈跟踪涉及WPFgfx,DirectX,GDI和视频驱动程序)

通常,当应用程序解冻时,它会完全呈现软件(使用Perforator进行紫色调).此外,取消激活硬件渲染会解冻应用程序.

我的结论,到目前为止,是我面对的视频内存不足(如在点#14描述在这个岗位),以试图回退到软件渲染,但有一个循环,所以在硬件模式下再次尝试.

我对吗 ?它会更像是WPF问题还是视频驱动程序问题?甚至是DirectX?我可以调整一些参数吗?

directx wpf freeze .net-3.5

9
推荐指数
1
解决办法
1995
查看次数

在线学习Spark中的LDA模型

有没有办法以在线学习的方式训练LDA模型,即.加载以前的火车模型,并用新文件更新?

machine-learning lda apache-spark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
1245
查看次数

Spark在水槽中构建流式一致性

在下面的例子中,我想更好地理解Spark 2.2结构化流的一致性模型:

  • 一个来源(Kinesis)
  • 从此源向2个不同的接收器发出2个查询:一个文件接收器用于存档目的(S3),另一个接收器用于处理数据(DB或文件,尚未确定)

我想了解是否在汇点之间存在任何一致性保证,至少在某些情况下:

  • 水槽中的一个可以领先于另一个水槽吗?或者他们在源上以相同的速度消耗数据(因为它是相同的源)?它们可以同步吗?
  • 如果我(优雅地)停止流应用程序,2接收器上的数据是否一致?

原因是我想构建一个类似Kappa的处理应用程序,能够在我想重新处理某些历史记录时暂停/关闭流媒体部分,并且当我恢复流式传输时,避免重新处理已经处理的内容(如在历史中),或遗漏了一些(例如,一些尚未提交到存档的数据,然后在流式恢复时已经处理的数据被跳过)

apache-spark spark-streaming

7
推荐指数
1
解决办法
640
查看次数

Spark永远不会完成作业和阶段,JobProgressListener会崩溃

我们有一个Spark应用程序,可以持续处理大量传入的作业.在多个线程上并行处理多个作业.

在密集型工作负载期间,在某些时候,我们开始有这样的警告:

16/12/14 21:04:03 WARN JobProgressListener: Task end for unknown stage 147379
16/12/14 21:04:03 WARN JobProgressListener: Job completed for unknown job 64610
16/12/14 21:04:04 WARN JobProgressListener: Task start for unknown stage 147405
16/12/14 21:04:04 WARN JobProgressListener: Task end for unknown stage 147406
16/12/14 21:04:04 WARN JobProgressListener: Job completed for unknown job 64622
Run Code Online (Sandbox Code Playgroud)

从那开始,应用程序的性能直线下降,大多数阶段和工作从未完成.在SparkUI上,我可以看到13000个挂起/活动作业等数字.

我无法清楚地看到更多信息之前发生的另一个异常.也许这个,但它涉及另一个听众:

16/12/14 21:03:54 ERROR LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep …
Run Code Online (Sandbox Code Playgroud)

apache-spark

6
推荐指数
1
解决办法
2249
查看次数

使用非等号键自定义连接

我需要实现一个自定义连接策略,它将匹配非严格相等的键.为了说明,可以考虑距离:当键足够接近时应该进行连接(尽管在我的情况下,它比仅仅距离度量更复杂)

所以我不能通过重写equals来实现它,因为没有相等性(我需要为其他需求保持真正的相等性测试).我想我还需要实现一个合适的分区器.

我怎么能这样做?

join apache-spark

5
推荐指数
1
解决办法
2818
查看次数

重新移位授予验证 HAS_TABLE_PRIVILEGE 失败

我正在与 Redshit 授予的组、模式、默认权限作斗争。当我尝试使用 HAS_TABLE_PRIVILEGE 检查一切是否正确

select tablename, 
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'select') as select,
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'insert') as insert,
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'update') as update,
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'delete') as delete, 
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'references') as references 
from pg_tables
where schemaname='datalab'
order by tablename
Run Code Online (Sandbox Code Playgroud)

我明白了:

An error occurred when executing the SQL command:
select tablename, 
   HAS_TABLE_PRIVILEGE('analyst', tablename, 'select') as select,
   HAS_TABLE_PRIVILEGE('analyst', tablena...

[Amazon](500310) Invalid operation: relation "dss__transaction" does not exist;

Execution time: 0.2s
1 statement failed.
Run Code Online (Sandbox Code Playgroud)

这里到底发生了什么?

permissions amazon-redshift sql-grant

5
推荐指数
1
解决办法
1530
查看次数

运行时的Kerberos模拟Spark上下文

我有一个Spark应用程序,它通过多个线程上的几个Spark会话同时为不同的用户执行各种作业。

我的客户想对他的Hadoop集群进行kerberize。我想知道是否有一种方法可以配置模拟,例如将这些作业中的每一个与不同的代理用户一起运行。根据我在spark conf和代码中看到的内容,无法在运行时针对特定上下文执行此操作,但是我不熟悉Kerberos或Spark的这一部分。

任何人都可以确认/确认吗?

impersonation hadoop kerberos hadoop-yarn apache-spark

5
推荐指数
0
解决办法
193
查看次数

在Beanstalk中为php / symfony网站配置apache MPM

我正在使用AWS beantalk托管php / symphony应用程序。我想适当地配置有关机器大小的apache,当前为t2.small,即。1核2 GB RAM,这非常小。

apachectl -V给我这个:

Server version: Apache/2.4.16 (Amazon)
Server built:   Aug 13 2015 23:52:13
Server's Module Magic Number: 20120211:47
Server loaded:  APR 1.5.0, APR-UTIL 1.4.1
Compiled using: APR 1.5.0, APR-UTIL 1.4.1
Architecture:   64-bit
Server MPM:     prefork
  threaded:     no
  forked:     yes (variable process count)
Run Code Online (Sandbox Code Playgroud)

从此文档中,我至少需要相对于可用RAM在MPM / prefork中配置MaxRequestWorkers。

在beantalk中进行配置的正确方法是什么?我猜是通过.ebextensions吗?怎么样 ?

php apache amazon-web-services amazon-elastic-beanstalk

3
推荐指数
1
解决办法
1091
查看次数

在多个列上应用自定义Spark Aggregator(Spark 2.0)

Aggregator[]为Strings 创建了一个自定义.

我想将它应用于DataFrame所有列都是字符串的所有列,但列号是任意的.

我坚持写正确的表达方式.我想写这样的东西:

df.agg( df.columns.map( c => myagg(df(c)) ) : _*) 
Run Code Online (Sandbox Code Playgroud)

鉴于各种接口,这显然是错误的.

我看了一下RelationalGroupedDataset.agg(expr: Column, exprs: Column*)代码,但我不熟悉表达式操作.

任何的想法 ?

aggregate-functions user-defined-functions apache-spark apache-spark-sql

2
推荐指数
1
解决办法
2167
查看次数

scala未来的处理深度 - 首先不是广度优先

我有一个大的计算大致基于以下模式:

def f1(i:Int):Int = ???
def f2(i:Int):Int = ???

def processA(l: List[Int]) = 
  l.map(i => Future(f1(i)))

def processB(l: List[Int]) = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main() = {
  val items = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.map(_.onComplete ( ... ))
}
Run Code Online (Sandbox Code Playgroud)

如果我的理解是正确的,我遇到的问题是处理是广度优先的.ProcessA启动了数千个期货,然后processB将汇集数千个新的期货,这些期货将在processA完成后处理.onComplete回调将开始很晚才开始...

我想把这个深度优先:过程A的几个未来开始,然后,processB从那里继续而不是切换到队列中的其他东西.

可以在香草scala中完成吗?我应该转向一些替代Futures()和ThreadPools的lib吗?

编辑:更详细一点.f1 andThen f2正如答案中所建议的那样,重写目前是不切实际的.实际上,processA and B正在做一堆其他事情(包括副作用).而processB依赖的事实ProcessA是私人的.如果曝光,它会破坏SoC.

编辑2:我想我会放松一点"香草"约束.有人建议Akka流可以提供帮助.我目前正在看scalaz.Task:有意见吗?

scala future threadpool

0
推荐指数
1
解决办法
478
查看次数