标签: apache-spark-sql

Spark中的DataFrame,Dataset和RDD之间的区别

我只是想知道Apache Spark中的RDDDataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?

你能把一个转换成另一个吗?

apache-spark rdd apache-spark-sql apache-spark-dataset

228
推荐指数
10
解决办法
10万
查看次数

如何在Spark SQL的DataFrame中更改列类型?

假设我做的事情如下:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  
Run Code Online (Sandbox Code Playgroud)

但我真的想要yearas Int(并且可能会转换其他一些列).

我能想到的最好的是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

143
推荐指数
12
解决办法
32万
查看次数

如何将rdd对象转换为spark中的dataframe

如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?

scala apache-spark rdd apache-spark-sql

128
推荐指数
7
解决办法
24万
查看次数

如何选择每组的第一行?

我有一个DataFrame生成如下:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.

我想选择每组的顶行,即

  • 来自小时组== 0选择(0,cat26,30.9)
  • 来自小时组== 1选择(1,cat67,28.5)
  • 来自小时组== …

sql scala dataframe apache-spark apache-spark-sql

122
推荐指数
3
解决办法
8万
查看次数

Spark - 将CSV文件加载为DataFrame?

我想在spark中读取CSV并将其转换为DataFrame并将其存储在HDFS中 df.registerTempTable("table_name")

我试过了:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Run Code Online (Sandbox Code Playgroud)

在Apache Spark中将CSV文件作为DataFrame加载的正确命令是什么?

hadoop scala hdfs apache-spark apache-spark-sql

120
推荐指数
7
解决办法
31万
查看次数

如何定义DataFrame的分区?

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.

我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00
Run Code Online (Sandbox Code Playgroud)

至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.

但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.

源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.

有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?

scala partitioning dataframe apache-spark apache-spark-sql

119
推荐指数
5
解决办法
14万
查看次数

如何在Spark DataFrame中添加常量列?

我想在a中添加一个DataFrame具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn如下:

dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)

似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

116
推荐指数
2
解决办法
12万
查看次数

如何在Spark SQL中按降序排列?

我尝试了df.orderBy("col1").show(10)但它按升序排序.df.sort("col1").show(10)也按降序排序.我查看了stackoverflow,我发现的答案都已过时或提交给RDD.我想在spark中使用原生数据帧.

scala apache-spark apache-spark-sql

110
推荐指数
5
解决办法
20万
查看次数

如何向Spark DataFrame添加新列(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.

我试过以下但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)

那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?

python dataframe apache-spark apache-spark-sql pyspark

110
推荐指数
4
解决办法
21万
查看次数

连接Apache Spark DataFrame中的列

我们如何在Apache Spark DataFrame中连接两列?我们可以使用Spark SQL中的任何函数吗?

sql dataframe apache-spark apache-spark-sql

95
推荐指数
8
解决办法
20万
查看次数