如何在 s3 存储桶中创建 Avro 文件,然后将 avro 记录附加到其中。
我拥有字节数组形式的所有 avro 记录,并已成功传输到 avro 文件中。但他的文件(据我所知)不是完整的 avro 文件。由于完整的 avro 文件是 schema + data。
以下是在 S3 中传输文件中的字节记录的代码。
任何人都知道如何创建基于 avro 模式的文件,然后将这些字节传输到同一个文件。
public void sendByteData(byte [] b, Schema schema){
try{
AWSCredentials credentials = new BasicAWSCredentials("XXXXX", "XXXXXX");
AmazonS3 s3Client = new AmazonS3Client(credentials);
//createFolder("encounterdatasample", "avrofiles", s3Client);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(b.length);
InputStream stream = new ByteArrayInputStream(b);
/* File file = new File("/home/abhishek/sample.avro");
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
dataFileWriter.create(schema, file);
s3Client.putObject("encounterdatasample", dataFileWriter.create(schema, file), stream, …Run Code Online (Sandbox Code Playgroud) 我有一个包含 JSON 数据的文本文件,如下所示:
{
"element" : value,
"id" : value,
"total" : []
}
{
"element" : value,
"id" : value,
"total: []
}
Run Code Online (Sandbox Code Playgroud)
所有 JSON 都以新行分隔。
我正在尝试将所有文本文件数据加载到临时视图中:
sqlContext.read.textFiles("/path").createOrReplaceTempView("result")
val data = sqlContext.sql("select * from result").collect()
Run Code Online (Sandbox Code Playgroud)
结果:
[{"element" : value,"id" : value,"total" : [] }]
[{"element" : value,"id" : value, "total" : []}]
Run Code Online (Sandbox Code Playgroud)
我需要提取 id 和与之相关的总数。
有没有办法在火花中处理这个?
我正在尝试创建一个 EMR 集群并执行一个 Spark 作业。
我需要启动一个线程,以便在我的工作完成时让我知道该步骤已完成,因为我的 EMR 集群将启动并运行。
{
AmazonElasticMapReduceClient emr = configureEMRClient();
StepFactory stepFactory = new StepFactory();
StepConfig enableDebugging = new StepConfig()
.withName("Enable Debugging")
.withActionOnFailure("TERMINATE_JOB_FLOW")
.withHadoopJarStep(stepFactory.newEnableDebuggingStep());
HadoopJarStepConfig runExampleConfig = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit")
.withArgs("--deploy-mode","cluster")
.withArgs("--master","yarn")
.withArgs("--class", extractorMainClass )
.withArgs("--num-executors","3")
.withArgs("--driver-memory","8g")
.withArgs( resourceExtractorJar )
.withArgs("st")
.withArgs("ap");
StepConfig customExampleStep = new StepConfig()
.withName("Example Step")
.withActionOnFailure("TERMINATE_JOB_FLOW")
.withHadoopJarStep(runExampleConfig);
Application spark = new Application().withName("Spark");
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("Test")
.withReleaseLabel("emr-5.5.0")
.withSteps(enableDebugging, customExampleStep)
.withApplications(spark)
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName("keypair")
.withInstanceCount(2)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m3.xlarge")
.withSlaveInstanceType("m3.xlarge"));
RunJobFlowResult …Run Code Online (Sandbox Code Playgroud) 我使用以下代码创建了一个集群:
> StepFactory stepFactory = new StepFactory();
StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
.withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());
Application spark = new Application().withName("Spark");
RunJobFlowRequest createClusterParameters = new RunJobFlowRequest().withName("CreateDatamart")
.withReleaseLabel("emr-5.5.0")
.withSteps(enableDebugging)
.withApplications(spark)
.withLogUri("s3://logs/")
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName("keypair")
.withInstanceCount(3)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m3.xlarge")
.withSlaveInstanceType("m3.xlarge"));
RunJobFlowResult createCluster = emr.runJobFlow(createClusterParameters);
Run Code Online (Sandbox Code Playgroud)
集群被创建。附加到它的步骤也在运行。但集群在 AWS EMR UI 中不可见。我可以在 EMR 的“事件”选项卡下查看详细信息。由于它位于“事件”选项卡下,因此没有必要在其他区域创建它。(虽然我也检查过)在 EC2 控制台中,我可以看到为 EMR 创建的容器。
在另一种情况下,如果我直接从 UI 创建一个集群,它是可见的。
代码有错误吗?
apache-spark ×3
amazon-emr ×2
java ×2
amazon-s3 ×1
avro ×1
aws-sdk ×1
file ×1
json ×1
scala ×1
text ×1