我正在运行kinesis plus spark应用程序 https://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html
我正在运行如下
ec2实例上的命令:
./spark/bin/spark-submit --class org.apache.spark.examples.streaming.myclassname --master yarn-cluster --num-executors 2 --driver-memory 1g --executor-memory 1g --executor-cores 1 /home/hadoop/test.jar
Run Code Online (Sandbox Code Playgroud)
我在EMR上安装了火花.
EMR details
Master instance group - 1 Running MASTER m1.medium
1
Core instance group - 2 Running CORE m1.medium
Run Code Online (Sandbox Code Playgroud)
我越来越低于INFO,它永远不会结束.
15/06/14 11:33:23 INFO yarn.Client: Requesting a new application from cluster with 2 NodeManagers
15/06/14 11:33:23 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (2048 MB per container)
15/06/14 11:33:23 INFO …Run Code Online (Sandbox Code Playgroud) 我正在AWS EMR上运行5节点Spark群集,每个群集大小为m3.xlarge(1个主4个从属).我成功地运行了一个146Mb的bzip2压缩CSV文件,最终获得了完美的聚合结果.
现在我正在尝试在此群集上处理~5GB bzip2 CSV文件但我收到此错误:
16/11/23 17:29:53 WARN TaskSetManager:阶段6.0中丢失的任务49.2(TID xxx,xxx.xxx.xxx.compute.internal):ExecutorLostFailure(执行者16退出由其中一个正在运行的任务引起)原因:容器由于超过内存限制而被YARN杀死.使用10.4 GB的10.4 GB物理内存.考虑提升spark.yarn.executor.memoryOverhead.
我很困惑为什么我在~75GB群集上获得~10.5GB内存限制(每3m.xlarge实例15GB)...
这是我的EMR配置:
[
{
"classification":"spark-env",
"properties":{
},
"configurations":[
{
"classification":"export",
"properties":{
"PYSPARK_PYTHON":"python34"
},
"configurations":[
]
}
]
},
{
"classification":"spark",
"properties":{
"maximizeResourceAllocation":"true"
},
"configurations":[
]
}
]
Run Code Online (Sandbox Code Playgroud)
根据我的阅读,设置maximizeResourceAllocation属性应告诉EMR配置Spark以充分利用群集上的所有可用资源.即,我应该有~75GB的内存......那么为什么我会得到~10.5GB的内存限制错误?这是我正在运行的代码:
def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
.orderBy("timestamp"))
diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
.over(window))
time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
.withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
.orderBy("timestamp")
.rowsBetween(-1, 0))
sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", …Run Code Online (Sandbox Code Playgroud) 我想用一些JSON数据(嵌套)创建一个Hive表并在其上运行查询?这甚至可能吗?
我已经将JSON文件上传到S3并启动了一个EMR实例但是我不知道在hive控制台中输入什么来将JSON文件作为Hive表?
有没有人有一些示例命令让我开始,我找不到任何有用的谷歌......
我在 AWS 账户中有一个 VPC,并且有 5 个与该 VPC 关联的子网。子网有 2 种类型 - 公共和私有。如何识别哪个子网是公共的,哪个是私有的?每个子网都有 CIDR 10.249.?.? 范围。
基本上,当我在该子网中使用 ec2SubnetIds 列表启动 EMR 时,它说 *** 子网配置无效:提供的子网列表包含公共和私有子网。只允许使用一种类型的子网。
如何纠正这个错误。
我正在尝试设置一个环境来支持群集上的探索性数据分析.根据对该项目的初步调查,我的目标是使用Scala/Spark和Amazon EMR来配置群集.
目前我只想尝试一些基本的例子,以确认我已经正确配置了所有内容.我遇到的问题是我没有看到我期望从亚马逊机器实例上的Atlas BLAS库获得的性能.
下面是我简单基准测试的代码片段.它只是一个方形矩阵乘法,然后是短脂肪乘法和高瘦乘法,以产生一个可以打印的小矩阵(我想确保Scala不会因为懒惰的评估而跳过计算的任何部分).
我正在使用Breeze进行线性代数库和netlib-java来拉入BLAS/LAPACK的本地本机库
import breeze.linalg.{DenseMatrix, DenseVector}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.SparkConf
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import scala.reflect.ClassTag
object App {
def NaiveMultiplication(n: Int) : Unit = {
val vl = java.text.NumberFormat.getIntegerInstance.format(n)
println(f"Naive Multipication with vector length " + vl)
println(blas.getClass().getName())
val sm: DenseMatrix[Double] = DenseMatrix.rand(n, n)
val a: DenseMatrix[Double] = DenseMatrix.rand(2,n)
val b: DenseMatrix[Double] = DenseMatrix.rand(n,3)
val c: DenseMatrix[Double] = sm * sm
val cNormal: DenseMatrix[Double] = (a * c) * b …Run Code Online (Sandbox Code Playgroud) 我有一个非常简单的PySpark脚本,它从S3上的一些拼花数据创建一个数据帧,然后调用count()方法并打印出记录数.
我在AWS EMR集群上运行脚本,我看到以下奇怪的WARN信息:
17/12/04 14:20:26 WARN ServletHandler:
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:524)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.status.api.v1.MetricHelper.submetricQuantiles(AllStagesResource.scala:313) …Run Code Online (Sandbox Code Playgroud) 我正在写,看看是否有人知道如何加速从EMR中运行的Spark的S3写入时间?
我的Spark Job需要4个多小时才能完成,但群集在前1.5个小时内仅处于负载状态.
我很好奇Spark一直在做什么.我查看了日志,发现了很多s3 mv命令,每个文件一个.然后直接看看S3我看到我的所有文件都在_temporary目录中.
其次,我关注我的集群成本,看来我需要为这项特定任务购买2小时的计算.但是,我最终买了5个小时.我很好奇EMR AutoScaling在这种情况下是否有助于降低成本.
一些文章讨论了更改文件输出提交器算法,但我没有成功.
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
Run Code Online (Sandbox Code Playgroud)
写入本地HDFS很快.我很好奇,如果发出一个hadoop命令将数据复制到S3会更快吗?
我在Hive中实现了一项任务.目前它在我的单节点集群上工作正常.现在我打算在AWS上部署它.
我对AWS一无所知.如果我计划部署它,那么我应该选择Amazon EC2或Amazon EMR?
我想改善我的任务的表现.哪一个更好,更可靠?如何接近他们?我听说我们也可以在AWS上注册我们的VM设置.可能吗?
请尽快建议我.
非常感谢.
我希望能够创建EMR集群,并让这些集群将消息发送回某个中央队列.为了使其工作,我需要在每个主节点上运行某种代理.这些代理中的每一个都必须在此消息中标识自己,以便收件人知道该消息所针对的群集.
主节点是否知道其ID(j-*************)?如果没有,那么是否还有其他一些识别信息可以让邮件收件人推断出这个ID?
我已经看了一下配置文件/home/hadoop/conf,我没有发现任何有用的东西.我发现了ID /mnt/var/log/instance-controller/instance-controller.log,但看起来很难找到.我想知道实例控制器可能从哪个位置获取该ID.
我正在寻找构建一个Web应用程序,它需要在R中按需运行资源密集型MCMC(马尔可夫链蒙特卡罗)计算,以便为用户生成一些概率图.
约束:
显然,我不希望在与Web应用程序前端相同的服务器上运行资源密集型计算,因此需要将这些任务交给工作者实例.
这些计算需要运行大量的CPU,并且我希望将延迟保持在尽可能低的水平(希望是秒,而不是几分钟),所以我宁愿在更强大的硬件上运行计算.
我不能以~66¢/ hr x 24小时/天运行强大的EC2实例,因此可能需要按需或点请求实例.
以下是我提出的选项:
每天24小时运行廉价,经济实惠的工作实例,在Amazon SWF(或SQS)管理的时间内执行一项任务.
缺点:
每个任务生成一个更强大的工作者实例(每当作业添加到队列时旋转)并在完成时终止实例.
缺点:
与#2类似,但具有低出价的EC2现场请求.
缺点:
某种混合溶液中,我积极监测仡硬件工人实例及其负载并智能地旋转起来,并终止情况下的时间来维持的成本和可用性的最佳平衡
缺点:
我希望有一些服务,我可以在一分钟到一分钟而不是每小时支付高度可用的按需硬件.
所以我的问题如下:
你会如何推荐解决这个问题?
是否有一个好的EC2实例管理解决方案可以位于Amazon SWF之上并帮助我实现负载平衡并终止闲置工作人员?
现货请求出价是否可以解决我的问题,还是更适合那些不一定需要立即完成的任务?
amazon-emr ×10
apache-spark ×5
amazon-ec2 ×3
emr ×2
hadoop ×2
hive ×2
amazon-s3 ×1
amazon-swf ×1
amazon-vpc ×1
bigdata ×1
hadoop-yarn ×1
jblas ×1
json ×1
pyspark ×1
r ×1
scala-breeze ×1
subnet ×1