小编V. *_*mma的帖子

使用单个标头合并Spark输出CSV文件

我想在AWS中创建一个数据处理管道,最终将处理后的数据用于机器学习.

我有一个Scala脚本,它从S3获取原始数据,处理它并使用Spark-CSV将其写入HDFS甚至S3 .如果我想使用AWS Machine Learning工具来训练预测模型,我想我可以使用多个文件作为输入.但是如果我想使用别的东西,我认为最好是收到一个CSV输出文件.

目前,因为我不希望使用的重新分配(1) ,也不合并(1)用于提高性能的目的,我已经使用了Hadoop的FS -getmerge手动测试,但它只是合并作业输出文件的内容,我遇到了一个小问题.我需要在数据文件中使用单行标题来训练预测模型.

如果我使用.option("header","true")spark-csv,那么它会将标头写入每个输出文件,并且在合并之后我在数据中有与输出文件一样多的标题行.但是如果header选项为false,则它不会添加任何标头.

现在我找到了一个选项,可以将Scala脚本中的文件与Hadoop API合并FileUtil.copyMerge.我尝试spark-shell使用下面的代码.

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Run Code Online (Sandbox Code Playgroud)

但是这个解决方案仍然只是将文件连接在一起,并且不处理头文件.如何获得只有一行标题的输出文件?

我甚至尝试添加df.columns.mkString(",")作为最后一个参数copyMerge,但这仍然多次添加标题,而不是一次.

csv hadoop scala apache-spark

21
推荐指数
2
解决办法
8319
查看次数

按日期从Spark中读取S3中的多个文件

描述

我有一个应用程序,它将数据发送到AWS Kinesis Firehose,并将数据写入我的S3存储桶.Firehose使用"yyyy/MM/dd/HH"格式来编写文件.

就像在这个示例S3路径中一样:

s3://mybucket/2016/07/29/12
Run Code Online (Sandbox Code Playgroud)

现在我有一个用Scala编写的Spark应用程序,我需要从特定时间段读取数据.我有开始和结束日期.数据采用JSON格式,这就是我sqlContext.read.json()不使用的原因sc.textFile().

如何快速有效地读取数据?

我试过了什么?

  1. 通配符 - 我可以选择特定日期或特定月份所有日期的所有小时数据,例如:

    val df = sqlContext.read.json("s3://mybucket/2016/07/29/*")
    val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
    
    Run Code Online (Sandbox Code Playgroud)

    但是,如果我必须从几天的日期读取数据,例如2016-07-29 - 2016-07-30我不能以相同的方式使用通配符方法.

    这让我想到了下一点......

  2. 解决方案中使用samthebest提供的多个路径或CSV目录.似乎用逗号分隔目录只能使用和不使用.sc.textFile()sqlContext.read.json()
  3. 联盟 - 的前一个链接的第二个解决方案建议分别读取每个目录,然后将它们合并在一起.虽然他建议联合RDD-s,但也可以选择联合DataFrames.如果我手动生成给定日期时间段的日期字符串,那么我可能会创建一个不存在的路径,而不是忽略它,整个读取失败.相反,我可以使用AWS SDK并使用listObjectsAmazonS3Client中的函数来获取上一个链接中iMKanchwala解决方案中的所有密钥.

    唯一的问题是我的数据不断变化.如果read.json()函数将所有数据作为单个参数获取,它将读取所有必需的数据,并且足够智能从数据中推断出json模式.如果我分别读取2个目录并且它们的模式不匹配,那么我认为联合这两个数据帧会成为一个问题.

  4. Glob(?)语法 - nhahtdh的这个解决方案比选项12好一点,因为它们提供了更详细地指定日期和目录的选项,并作为单个"路径",因此它也适用于.read.json()

    但同样,关于丢失的目录会出现一个熟悉的问题.假设我想要从20.07到30.07的所有数据,我可以这样声明:

    val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
    
    Run Code Online (Sandbox Code Playgroud)

    但是,如果我从7月25日开始丢失数据,那么路径..16/07/25/就不存在了,整个功能都失败了.

显然,当请求的时间段是25.11.2015-12.02.2016时,它会变得更加困难,那么我需要以编程方式(在我的Scala脚本中)创建一个类似这样的字符串路径:

"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"
Run Code Online (Sandbox Code Playgroud)

通过创建它,我会以某种方式确定这些25-30和01-12间隔都有相应的路径,如果缺少一个,它会再次失败.(幸运的是,Asterisk会处理丢失的目录,因为它会读取存在的所有内容)

如何从一个目录路径中一次性读取所有必要的数据,而不会因为某个日期间隔之间缺少目录而失败?

scala amazon-s3 apache-spark apache-spark-sql aws-sdk

21
推荐指数
1
解决办法
1万
查看次数

如何在Apache Spark中处理更改镶木地板模式

我遇到了一个问题,我将Parquet数据作为S3中的每日块(以形式s3://bucketName/prefix/YYYY/MM/DD/)但我无法从不同的日期读取AWS EMR Spark中的数据,因为某些列类型不匹配,我得到许多异常中的一个,例如:

java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)

在某些文件中出现的数组类型具有值,但同一列可能null在其他文件中具有值,然后将其推断为String类型.

要么

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
Run Code Online (Sandbox Code Playgroud)

我在S3中以JSON格式存在原始数据,我最初的计划是创建一个自动作业,启动一个EMR集群,读取前一个日期的JSON数据,然后将其作为镶木地板写回S3.

JSON数据也分为日期,即键具有日期前缀.阅读JSON工作正常.无论当前正在读取多少数据,都可以从数据中推断出模式.

但是当编写镶木地板文件时问题就会出现.据我所知,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有零件/分区的模式.对我而言,似乎也可以使用不同的模式.当我禁用写入元数据时,据说Spark从给定Parquet路径中的第一个文件推断整个模式,并假设它通过其他文件保持不变.

当一些应该是double类型的列只有给定日期的整数值时,从JSON读取它们(它们将这些数字作为整数,没有浮点数)使得Spark认为它是一个具有类型的列long.即使我可以在编写Parquet文件之前将这些列转换为double,但这仍然不好,因为架构可能会更改,可以添加新列,并且无法跟踪此列.

我看到有些人有同样的问题,但我还没有找到一个足够好的解决方案.

有什么最佳实践或解决方案?

emr apache-spark parquet apache-spark-sql spark-dataframe

19
推荐指数
2
解决办法
1万
查看次数

Java Spring:通过REST API实时更新客户端状态

我正在Java Spring中开发一个Web应用程序,我希望用户能够从前端上载CSV文件,然后查看导入过程的实时进度,并且在导入之后,他应该能够搜索单个条目从导入的数据。

导入过程将包括实际上传文件(通过REST API POST请求发送),然后读取文件并将其内容保存到数据库中,以便用户能够从该数据中进行搜索。

如何显示此过程的实时进度?我找到了一个jQuery教程,其中显示了上载/传输的数据量的进度,但是由于大多数工作是在处理上载的文件时完成的,所以我想我想要一个解决方案,在行处理之前先找出数量文件中的行数,然后用户可以看到如下实时消息:

处理的行数:10000中的1

它可以进行增量更新/更改,但是由于一行处理得非常快,因此显示处理的每一行并不那么重要。

无论哪种方式,问题都是,从Spring REST API向客户端发送这些消息的最简单方法是什么?

java rest spring spring-mvc spring-boot

8
推荐指数
1
解决办法
3440
查看次数

如何在ASP.NET(Core 2)Web应用程序中访问AWS的认证?

我正在尝试将ASP.NET Core 2 Web应用程序部署到AWS Elastic Beanstalk.

该应用程序实际上是IdentityServer4,我需要访问该认证才能签署和验证令牌.

有一个教程如何配置Azure的Web应用程序使用证书的位置,但我还没有发现AWS类似的事情.

无论我搜索AWS及其证书,我总能找到有关HTTPS的SSL/TLS连接的文章和文档.我知道如何做到并且将单独执行此操作,我已经在AWS Certificate Manager中提供了证书,我可以在Elastic Beanstalk中为Load Balancer设置它,但ACM文档声明:

  • ACM不提供SSL/TLS协议以外的任何证书.
  • 您不能将ACM证书用于代码签名或电子邮件加密.

因此,如果我想在我的代码中使用证书,那么ACM Cert似乎并不适用于此.

我可以使用OpenSSL创建自签名证书,但我不知道从Elastic Beanstalk实例中的ASP.NET Core 2 Web应用程序访问它的最佳方法是什么.我不能把证书文件放在我的代码库中,我想以某种方式通过AWS将它注入环境但我不知道在我的应用程序中何处或如何访问它?

certificate amazon-web-services asp.net-core aws-certificate-manager identityserver4

8
推荐指数
1
解决办法
492
查看次数

如何在 ASP.NET Core 2.0 和 EF Core 2.0 中将应用程序设置从项目根获取到 IDesignTimeDbContextFactory 实现

我正在 ASP.NET Core 2.0 中构建应用程序,但在 EntityFramework 迁移方面遇到问题。

我的 DbContext 位于一个单独的项目 ( SolutionName \ ProjectNamePrefix .Data) 中,因此我创建了 IDesignTimeDbContextFactory 接口的实现。

我想为不同的环境使用不同的连接字符串,并且我需appsettings.json要这样做。

因此,经过快速搜索后,我发现我可以在函数IConfigurationRoot内创建一个新对象CreateDbContext,如下所示: https: //codingblast.com/entityframework-core-idesigntimedbcontextfactory/

dotnet ef migrations list -c MyContext我添加了它,然后为了测试,尝试从数据项目根文件夹运行。

然后我收到以下错误:

The configuration file 'appsettings.json' was not found and is not optional. The physical path is 'C:\dev\*SolutionName*\*ProjectNamePrefix*.Data\bin\Debug\netcoreapp2.0\appsettings.json'.
Run Code Online (Sandbox Code Playgroud)

所以,基本上,我尝试了 3 个选项来获取正确的根路径:

  • Directory.GetCurrentDirectory();
  • env.ContentRootPath;IHostingEnvironment对象,我找到了一种方法来获取它: https: //github.com/aspnet/Home/issues/2194
  • AppDomain.CurrentDomain.BaseDirectory;

他们都返回同一..\bin\debug\netcoreapp2.0\条路。当我从 VS 运行数据项目时,前两个选项为我提供了正确的项目根文件夹。

有没有办法获取正确的项目内容根文件夹?

因为当我向 EF 命令添加 --verbose 时,它​​会注销一行:

Using content root 'C:\dev\FitsMeIdentity\FitsMeIdentity.Data\'.
Run Code Online (Sandbox Code Playgroud)

所以我知道 …

asp.net entity-framework entity-framework-core asp.net-core-2.0 ef-core-2.0

6
推荐指数
2
解决办法
4994
查看次数

使用S3DistCp将文件从S3复制到EMR

我正在努力寻找一种在我的AWS EMR集群中使用S3DistCp的方法。

一些旧示例显示了如何将s3distcp添加为EMR步骤use elastic-mapreduce命令,该命令已不再使用。

其他一些消息来源建议使用s3-dist-cp命令,该命令在当前EMR群集中找不到。甚至官方文档(在线和EMR开发人员指南2016 pdf)也提供了这样的示例:

aws emr add-steps --cluster-id j-3GYXXXXXX9IOK --steps Type=CUSTOM_JAR,Name="S3DistCp step",Jar=/home/hadoop/lib/emr-s3distcp-1.0.jar,Args=["--s3Endpoint,s3-eu-west-1.amazonaws.com","--src,s3://mybucket/logs/j-3GYXXXXXX9IOJ/node/","--dest,hdfs:///output","--srcPattern,.*[azA-Z,]+"]
Run Code Online (Sandbox Code Playgroud)

但是路径中没有lib文件夹/home/hadoop。我在此文件夹中找到了一些hadoop库:/usr/lib/hadoop/lib,但找不到s3distcp任何地方。

然后我发现在某些S3存储桶中有一些可用的库。例如,从这个问题中,我找到了以下路径:s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar。这似乎是朝着正确方向迈出的一步,因为使用这些参数从AWS界面向正在运行的EMR集群添加了一个新步骤,从而开始了该步骤(之前没有尝试过该步骤),但在约15秒后失败了:

JAR location: s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar
Main class: None
Arguments: --s3Endpoint s3-eu-west-1.amazonaws.com --src s3://source-bucket/scripts/ --dest hdfs:///output
Action on failure: Continue
Run Code Online (Sandbox Code Playgroud)

这导致以下错误:

Exception in thread "main" java.lang.RuntimeException: Unable to retrieve Hadoop configuration for key fs.s3n.awsAccessKeyId
    at com.amazon.external.elasticmapreduce.s3distcp.ConfigurationCredentials.getConfigOrThrow(ConfigurationCredentials.java:29)
    at com.amazon.external.elasticmapreduce.s3distcp.ConfigurationCredentials.<init>(ConfigurationCredentials.java:35)
    at com.amazon.external.elasticmapreduce.s3distcp.S3DistCp.createInputFileListS3(S3DistCp.java:85)
    at com.amazon.external.elasticmapreduce.s3distcp.S3DistCp.createInputFileList(S3DistCp.java:60)
    at com.amazon.external.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:529)
    at com.amazon.external.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:216)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84) …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-emr elastic-map-reduce aws-sdk s3distcp

5
推荐指数
1
解决办法
9214
查看次数

使用AWS Lambda连接S3中的文件

有没有办法使用Lambda进行S3文件串联?

我有Firehose以最长的间隔(15分钟或128mb)将数据流式传输到S3,因此我每天有96个数据文件,但是我想将所有数据聚合到一个日常数据文件中,以便在以后读取数据时获得最快的性能。在Spark(EMR)中。

我创建了一个解决方案,当Firehose将新文件流式传输到S3时,将调用Lambda函数。然后,该函数从源存储桶中读取(s3.GetObject)新文件,并从目标存储桶中读取已连接的每日数据文件(如果以前的每日数据已经存在,则创建一个新文件),将两个响应体都解码为字符串,然后只需将它们加在一起,然后使用s3.PutObject(覆盖先前的聚合文件)写入目标存储桶即可。

问题在于,当聚合文件达到150+ MB时,Lambda函数在读取两个文件时会达到其〜1500mb的内存限制,然后失败。

目前,我的数据量很少,每天只有几百MB-s,但是将来这个数字将成倍增长。对我来说,Lambda的限制如此之低,以至于文件太小它们已经达到了。

或者,最好是由S3对象创建的事件或某种方式调度的作业(例如每天调度的)调用串联S3数据的替代方法?

concatenation amazon-s3 aws-sdk aws-lambda amazon-kinesis-firehose

4
推荐指数
2
解决办法
5776
查看次数

如何将Spark Dataframe列转换为字符串数组的单个列

我想知道如何将多个数据帧列"合并"为一个字符串数组?

例如,我有这个数据帧:

val df = sqlContext.createDataFrame(Seq((1, "Jack", "125", "Text"), (2,"Mary", "152", "Text2"))).toDF("Id", "Name", "Number", "Comment")
Run Code Online (Sandbox Code Playgroud)

看起来像这样:

scala> df.show
+---+----+------+-------+
| Id|Name|Number|Comment|
+---+----+------+-------+
|  1|Jack|   125|   Text|
|  2|Mary|   152|  Text2|
+---+----+------+-------+

scala> df.printSchema
root
 |-- Id: integer (nullable = false)
 |-- Name: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Comment: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我怎样才能改变它,看起来像这样:

scala> df.show
+---+-----------------+
| Id|             List|
+---+-----------------+
|  1|  [Jack,125,Text]|
|  2| [Mary,152,Text2]|
+---+-----------------+

scala> df.printSchema
root
 |-- Id: integer (nullable …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

3
推荐指数
1
解决办法
4396
查看次数

Java Spring:如何高效地从CSV文件中读取和保存大量数据?

我正在 Java Spring 中开发一个 Web 应用程序,我希望用户能够从前端上传 CSV 文件,然后查看导入过程的实时进度,导入后他应该能够搜索单个条目从导入的数据。

导入过程包括实际上传文件(通过 REST API POST 请求发送),然后读取文件并将其内容保存到数据库中,以便用户能够从此数据中进行搜索。

将数据保存到数据库的最快方法是什么?仅循环遍历各行并创建一个新的类对象并通过 JPARepository 为每行保存它会花费太多时间。10000 行大约需要 90 秒。我需要让它变得更快。我需要在合理的时间内添加 200k 行。

附注:

我看到了使用 Reactor 的异步方法。这应该会更快,因为它使用多个线程,并且保存行的顺序基本上并不重要(尽管数据在 CSV 中具有 ID)。

然后我还看到了 Spring Batch 作业,但所有示例都使用 SQL。我正在使用存储库,所以我不确定是否可以使用它或者它是否是最好的方法。

java spring spring-mvc spring-data-jpa spring-boot

3
推荐指数
1
解决办法
2万
查看次数