标签: apache-spark

如何选择每组的第一行?

我有一个DataFrame生成如下:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.

我想选择每组的顶行,即

  • 来自小时组== 0选择(0,cat26,30.9)
  • 来自小时组== 1选择(1,cat67,28.5)
  • 来自小时组== …

sql scala dataframe apache-spark apache-spark-sql

122
推荐指数
3
解决办法
8万
查看次数

如何打印RDD的内容?

我正在尝试将集合的内容打印到Spark控制台.

我有一个类型:

linesWithSessionId: org.apache.spark.rdd.RDD[String] = FilteredRDD[3]
Run Code Online (Sandbox Code Playgroud)

我使用命令:

scala> linesWithSessionId.map(line => println(line))
Run Code Online (Sandbox Code Playgroud)

但这是印刷的:

res1:org.apache.spark.rdd.RDD [Unit] = MappedRDD [4] at map at:19

如何将RDD写入控制台或将其保存到磁盘,以便查看其内容?

scala apache-spark

120
推荐指数
4
解决办法
21万
查看次数

Spark - 将CSV文件加载为DataFrame?

我想在spark中读取CSV并将其转换为DataFrame并将其存储在HDFS中 df.registerTempTable("table_name")

我试过了:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Run Code Online (Sandbox Code Playgroud)

在Apache Spark中将CSV文件作为DataFrame加载的正确命令是什么?

hadoop scala hdfs apache-spark apache-spark-sql

120
推荐指数
7
解决办法
31万
查看次数

Apache Spark:map vs mapPartitions?

RDD mapmapPartitions方法有什么区别?并且flatMap表现得像map或喜欢mapPartitions?谢谢.

(编辑)即,两者之间的差异(在语义上或在执行方面)

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }
Run Code Online (Sandbox Code Playgroud)

和:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd

119
推荐指数
3
解决办法
10万
查看次数

如何定义DataFrame的分区?

我已经开始在Spark 1.4.0中使用Spark SQL和DataFrames.我想在Scala中定义DataFrame上的自定义分区程序,但是没有看到如何执行此操作.

我正在使用的一个数据表包含一个事务列表,按帐户,silimar到下面的示例.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00
Run Code Online (Sandbox Code Playgroud)

至少在最初,大多数计算将发生在帐户内的交易之间.所以我希望对数据进行分区,以便帐户的所有事务都在同一个Spark分区中.

但我没有看到定义这个的方法.DataFrame类有一个名为"repartition(Int)"的方法,您可以在其中指定要创建的分区数.但我没有看到任何方法可用于为DataFrame定义自定义分区程序,例如可以为RDD指定.

源数据存储在Parquet中.我确实看到在向Parquet编写DataFrame时,您可以指定要分区的列,因此我可以告诉Parquet通过"帐户"列对其数据进行分区.但是可能有数百万个帐户,如果我正确理解Parquet,它会为每个帐户创建一个独特的目录,因此这听起来不是一个合理的解决方案.

有没有办法让Spark分区这个DataFrame,以便一个帐户的所有数据都在同一个分区?

scala partitioning dataframe apache-spark apache-spark-sql

119
推荐指数
5
解决办法
14万
查看次数

如何在Spark DataFrame中添加常量列?

我想在a中添加一个DataFrame具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn如下:

dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)

似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

116
推荐指数
2
解决办法
12万
查看次数

如何设置Apache Spark Executor内存

如何增加Apache spark executor节点的可用内存?

我有一个2 GB的文件,适合加载到Apache Spark.我正在1台机器上运行apache spark,所以驱动程序和执行程序在同一台机器上.该机器有8 GB的内存.

当我在设置要在内存中缓存的文件后尝试计算文件的行时,我得到以下错误:

2014-10-25 22:25:12 WARN  CacheManager:71 - Not enough space to cache partition rdd_1_1 in memory! Free memory is 278099801 bytes.
Run Code Online (Sandbox Code Playgroud)

我看了看文档,这里并设置spark.executor.memory4g$SPARK_HOME/conf/spark-defaults.conf

UI显示此变量在Spark环境中设置.你可以在这里找到截图

但是,当我转到Executor选项卡时,我的单个Executor的内存限制仍然设置为265.4 MB.我还是得到了同样的错误.

我尝试了这里提到的各种各样的东西,但我仍然得到错误,并且不清楚我应该在哪里更改设置.

我正在从spark-shell以交互方式运行我的代码

memory apache-spark

114
推荐指数
4
解决办法
12万
查看次数

如何在Spark SQL中按降序排列?

我尝试了df.orderBy("col1").show(10)但它按升序排序.df.sort("col1").show(10)也按降序排序.我查看了stackoverflow,我发现的答案都已过时或提交给RDD.我想在spark中使用原生数据帧.

scala apache-spark apache-spark-sql

110
推荐指数
5
解决办法
20万
查看次数

如何向Spark DataFrame添加新列(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.

我试过以下但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)

那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?

python dataframe apache-spark apache-spark-sql pyspark

110
推荐指数
4
解决办法
21万
查看次数

在python shell中导入pyspark

这是另一个从未回答过的论坛上的别人问题的副本,所以我想我会在这里重新提问,因为我有同样的问题.(见http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736)

我在我的机器上正确安装了Spark,并且当使用./bin/pyspark作为我的python解释器时,能够使用pyspark模块运行python程序而不会出错.

但是,当我尝试运行常规Python shell时,当我尝试导入pyspark模块时,我收到此错误:

from pyspark import SparkContext
Run Code Online (Sandbox Code Playgroud)

它说

"No module named pyspark".
Run Code Online (Sandbox Code Playgroud)

我怎样才能解决这个问题?是否需要设置环境变量以将Python指向pyspark headers/libraries/etc. 如果我的火花安装是/ spark /,我需要包含哪些pyspark路径?或者pyspark程序只能从pyspark解释器运行?

python apache-spark pyspark

102
推荐指数
11
解决办法
14万
查看次数