标签: amazon-emr

如何使用wild字符从aws s3中选择文件

我在s3存储桶中有很多文件,我想复制那些开始日期为2012的文件.下面的命令会复制所有文件.

aws s3 cp s3://bp-dev/bp_source_input/ C:\Business_Panorama\nts\data\in --recursive  --include "201502_nts_*.xlsx"
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-web-services amazon-emr

12
推荐指数
2
解决办法
2万
查看次数

如何在Java应用程序中等待Elastic MapReduce作业流程的完成?

最近我一直在使用亚马逊网络服务(AWS),我注意到关于这个主题的文档不多,所以我添加了我的解决方案.

我正在使用Amazon Elastic MapReduce(Amazon EMR)编写应用程序.计算结束后,我需要对它们创建的文件执行一些工作,所以我需要知道作业流何时完成其工作.

这是您如何检查您的工作流程是否完成的方法:

AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);

DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
    .withJobFlowStates("COMPLETED");

List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
JobFlowDetail detail = jobs.get(0);

detail.getJobFlowId(); //the id of one of the completed jobs
Run Code Online (Sandbox Code Playgroud)

您还可以查找特定的作业ID DescribeJobFlowsRequest,然后检查该作业是否已完成失败.

我希望它会帮助别人.

java amazon-web-services amazon-emr elastic-map-reduce

11
推荐指数
1
解决办法
3478
查看次数

AWS CLI - 无法解码JSON对象

我使用CLI for AWS创建一个集群并使用json文件中的参数.这是CLI命令字符串Im使用:

aws emr create-cluster --name "Big Matrix Re Run 1" --ami-version 3.1.0 --steps file://Segmentgroup1.json --release-label --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge --auto-terminate
Run Code Online (Sandbox Code Playgroud)

我的json文件(Segmentgroup1.json)位于运行AWS命令字符串的同一个文件夹中,但是我一直收到以下错误:

No JSON object could be decoded
Run Code Online (Sandbox Code Playgroud)

基于我发现它没有找到json文件.有任何想法吗?

amazon-web-services amazon-emr aws-cli

11
推荐指数
1
解决办法
9699
查看次数

如何在Amazon EMR上运行Spark Scala代码

我试图在Amazon EMR上运行以Scala编写的以下Spark代码:

import org.apache.spark.{SparkConf, SparkContext}

object TestRunner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Hello World")
    val sc = new SparkContext(conf)

    val words = sc.parallelize(Seq("a", "b", "c", "d", "e"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    println(wordCounts)
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我用来将上述代码部署到EMR的脚本:

#!/usr/bin/env bash

set -euxo pipefail

cluster_id='j-XXXXXXXXXX'

app_name="HelloWorld"

main_class="TestRunner"
jar_name="HelloWorld-assembly-0.0.1-SNAPSHOT.jar"
jar_path="target/scala-2.11/${jar_name}"
s3_jar_dir="s3://jars/"
s3_jar_path="${s3_jar_dir}${jar_name}"
###################################################

sbt assembly

aws s3 cp ${jar_path} ${s3_jar_dir}

aws emr add-steps --cluster-id ${cluster_id} --steps Type=spark,Name=${app_name},Args=[--deploy-mode,cluster,--master,yarn-cluster,--class,${main_class},${s3_jar_path}],ActionOnFailure=CONTINUE
Run Code Online (Sandbox Code Playgroud)

但是,退出几分钟后在AWS中根本不产生任何输出!

这是我的控制器的输出:

2016-10-20T21:03:17.043Z INFO Ensure step 3 …
Run Code Online (Sandbox Code Playgroud)

scala amazon-web-services amazon-emr emr apache-spark

11
推荐指数
1
解决办法
2881
查看次数

优化EMR集群上的GC

我在EMR上运行用Scala编写的Spark Job,每个执行程序的stdout都充满了GC分配失败.

2016-12-07T23:42:20.614+0000: [GC (Allocation Failure) 2016-12-07T23:42:20.614+0000: [ParNew: 909549K->432K(1022400K), 0.0089234 secs] 2279433K->1370373K(3294336K), 0.0090530 secs] [Times: user=0.11 sys=0.00, real=0.00 secs] 
2016-12-07T23:42:21.572+0000: [GC (Allocation Failure) 2016-12-07T23:42:21.572+0000: [ParNew: 909296K->435K(1022400K), 0.0089298 secs] 2279237K->1370376K(3294336K), 0.0091147 secs] [Times: user=0.11 sys=0.01, real=0.00 secs] 
2016-12-07T23:42:22.525+0000: [GC (Allocation Failure) 2016-12-07T23:42:22.525+0000: [ParNew: 909299K->485K(1022400K), 0.0080858 secs] 2279240K->1370427K(3294336K), 0.0082357 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2016-12-07T23:42:23.474+0000: [GC (Allocation Failure) 2016-12-07T23:42:23.474+0000: [ParNew: 909349K->547K(1022400K), 0.0090641 secs] 2279291K->1370489K(3294336K), 0.0091965 secs] [Times: user=0.12 sys=0.00, real=0.00 secs] 
Run Code Online (Sandbox Code Playgroud)

我正在阅读几TB的数据(主要是字符串)所以我担心常量GC会减慢处理时间.
我将不胜感激任何有关如何理解此消息以及如何优化GC以使其消耗最少CPU时间的指示.

garbage-collection jvm amazon-emr emr apache-spark

11
推荐指数
1
解决办法
6122
查看次数

结构化流不会将DF写入文件接收器,引用/_spark_metadata/9.compact不存在

我正在EMR 5.11.1,Spark 2.2.1中构建一个Kafka摄取模块.我的目的是使用结构化流来消费Kafka主题,进行一些处理,并以镶木地板格式存储到EMRFS/S3.

控制台接收器按预期工作,文件接收器不起作用.

spark-shell:

val event = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()

val eventdf = event.select($"value" cast "string" as "json")
.select(from_json($"json", readSchema) as "data")
.select("data.*")

val outputdf = <some processing on eventdf>
Run Code Online (Sandbox Code Playgroud)

这有效:

val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start 
Run Code Online (Sandbox Code Playgroud)

这不是:

val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails
Run Code Online (Sandbox Code Playgroud)

我试过的东西不起作用:

  1. sc.hadoopConfiguration.set("parquet.enable.summary-metadata","false")
  2. 将格式更改为CSV而不是镶木地板
  3. 将输出模式更改为完成(仅支持追加)
  4. 不同的触发间隔
  5. readStream上的.option("failOnDataLoss",false)

一些挖掘源代码的人把我带到了这里:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog. scala ,它表示缺少.compact文件应触发默认值.

因此尝试:spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay",60000)以确保在新批处理创建组合元数据文件之前未删除旧批处理元数据

使这个错误烦人的原因是它并不总是可重现的.在不更改代码中的单个字符的情况下,写入镶木地板有时会起作用,或者不起作用.我已经尝试清理检查点位置,spark/hdfs日志等,以防火花内部的"状态"导致此问题.

这是错误堆栈跟踪:

query: …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-emr apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
2670
查看次数

EMR 笔记本失败,因为 JupyterEnterpriseGateway 应用程序未安装在现有集群 AWS 上

我阅读了AWS文档,使用Hadoop SPark和Livy创建了EMR集群高版本。选择笔记本,创建一个,将其连接到集群,但启动时突然停止,并显示消息Cluster ########### 没有安装 JupyterEnterpriseGateway 应用程序。请使用另一个集群重试。 我尝试通过 SSH 连接到主节点并 pip 安装它,但失败了。网上也找不到解决办法。我认为这些集群是开箱即用的。有什么解决办法吗?

amazon-emr

11
推荐指数
1
解决办法
7586
查看次数

如何使用boto3(或其他方式)在emr上自动执行pyspark作业?

我正在创建一个解析大量服务器数据的工作,然后将其上传到Redshift数据库中.

我的工作流程如下:

  • 从S3获取日志数据
  • 使用spark dataframes或spark sql来解析数据并写回S3
  • 将数据从S3上传到Redshift.

我已经开始讨论如何自动执行此操作,以便我的进程旋转EMR集群,引导正确的安装程序,并运行我的python脚本,其中包含用于解析和编写的代码.

有没有人有任何可以与我分享的示例,教程或经验,以帮助我学习如何做到这一点?

python amazon-s3 amazon-emr apache-spark pyspark

10
推荐指数
1
解决办法
9645
查看次数

使用多个S3帐户运行EMR Spark

我有一个EMR Spark Job需要在一个帐户上从S3读取数据并写入另一个帐户.
我把工作分成两步.

  1. 从S3读取数据(因为我的EMR集群在同一帐户中,所以不需要凭据).

  2. 读取步骤1创建的本地HDFS中的数据,并将其写入另一个帐户的S3存储桶.

我试过设置hadoopConfiguration:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your access key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<your secretkey>")
Run Code Online (Sandbox Code Playgroud)

并导出群集上的密钥:

$ export AWS_SECRET_ACCESS_KEY=
$ export AWS_ACCESS_KEY_ID=
Run Code Online (Sandbox Code Playgroud)

我已经尝试了群集客户端模式以及spark-shell而没有运气.

他们每个都返回一个错误:

ERROR ApplicationMaster: User class threw exception: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: 
Access Denied
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-emr apache-spark

10
推荐指数
1
解决办法
5157
查看次数

S3和EMR数据位置

使用MapReduce和HDFS的数据位置非常重要(同样适用于Spark,HBase).在云中部署集群时,我一直在研究AWS以及两个选项:

  • EC2
  • EMR + S3

第二种选择似乎更有吸引力,原因各不相同,其中最有趣的是能够分别扩展存储和处理以及在不需要时关闭处理(更正确,仅在需要时打开它).是一个解释使用S3的优点的示例.

让我烦恼的是数据局部性的问题.如果数据存储在S3中,则每次运行作业时都需要将其拉到HDFS.我的问题是 - 这个问题有多大,它还值得吗?

令我感到安慰的是,我将在第一次提取数据,然后所有下一个工作将在本地获得中间结果.

我正在寻找一些有实际经验的人的答案.谢谢.

hadoop amazon-s3 amazon-ec2 amazon-web-services amazon-emr

10
推荐指数
1
解决办法
743
查看次数