标签: spark-dataframe

从分区的镶木地板文件中读取DataFrame

如何读取条件为数据帧的分区镶木地板,

这工作正常,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")
Run Code Online (Sandbox Code Playgroud)

分区是有day=1 to day=30是它可以读取类似(day = 5 to 6)或者day=5,day=6,

val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")
Run Code Online (Sandbox Code Playgroud)

如果我把*它给我所有30天的数据,它太大了.

scala apache-spark parquet spark-dataframe

22
推荐指数
3
解决办法
4万
查看次数

如何通过pandas或spark数据帧删除所有行中具有相同值的列?

假设我的数据类似于以下内容:

  index id   name  value  value2  value3  data1  val5
    0  345  name1    1      99      23     3      66
    1   12  name2    1      99      23     2      66
    5    2  name6    1      99      23     7      66
Run Code Online (Sandbox Code Playgroud)

我们如何使用python在一个命令或几个命令中删除所有列(如value,value2(value3),所有行具有相同值的列)?

考虑到我们有类似的许多列value,value2,value3... value200.

输出:

   index id      name   data1
        0  345  name1    3
        1   12  name2    2
        5    2  name6    7
Run Code Online (Sandbox Code Playgroud)

python duplicates multiple-columns pandas spark-dataframe

22
推荐指数
2
解决办法
1万
查看次数

为什么Apache Spark会在嵌套结构中读取不必要的Parquet列?

我的团队正在构建一个ETL过程,使用Spark将原始分隔文本文件加载到基于Parquet的"数据湖"中.Parquet列存储的一个承诺是查询只会读取必要的"列条带".

但是我们看到嵌套模式结构正在读取意外的列.

为了演示,这里有一个使用Scala和Spark 2.0.1 shell的POC:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Run Code Online (Sandbox Code Playgroud)

然后我们将文件读回DataFrame并投影到列的子集:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", …
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet spark-dataframe

22
推荐指数
2
解决办法
3055
查看次数

如何解决Spark中的AnalysisException:resolved属性

val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")
Run Code Online (Sandbox Code Playgroud)

连接操作工作正常,但是当我重用df2时,我面临未解决的属性错误

val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")
Run Code Online (Sandbox Code Playgroud)

错误:org.apache.spark.sql.AnalysisException:已解析的属性ID#426

java scala spark-dataframe

22
推荐指数
4
解决办法
3万
查看次数

更改spark数据帧中列的可空属性

我正在为某些测试手动创建数据帧.创建它的代码是:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))
Run Code Online (Sandbox Code Playgroud)

架构看起来像这样:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)
Run Code Online (Sandbox Code Playgroud)

我想为这些变量中的每一个制作'nullable = true'.如何从一开始就声明它或在创建新数据帧后将其切换?

scala apache-spark spark-dataframe

21
推荐指数
4
解决办法
2万
查看次数

Pyspark:显示数据框列的直方图

在pandas数据框中,我使用以下代码绘制列的直方图:

my_df.hist(column = 'field_1')
Run Code Online (Sandbox Code Playgroud)

在pyspark数据框架中是否有可以实现相同目标的东西?(我在Jupyter笔记本中)谢谢!

python pyspark spark-dataframe jupyter-notebook

21
推荐指数
3
解决办法
5万
查看次数

在Spark数据帧中爆炸嵌套的Struct

我正在研究Databricks 示例.数据框架的架构如下所示:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

在该示例中,他们将展示如何将employees列分解为4个附加列:

val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) …
Run Code Online (Sandbox Code Playgroud)

scala distributed-computing apache-spark spark-dataframe databricks

21
推荐指数
2
解决办法
3万
查看次数

退出代码和退出状态是否意味着什么火花?

在纱线上运行火花时,我一直看到退出代码和退出状态:

以下是一些:

  • CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

  • ...failed 2 times due to AM Container for application_1431523563856_0001_000002 exited with exitCode: 10...

  • ...Exit status: 143. Diagnostics: Container killed on request

  • ...Container exited with a non-zero exit code 52:...

  • ...Container killed on request. Exit code is 137...

我从来没有发现任何这些消息是有用的......有没有机会解释这些消息究竟出了什么问题?我搜索了高低不一的表格来解释错误,但没有.

我能够从上面解释的唯一一个是退出代码52,但那是因为我在这里查看了源代码.这是说这是一个OOM.

我是否应该停止尝试解释其余的退出代码并退出状态?或者我错过了一些明显的方式,这些数字实际意味着什么?

即使有人能告诉我之间的差异exit code,exit status以及SIGNAL这将是有益的.但我现在只是随机猜测,而且我周围的其他所有人都使用了火花.

最后,为什么一些退出代码小于零以及如何解释这些?

例如 Exit status: -100. Diagnostics: Container released on a *lost* node

hadoop hadoop-yarn apache-spark pyspark spark-dataframe

21
推荐指数
1
解决办法
1万
查看次数

如何在pyspark中的groupBy之后计算唯一ID

我每年使用以下代码来聚集学生.目的是了解每年的学生总数.

from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped = 
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))
Run Code Online (Sandbox Code Playgroud)

结果是:

[学生按年份] [1]

我发现有这么多ID重复的问题所以结果是错误的和巨大的.

我希望按年份对学生进行聚集,按年计算学生总数,并将ID重复计算.

我希望这个问题很清楚.我是新成员谢谢

python pyspark spark-dataframe pyspark-sql

21
推荐指数
2
解决办法
4万
查看次数

如何避免在保存DataFrame时生成crc文件和SUCCESS文件?

我使用以下代码将spark DataFrame保存到JSON文件

unzipJSON.write.mode("append").json("/home/eranw/Workspace/JSON/output/unCompressedJson.json")
Run Code Online (Sandbox Code Playgroud)

输出结果是:

part-r-00000-704b5725-15ea-4705-b347-285a4b0e7fd8
.part-r-00000-704b5725-15ea-4705-b347-285a4b0e7fd8.crc
part-r-00001-704b5725-15ea-4705-b347-285a4b0e7fd8
.part-r-00001-704b5725-15ea-4705-b347-285a4b0e7fd8.crc
_SUCCESS
._SUCCESS.crc
Run Code Online (Sandbox Code Playgroud)
  1. 如何生成单个JSON文件而不是每行文件?
  2. 如何避免*crc文件?
  3. 如何避免SUCCESS文件?

json apache-spark spark-dataframe

20
推荐指数
3
解决办法
1万
查看次数