小编cra*_*rak的帖子

亚马逊s3a使用Spark返回400 Bad Request

出于结帐目的,我尝试将Amazon S3存储桶设置为检查点文件.

val checkpointDir = "s3a://bucket-name/checkpoint.txt"
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "bucket-name.s3-website.eu-central-1.amazonaws.com")
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(checkpointDir)
Run Code Online (Sandbox Code Playgroud)

但它会因此异常而停止

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 9D8E8002H3BBDDC7, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Qme5E3KAr/KX0djiq9poGXPJkmr0vuXAduZujwGlvaAl+oc6vlUpq7LIh70IF3LNgoewjP+HnXA=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-web-services hdfs apache-spark spark-streaming

8
推荐指数
2
解决办法
9770
查看次数

Spark从检查点重新启动流时发生的事情

在长时间不活动(3day)后重新启动火花时.

val ssc = StreamingContext.getOrCreate(checkpointDir, newStreamingContext _, createOnError = createOnError)
Run Code Online (Sandbox Code Playgroud)

我看到重启是痛苦的.

标签流需要45分钟才能显示,这意味着"火花已经完成了检查点的加载".(从检查点文件加载最后一批很长时间)

之后,它显示1000个批次,0事件.当我在几分钟后重新启动时,它只显示错过的批次(当停机时间约为5分钟时,10批30秒)并且"快速"加载.

所以这让我觉得加载我的检查点需要时间,因为它加载了这1000个批次.

因为1000个30s的30s不匹配3天,我想知道当这1000个批次完成时会发生什么,它会在当前时间重新启动还是加载其他错过的批次?这1000个限制是否可配置?

编辑:在这1000个批次之后没有任何反应,直接kafka没有创建新的批次.我认为这不是预期的功能,我不愿意制作关于此的火花jira票.


因为问题不是单独出现的,我认为这1000个批次都装在驱动程序内存中.

有些批次后有时会有OOM.当它没有时,我看到我的总延迟提高,而平均处理时间低于批处理时间.这让我觉得我的驱动程序几乎是OOM,并且难以向执行程序发送批处理.

当然,当我的流不是从检查点创建的时候,每件事都运作良好.那么?当流从检查点开始时会发生什么?


ps:0事件批处理包含事件,因为它们花费的时间与我通常的完整批次一样多,而且我看到kafka偏移量增加,所以我认为是一个显示错误的火花UI.

out-of-memory apache-spark spark-streaming

6
推荐指数
0
解决办法
402
查看次数

返回 clickhouse 数组作为列

Clickhouse 是否可以将包含一对数组的结果转换为列?

\n\n

形成这个结果:

\n\n
\xe2\x94\x8c\xe2\x94\x80f1\xe2\x94\x80\xe2\x94\x80\xe2\x94\xacf2\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xacf3\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 \'a\' \xe2\x94\x82 [1,2,3] \xe2\x94\x82 [\'x\',\'y\',\'z\'] \xe2\x94\x82\n\xe2\x94\x82 \'b\' \xe2\x94\x82 [4,5,6] \xe2\x94\x82 [\'x\',\'y\',\'z\'] \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n
Run Code Online (Sandbox Code Playgroud)\n\n

到 :

\n\n
\xe2\x94\x8c\xe2\x94\x80f1\xe2\x94\x80\xe2\x94\x80\xe2\x94\xacx\xe2\x94\x80\xe2\x94\x80\xe2\x94\xacy\xe2\x94\x80\xe2\x94\x80\xe2\x94\xacz\xe2\x94\x80\xe2\x94\x80\xe2\x94\x90\n\xe2\x94\x82 \'a\' \xe2\x94\x82 1 \xe2\x94\x82 2 \xe2\x94\x82 3 \xe2\x94\x82\n\xe2\x94\x82 \'b\' \xe2\x94\x82 4 \xe2\x94\x82 5 \xe2\x94\x82 6 \xe2\x94\x82\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\xb4\xe2\x94\x80\xe2\x94\x80\xe2\x94\x80\xe2\x94\x98\n
Run Code Online (Sandbox Code Playgroud)\n\n

这个想法是不必为每行重复标题值。

\n\n

就我而言,“标头”数组 f3 通过查询唯一并连接到 f1、f2。

\n

sql clickhouse

6
推荐指数
1
解决办法
1万
查看次数

Apache Spark Ui 中的自定义指标

我正在使用 Apache Spark,并且指标 UI(在 4040 上找到)非常有用。

我想知道是否可以在此 UI 中添加自定义指标、自定义任务指标,但也可能添加自定义 RDD 指标。(就像只为 RDD 转换执行时间)

按流批处理作业和任务分组自定义指标可能会很好。

我见过TaskMetrics对象,但它被标记为 dev api,它看起来只对输入或输出源有用,不支持自定义值。

有火花的方式来做到这一点吗?或者替代方案?

apache-spark codahale-metrics

5
推荐指数
1
解决办法
614
查看次数

从检查点启动火花流时堆栈溢出

当从检查点重新启动火花流时,我得到了这个例外.因为它与我生成的任何代码无关,所以我不知道是什么原因导致了这个问题.

任何的想法?

Exception in thread "streaming-start" java.lang.StackOverflowError
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
...
...
a lot a line that don't satisfy stack overflow . com
...
...
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) …
Run Code Online (Sandbox Code Playgroud)

stack-overflow apache-spark spark-streaming checkpointing

5
推荐指数
0
解决办法
344
查看次数

cassandra中大分区的搜索键

nodetool cfstats/tablestats 显示“压缩分区最大字节数”

现在如何找到这个分区或其他巨大分区的键?

目的是分析为什么这些分区越来越大,并相应地修正数据模型。

我已经看到可以在日志中看到这些分区键,但不幸的是我的日志会定期删除。

cassandra

5
推荐指数
1
解决办法
1245
查看次数

为什么在 ClickHouse 表中不鼓励浮点表示?

文档Float32并没有真正解释 的行为Float64以及不鼓励它们的原因。

我问这个问题是因为在将这些与控制台 cli 请求或 Rest 请求一起使用时,我看到了奇怪的行为。无论精度如何,发送到 clickhouse 的浮点值在最后一位都会稍微修改。

示例:1258.021545成为1258.0215453.

每次插入这些值时,最后一位数字都会更改。我不认为问题来自太高的精度值,因为这些值来自 Java 双精度数。

floating-point decimal-point clickhouse

5
推荐指数
1
解决办法
2598
查看次数

Cassandra火花连接器joinWithCassandraTable在具有不同名称的字段上

我正在寻找一个RDD和一个cassandra表的连接,它们对于相同的密钥ex(简化)具有不同的名称:

case class User(id : String, name : String)
Run Code Online (Sandbox Code Playgroud)

case class Home( address : String, user_id : String)
Run Code Online (Sandbox Code Playgroud)

如果想做:

rdd[Home].joinWithCassandraTable("testspark","user").on(SomeColumns("id"))
Run Code Online (Sandbox Code Playgroud)

如何确定要进行连接的字段的名称.而且我不想将rdd映射到只有正确的id,因为我想在joinWithCassandraTable之后加入所有值.

scala cassandra datastax-enterprise apache-spark spark-cassandra-connector

3
推荐指数
1
解决办法
3804
查看次数

Apache Spark错误的akka​​-remote netty版本

当火花正在运行测试与sbt.我得到这个例外:

18:58:49.049 [sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.VerifyError: (class: org/jboss/netty/channel/socket/nio/NioWorkerPool, method: createWorker signature: (Ljava/util/concurrent/Executor;)Lorg/jboss/netty/channel/socket/nio/AbstractNioWorker;) Wrong return type in function
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:283) ~[akka-remote_2.11-2.3.4-spark.jar:na]
at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:240) ~[akka-remote_2.11-2.3.4-spark.jar:na]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_45]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_45]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_45]
at java.lang.reflect.Constructor.newInstance(Constructor.java:422) ~[na:1.8.0_45]
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78) ~[akka-actor_2.11-2.3.4-spark.jar:na]
at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.7.jar:0.13.8]
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) ~[akka-actor_2.11-2.3.4-spark.jar:na]
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) ~[akka-actor_2.11-2.3.4-spark.jar:na]
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) ~[akka-actor_2.11-2.3.4-spark.jar:na]
at scala.util.Success.flatMap(Try.scala:231) ~[scala-library-2.11.7.jar:0.13.8]
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84) ~[akka-actor_2.11-2.3.4-spark.jar:na]
at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:692) ~[akka-remote_2.11-2.3.4-spark.jar:na]
at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:684) ~[akka-remote_2.11-2.3.4-spark.jar:na]
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728) ~[scala-library-2.11.7.jar:0.13.8]
at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:0.13.8] …
Run Code Online (Sandbox Code Playgroud)

akka netty apache-spark akka-remote-actor

3
推荐指数
1
解决办法
3844
查看次数