标签: spark-dataframe

如何在Java中的Apache Spark中将DataFrame转换为Dataset?

我可以很容易地将Scala中的DataFrame转换为Dataset:

case class Person(name:String, age:Long)
val df = ctx.read.json("/tmp/persons.json")
val ds = df.as[Person]
ds.printSchema
Run Code Online (Sandbox Code Playgroud)

但在Java版本中我不知道如何将Dataframe转换为Dataset?任何的想法?

我的努力是:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = new Encoder<>();
Dataset<Person> ds = new Dataset<Person>(ctx,df.logicalPlan(),encoder);
ds.printSchema();
Run Code Online (Sandbox Code Playgroud)

但是编译器说:

Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated
Run Code Online (Sandbox Code Playgroud)

编辑(解决方案):

基于@Leet-Falcon答案的解决方案:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = Encoders.bean(Person.class);
Dataset<Person> ds = new Dataset<Person>(ctx, df.logicalPlan(), encoder);
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-dataframe apache-spark-dataset

15
推荐指数
2
解决办法
2万
查看次数

为什么Spark作业失败并显示"退出代码:52"

我有一个像这样的跟踪失败的Spark工作:

./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-Container id: container_1455622885057_0016_01_000008
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-Exit code: 52
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr:Stack trace: ExitCodeException exitCode=52: 
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.util.Shell.run(Shell.java:456)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-      at java.lang.Thread.run(Thread.java:745)
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-
./containers/application_1455622885057_0016/container_1455622885057_0016_01_000001/stderr-Container exited with a non-zero exit code 52
Run Code Online (Sandbox Code Playgroud)

我花了一段时间才弄清楚"退出代码52"是什么意思,所以我把它放在这里是为了其他可能正在搜索的人的利益

hadoop-yarn apache-spark spark-dataframe

15
推荐指数
1
解决办法
1万
查看次数

阻止DataFrame.partitionBy()从架构中删除分区列

我正在按如下方式对DataFrame进行分区:

df.write.partitionBy("type", "category").parquet(config.outpath)
Run Code Online (Sandbox Code Playgroud)

代码给出了预期的结果(即按类型和类别划分的数据).但是,"type"和"category"列将从数据/模式中删除.有没有办法防止这种行为?

apache-spark spark-dataframe

15
推荐指数
3
解决办法
6858
查看次数

如何在Scala和Apache Spark中连接两个DataFrame?

有两个DataFrame(Scala,Apache Spark 1.6.1)

1)比赛

         MatchID | Player1    |  Player2 
         --------------------------------
               1 | John Wayne | John Doe
               2 | Ive Fish   | San Simon
Run Code Online (Sandbox Code Playgroud)

2)个人资料

              Player     |  BirthYear 
              --------------------------------
              John Wayne | 1986
              Ive Fish   | 1990
              San Simon  | 1974
              john Doe   | 1995
Run Code Online (Sandbox Code Playgroud)

如何为两个玩家创建一个带有'BirthYear'的新DataFrame

         MatchID | Player1    |  Player2  | BYear_P1 |BYear_P2 | Diff
         -------------------------------------------------------------
               1 | John Wayne | John Doe  |   1986   | 1995    |  9  
               2 | Ive Fish   | San Simon |   1990   | …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

15
推荐指数
2
解决办法
5万
查看次数

从Apache Spark中的模式获取数据类型列表

我在Spark-Python中有以下代码来获取DataFrame架构中的名称列表,它工作正常,但我如何获取数据类型列表?

columnNames = df.schema.names
Run Code Online (Sandbox Code Playgroud)

例如,类似于:

columnTypes = df.schema.types
Run Code Online (Sandbox Code Playgroud)

有没有办法获得DataFrame模式中包含的单独的数据类型列表?

python schema types apache-spark spark-dataframe

15
推荐指数
2
解决办法
3万
查看次数

Spark 2.0,DataFrame,过滤字符串列,不等运算符(!==)已弃用

我试图通过仅保留那些具有某个字符串列非空的行来过滤DataFrame.

操作如下:

df.filter($"stringColumn" !== "")
Run Code Online (Sandbox Code Playgroud)

我的编译器显示,自从我转到Spark 2.0.1后,不推荐使用!==

如何在Spark> 2.0中检查字符串列值是否为空?

apache-spark spark-dataframe

15
推荐指数
1
解决办法
2万
查看次数

在DataFrameWriter上使用partitionBy会使用列名而不仅仅是值来编写目录布局

我正在使用Spark 2.0.

我有一个DataFrame.我的代码如下所示:

df.write.partitionBy("year", "month", "day").format("csv").option("header", "true").save(s"s3://bucket/")
Run Code Online (Sandbox Code Playgroud)

当程序执行时,它以下列格式写入文件:

s3://bucket/year=2016/month=11/day=15/file.csv
Run Code Online (Sandbox Code Playgroud)

如何配置格式如下:

s3://bucket/2016/11/15/file.csv
Run Code Online (Sandbox Code Playgroud)

我还想知道是否可以配置文件名.

这里的相关文档看起来很稀疏......
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

partitionBy(colNames: String*): DataFrameWriter[T]
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/
year=2016/month=02/
Partitioning is one of the most widely used techniques to optimize physical data layout. It …
Run Code Online (Sandbox Code Playgroud)

configuration scala apache-spark spark-dataframe

15
推荐指数
1
解决办法
4664
查看次数

从Pyspark Dataframe中提取numpy数组

我有一个数据帧gi_man_df,其中group可以是n:

+------------------+-----------------+--------+--------------+
|           group  |           number|rand_int|   rand_double|
+------------------+-----------------+--------+--------------+
|          'GI_MAN'|                7|       3|         124.2|
|          'GI_MAN'|                7|      10|        121.15|
|          'GI_MAN'|                7|      11|         129.0|
|          'GI_MAN'|                7|      12|         125.0|
|          'GI_MAN'|                7|      13|         125.0|
|          'GI_MAN'|                7|      21|         127.0|
|          'GI_MAN'|                7|      22|         126.0|
+------------------+-----------------+--------+--------------+
Run Code Online (Sandbox Code Playgroud)

我期待一个numpy nd_array,即gi_man_array:

[[[124.2],[121.15],[129.0],[125.0],[125.0],[127.0],[126.0]]]
Run Code Online (Sandbox Code Playgroud)

应用pivot后rand_double的值.

我尝试了以下两种方法:首先
:我按如下方式转动gi_man_df:

gi_man_pivot = gi_man_df.groupBy("number").pivot('rand_int').sum("rand_double")
Run Code Online (Sandbox Code Playgroud)

我得到的输出是:

Row(number=7, group=u'GI_MAN', 3=124.2, 10=121.15, 11=129.0, 12=125.0, 13=125.0, 21=127.0, 23=126.0)
Run Code Online (Sandbox Code Playgroud)

但这里的问题是获得所需的输出,我无法将其转换为矩阵然后再转换为numpy数组.

SECOND: 我使用以下方法在数据框中创建了向量:

assembler = VectorAssembler(inputCols=["rand_double"],outputCol="rand_double_vector")

gi_man_vector = assembler.transform(gi_man_df)
gi_man_vector.show(7) …
Run Code Online (Sandbox Code Playgroud)

numpy apache-spark pyspark spark-dataframe apache-spark-mllib

15
推荐指数
1
解决办法
2万
查看次数

将函数应用于Spark中csv的单个列

使用Spark我正在读取csv并希望将函数应用于csv上的列.我有一些有用的代码,但它非常hacky.这样做的正确方法是什么?

我的代码

SparkContext().addPyFile("myfile.py")
spark = SparkSession\
    .builder\
    .appName("myApp")\
    .getOrCreate()
from myfile import myFunction

df = spark.read.csv(sys.argv[1], header=True,
    mode="DROPMALFORMED",)
a = df.rdd.map(lambda line: Row(id=line[0], user_id=line[1], message_id=line[2], message=myFunction(line[3]))).toDF()
Run Code Online (Sandbox Code Playgroud)

我希望能够只在列名称上调用函数,而不是将每一行映射到line然后调用函数line[index].

我正在使用Spark版本2.0.1

apache-spark pyspark spark-dataframe

14
推荐指数
1
解决办法
1万
查看次数

如何将整列的大小写改为小写?

我想在Spark数据集中将整列的大小写更改为小写

        Desired Input
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|BRUSH & BROOM HAN...|
        |   XYZ|WHEEL BRUSH PARTS...|
        +------+--------------------+

        Desired Output
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|brush & broom han...|
        |   XYZ|wheel brush parts...|
        +------+--------------------+
Run Code Online (Sandbox Code Playgroud)

我尝试使用collectAsList()和toString(),这对于非常大的数据集来说是一个缓慢而复杂的过程.

我还发现了一种方法'较低',但没有知道如何让它在dasaset中工作请建议我一个简单或有效的方法来做到这一点.提前致谢

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

14
推荐指数
2
解决办法
3万
查看次数