我只是想知道Apache Spark中的RDD和DataFrame (Spark 2.0.0 DataFrame只是一个类型别名Dataset[Row])有什么区别?
你能把一个转换成另一个吗?
假设我做的事情如下:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
Run Code Online (Sandbox Code Playgroud)
但我真的想要yearas Int(并且可能会转换其他一些列).
我能想到的最好的是
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, …Run Code Online (Sandbox Code Playgroud) 如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?
我有一个DataFrame生成如下:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.
我想选择每组的顶行,即
我想在spark中读取CSV并将其转换为DataFrame并将其存储在HDFS中 df.registerTempTable("table_name")
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Run Code Online (Sandbox Code Playgroud)
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Run Code Online (Sandbox Code Playgroud)
在Apache Spark中将CSV文件作为DataFrame加载的正确命令是什么?
我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.
我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Run Code Online (Sandbox Code Playgroud)
至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.
但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.
源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.
有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?
我想在a中添加一个DataFrame具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn如下:
dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
1 dt = (messages
2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)
/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167 """
-> 1168 return self.select('*', col.alias(colName))
1169
1170 @ignore_unicode_prefix
AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)
似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:
dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …Run Code Online (Sandbox Code Playgroud) 我尝试了df.orderBy("col1").show(10)但它按升序排序.df.sort("col1").show(10)也按降序排序.我查看了stackoverflow,我发现的答案都已过时或提交给RDD.我想在spark中使用原生数据帧.
我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.
我试过以下但没有成功:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)
使用这个也有错误:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)
那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?
我们如何在Apache Spark DataFrame中连接两列?我们可以使用Spark SQL中的任何函数吗?
apache-spark ×10
apache-spark-sql ×10
scala ×6
dataframe ×5
pyspark ×2
python ×2
rdd ×2
sql ×2
hadoop ×1
hdfs ×1
partitioning ×1