小编Mik*_*nte的帖子

EMR上Spark中的S3 SlowDown错误

我在写一个镶木地板文件时遇到这个错误,这已经开始发生了

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146) …
Run Code Online (Sandbox Code Playgroud)

scala amazon-s3 amazon-emr apache-spark apache-spark-dataset

17
推荐指数
1
解决办法
3494
查看次数

Json4s 忽略 @JsonProperty jackson 注释

我有这个代码

import com.fasterxml.jackson.annotation.JsonProperty
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.{read, write}

object Testing extends App {
    implicit val formats = DefaultFormats
    val json =
            """
              |{
              |"1strange_field_name":"name"
              |}
            """.stripMargin
    println(read[Test](json))
}

case class Test(@JsonProperty("1strange_field_name") testName: Option[String])
Run Code Online (Sandbox Code Playgroud)

它应该打印 Test(Some(name)) 但它打印 Test(None)。这是由于 json4s 没有使用 @JsonProperty 注释造成的。有没有办法配置 json4s 以使用 jackson 注释?

scala jackson json4s

5
推荐指数
1
解决办法
660
查看次数

Scala对象和线程安全

我是Scala的新手.

我试图弄清楚如何使用Scala对象(也称为单例)中的函数来确保线程安全

从我到目前为止所读到的内容来看,我似乎应该保持对函数范围(或下面)的可见性,并尽可能使用不可变变量.但是,我没有看到违反线程安全的例子,所以我不确定应该采取什么其他预防措施.

有人能指出我对这个问题的一个很好的讨论,最好举例说明线程安全被违反的地方吗?

concurrency scala thread-safety

4
推荐指数
1
解决办法
3699
查看次数

为了理解未来和两者

我正在努力以一种很好的 monadic 方式组合一系列异步进程。该过程的每一步都可能失败,因此它正在检索Future[Either[String, T]].

def firstStep(input: Int): Future[Either[String, Long]] = ???
def secondStep(input: Long): Future[Either[String, String]] = ???
def thirdStep(input: String): Future[Either[String, Double]] = ???
Run Code Online (Sandbox Code Playgroud)

鉴于这些功能,我想像这样组合它们

def process(input: Int): Future[Either[String Double]] = {
     for{
        res1 <- firstStep(input)
        res2 <- secondStep(res1)
        res3 <- thirdStep(res2)
     } yield res3
}
Run Code Online (Sandbox Code Playgroud)

但这不起作用,因为每个部分结果都是一个Either[String, T],而我需要的是它T本身(或者只是停止执行并返回,Left如果是这种情况)。

如何以一种很好的 monadic 方式(使用 for-comprehensions)组合这个函数?

monads scala for-comprehension scala-2.12

4
推荐指数
1
解决办法
1230
查看次数

如何实现Functor [Dataset]

我奋力关于如何创建的实例Functor[Dataset]...问题是,当你mapABEncoder[B]必须在隐含的范围,但我不知道该怎么做.

implicit val datasetFunctor: Functor[Dataset] = new Functor[Dataset] {
    override def map[A, B](fa: Dataset[A])(f: A => B): Dataset[B] = fa.map(f)
  }
Run Code Online (Sandbox Code Playgroud)

当然这个代码抛出一个编译错误,因为Encoder[B]不可用,但我不能添加Encoder[B]为隐式参数,因为它会改变map方法签名,我该如何解决?

scala apache-spark scala-cats scala-implicits apache-spark-encoders

4
推荐指数
1
解决办法
178
查看次数

为什么Spark RDD在T中是不变的?

我希望能够做到这样的事情

abstract class Super()
class Type1() extends Super
class Type2() extends Super

val rdd1 = sc.parallelize(Seq(new Type1()))
val rdd2= sc.parallelize(Seq(new Type2()))
val union = rdd1.union(rdd2)
Run Code Online (Sandbox Code Playgroud)

如果RDD在T union上是协变的,则RDD [Super],但这甚至不编译.有没有理由让RDD在T中不变?

scala covariance apache-spark rdd

3
推荐指数
1
解决办法
580
查看次数

映射到列表头部时了解Scala Map对象

嗨,我有以下数据,并希望将其映射到第二个参数中的第一项.因此对于:

1 -> List((1,11))
1 -> List((1,1), (1,111))
Run Code Online (Sandbox Code Playgroud)

我想要:

(1,11)
(1,1)
Run Code Online (Sandbox Code Playgroud)

当此数据在RDD中时,我可以执行以下操作:

scala> val m = sc.parallelize(Seq(11 -> List((1,11)), 1 -> List((1,1),(1,111))))
m: org.apache.spark.rdd.RDD[(Int, List[(Int, Int)])] = ParallelCollectionRDD[198] at parallelize at <console>:47

scala> m.map(_._2.head).collect.foreach(println)
(1,11)
(1,1)
Run Code Online (Sandbox Code Playgroud)

但是,当它在Map对象(groupBy的结果)中时,我得到以下内容:

scala> val m = Map(11 -> List((1,11)), 1 -> List((1,1)))
m: scala.collection.immutable.Map[Int,List[(Int, Int)]] = Map(11 -> List((1,11)), 1 -> List((1,1), (1,111)))

scala> m.map(_._2.head)
res1: scala.collection.immutable.Map[Int,Int] = Map(1 -> 1)
Run Code Online (Sandbox Code Playgroud)

当我映射到整个列表时,我得到了我期望的结果,但是当我打电话给它时

scala> m.map(_._2)
res2: scala.collection.immutable.Iterable[List[(Int, Int)]] = List(List((1,11)), List((1,1), (1,111)))
Run Code Online (Sandbox Code Playgroud)

如果我执行以下任一操作,我也可以得到我想要的结果:

scala> m.map(_._2).map(_.head)
res4: …
Run Code Online (Sandbox Code Playgroud)

scala implicit scala-collections scala-implicits

0
推荐指数
1
解决办法
534
查看次数