我在s3存储桶中有很多文件,我想复制那些开始日期为2012的文件.下面的命令会复制所有文件.
aws s3 cp s3://bp-dev/bp_source_input/ C:\Business_Panorama\nts\data\in --recursive --include "201502_nts_*.xlsx"
Run Code Online (Sandbox Code Playgroud) 最近我一直在使用亚马逊网络服务(AWS),我注意到关于这个主题的文档不多,所以我添加了我的解决方案.
我正在使用Amazon Elastic MapReduce(Amazon EMR)编写应用程序.计算结束后,我需要对它们创建的文件执行一些工作,所以我需要知道作业流何时完成其工作.
这是您如何检查您的工作流程是否完成的方法:
AmazonElasticMapReduce mapReduce = new AmazonElasticMapReduceClient(credentials);
DescribeJobFlowsRequest jobAttributes = new DescribeJobFlowsRequest()
.withJobFlowStates("COMPLETED");
List<JobFlowDetail> jobs = mapReduce.describeJobFlows(jobAttributes).getJobFlows();
JobFlowDetail detail = jobs.get(0);
detail.getJobFlowId(); //the id of one of the completed jobs
Run Code Online (Sandbox Code Playgroud)
您还可以查找特定的作业ID DescribeJobFlowsRequest,然后检查该作业是否已完成失败.
我希望它会帮助别人.
我使用CLI for AWS创建一个集群并使用json文件中的参数.这是CLI命令字符串Im使用:
aws emr create-cluster --name "Big Matrix Re Run 1" --ami-version 3.1.0 --steps file://Segmentgroup1.json --release-label --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge --auto-terminate
Run Code Online (Sandbox Code Playgroud)
我的json文件(Segmentgroup1.json)位于运行AWS命令字符串的同一个文件夹中,但是我一直收到以下错误:
No JSON object could be decoded
Run Code Online (Sandbox Code Playgroud)
基于我发现它没有找到json文件.有任何想法吗?
我试图在Amazon EMR上运行以Scala编写的以下Spark代码:
import org.apache.spark.{SparkConf, SparkContext}
object TestRunner {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Hello World")
val sc = new SparkContext(conf)
val words = sc.parallelize(Seq("a", "b", "c", "d", "e"))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
println(wordCounts)
}
}
Run Code Online (Sandbox Code Playgroud)
这是我用来将上述代码部署到EMR的脚本:
#!/usr/bin/env bash
set -euxo pipefail
cluster_id='j-XXXXXXXXXX'
app_name="HelloWorld"
main_class="TestRunner"
jar_name="HelloWorld-assembly-0.0.1-SNAPSHOT.jar"
jar_path="target/scala-2.11/${jar_name}"
s3_jar_dir="s3://jars/"
s3_jar_path="${s3_jar_dir}${jar_name}"
###################################################
sbt assembly
aws s3 cp ${jar_path} ${s3_jar_dir}
aws emr add-steps --cluster-id ${cluster_id} --steps Type=spark,Name=${app_name},Args=[--deploy-mode,cluster,--master,yarn-cluster,--class,${main_class},${s3_jar_path}],ActionOnFailure=CONTINUE
Run Code Online (Sandbox Code Playgroud)
但是,退出几分钟后在AWS中根本不产生任何输出!
这是我的控制器的输出:
2016-10-20T21:03:17.043Z INFO Ensure step 3 …Run Code Online (Sandbox Code Playgroud) 我在EMR上运行用Scala编写的Spark Job,每个执行程序的stdout都充满了GC分配失败.
2016-12-07T23:42:20.614+0000: [GC (Allocation Failure) 2016-12-07T23:42:20.614+0000: [ParNew: 909549K->432K(1022400K), 0.0089234 secs] 2279433K->1370373K(3294336K), 0.0090530 secs] [Times: user=0.11 sys=0.00, real=0.00 secs]
2016-12-07T23:42:21.572+0000: [GC (Allocation Failure) 2016-12-07T23:42:21.572+0000: [ParNew: 909296K->435K(1022400K), 0.0089298 secs] 2279237K->1370376K(3294336K), 0.0091147 secs] [Times: user=0.11 sys=0.01, real=0.00 secs]
2016-12-07T23:42:22.525+0000: [GC (Allocation Failure) 2016-12-07T23:42:22.525+0000: [ParNew: 909299K->485K(1022400K), 0.0080858 secs] 2279240K->1370427K(3294336K), 0.0082357 secs] [Times: user=0.12 sys=0.00, real=0.01 secs]
2016-12-07T23:42:23.474+0000: [GC (Allocation Failure) 2016-12-07T23:42:23.474+0000: [ParNew: 909349K->547K(1022400K), 0.0090641 secs] 2279291K->1370489K(3294336K), 0.0091965 secs] [Times: user=0.12 sys=0.00, real=0.00 secs]
Run Code Online (Sandbox Code Playgroud)
我正在阅读几TB的数据(主要是字符串)所以我担心常量GC会减慢处理时间.
我将不胜感激任何有关如何理解此消息以及如何优化GC以使其消耗最少CPU时间的指示.
我正在EMR 5.11.1,Spark 2.2.1中构建一个Kafka摄取模块.我的目的是使用结构化流来消费Kafka主题,进行一些处理,并以镶木地板格式存储到EMRFS/S3.
控制台接收器按预期工作,文件接收器不起作用.
在spark-shell:
val event = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()
val eventdf = event.select($"value" cast "string" as "json")
.select(from_json($"json", readSchema) as "data")
.select("data.*")
val outputdf = <some processing on eventdf>
Run Code Online (Sandbox Code Playgroud)
这有效:
val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start
Run Code Online (Sandbox Code Playgroud)
这不是:
val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails
Run Code Online (Sandbox Code Playgroud)
我试过的东西不起作用:
一些挖掘源代码的人把我带到了这里:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog. scala ,它表示缺少.compact文件应触发默认值.
因此尝试:spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay",60000)以确保在新批处理创建组合元数据文件之前未删除旧批处理元数据
使这个错误烦人的原因是它并不总是可重现的.在不更改代码中的单个字符的情况下,写入镶木地板有时会起作用,或者不起作用.我已经尝试清理检查点位置,spark/hdfs日志等,以防火花内部的"状态"导致此问题.
这是错误堆栈跟踪:
query: …Run Code Online (Sandbox Code Playgroud) amazon-s3 amazon-emr apache-spark spark-structured-streaming
我阅读了AWS文档,使用Hadoop SPark和Livy创建了EMR集群高版本。选择笔记本,创建一个,将其连接到集群,但启动时突然停止,并显示消息Cluster ########### 没有安装 JupyterEnterpriseGateway 应用程序。请使用另一个集群重试。 我尝试通过 SSH 连接到主节点并 pip 安装它,但失败了。网上也找不到解决办法。我认为这些集群是开箱即用的。有什么解决办法吗?
我正在创建一个解析大量服务器数据的工作,然后将其上传到Redshift数据库中.
我的工作流程如下:
dataframes或spark sql来解析数据并写回S3我已经开始讨论如何自动执行此操作,以便我的进程旋转EMR集群,引导正确的安装程序,并运行我的python脚本,其中包含用于解析和编写的代码.
有没有人有任何可以与我分享的示例,教程或经验,以帮助我学习如何做到这一点?
我有一个EMR Spark Job需要在一个帐户上从S3读取数据并写入另一个帐户.
我把工作分成两步.
从S3读取数据(因为我的EMR集群在同一帐户中,所以不需要凭据).
读取步骤1创建的本地HDFS中的数据,并将其写入另一个帐户的S3存储桶.
我试过设置hadoopConfiguration:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your access key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<your secretkey>")
Run Code Online (Sandbox Code Playgroud)
并导出群集上的密钥:
$ export AWS_SECRET_ACCESS_KEY=
$ export AWS_ACCESS_KEY_ID=
Run Code Online (Sandbox Code Playgroud)
我已经尝试了群集和客户端模式以及spark-shell而没有运气.
他们每个都返回一个错误:
ERROR ApplicationMaster: User class threw exception: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Access Denied
Run Code Online (Sandbox Code Playgroud) 使用MapReduce和HDFS的数据位置非常重要(同样适用于Spark,HBase).在云中部署集群时,我一直在研究AWS以及两个选项:
第二种选择似乎更有吸引力,原因各不相同,其中最有趣的是能够分别扩展存储和处理以及在不需要时关闭处理(更正确,仅在需要时打开它).这是一个解释使用S3的优点的示例.
让我烦恼的是数据局部性的问题.如果数据存储在S3中,则每次运行作业时都需要将其拉到HDFS.我的问题是 - 这个问题有多大,它还值得吗?
令我感到安慰的是,我将在第一次提取数据,然后所有下一个工作将在本地获得中间结果.
我正在寻找一些有实际经验的人的答案.谢谢.
amazon-emr ×10
amazon-s3 ×5
apache-spark ×5
emr ×2
amazon-ec2 ×1
aws-cli ×1
hadoop ×1
java ×1
jvm ×1
pyspark ×1
python ×1
scala ×1