小编use*_*411的帖子

如何选择每组的第一行?

我有一个DataFrame生成如下:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)

如您所见,DataFrame按Hour递增顺序排序,然后按TotalValue降序排序.

我想选择每组的顶行,即

  • 来自小时组== 0选择(0,cat26,30.9)
  • 来自小时组== 1选择(1,cat67,28.5)
  • 来自小时组== …

sql scala dataframe apache-spark apache-spark-sql

122
推荐指数
3
解决办法
8万
查看次数

如何在Spark DataFrame中添加常量列?

我想在a中添加一个DataFrame具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn如下:

dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)

似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

116
推荐指数
2
解决办法
12万
查看次数

如何向Spark DataFrame添加新列(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.

我试过以下但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)

那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?

python dataframe apache-spark apache-spark-sql pyspark

110
推荐指数
4
解决办法
21万
查看次数

使用spark-csv编写单个CSV文件

我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个CSV,但不能,它正在创建一个文件夹.

需要一个Scala函数,它将获取路径和文件名等参数并写入该CSV文件.

csv scala apache-spark spark-csv

92
推荐指数
8
解决办法
17万
查看次数

参数"stratify"来自方法"train_test_split"(scikit Learn)

我试图使用train_test_split包scikit Learn,但我遇到参数问题stratify.以下是代码:

from sklearn import cross_validation, datasets 

X = iris.data[:,:2]
y = iris.target

cross_validation.train_test_split(X,y,stratify=y)
Run Code Online (Sandbox Code Playgroud)

但是,我一直遇到以下问题:

raise TypeError("Invalid parameters passed: %s" % str(options))
TypeError: Invalid parameters passed: {'stratify': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, …
Run Code Online (Sandbox Code Playgroud)

test-data split training-data scikit-learn

67
推荐指数
6
解决办法
8万
查看次数

查询具有复杂类型的Spark SQL DataFrame

如何查询具有复杂类型(如地图/数组)的RDD?例如,当我写这个测试代码时:

case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Run Code Online (Sandbox Code Playgroud)

我虽然语法如下:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
Run Code Online (Sandbox Code Playgroud)

要么

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
Run Code Online (Sandbox Code Playgroud)

但我明白了

无法访问MapType类型中的嵌套字段(StringType,StringType,true)

org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解析的属性

分别.

sql scala dataframe apache-spark apache-spark-sql

54
推荐指数
1
解决办法
5万
查看次数

为什么在创建自定义案例类的数据集时"无法找到存储在数据集中的类型的编码器"?

使用Scala 2.11.8的Spark 2.0(最终版).以下超级简单代码会产生编译错误Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-dataset apache-spark-encoders

53
推荐指数
2
解决办法
5万
查看次数

如何运行Spark Java程序

我为Spark编写了一个Java程序.但是如何从Unix命令行运行和编译它.编译运行时是否必须包含任何jar

java apache-spark

42
推荐指数
3
解决办法
7万
查看次数

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)

我可以做类似的定义聚合函数吗?这是怎么做到的?

对于上下文,我想运行以下SQL查询:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)

它应该返回类似的东西

Row(span1, false, T0)

我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?

scala aggregate-functions user-defined-functions apache-spark apache-spark-sql

37
推荐指数
1
解决办法
3万
查看次数

使用列的长度过滤DataFrame

我想DataFrame使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.

更具体的,我有一个DataFrame只有一个Column,其中ArrayType(StringType()),我要筛选的DataFrame使用长度filterer,我拍下面的一个片段.

df = sqlContext.read.parquet("letters.parquet")
df.show()

# The output will be 
# +------------+
# |      tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# |   [S, U, S]|
# +------------+

# But I want only the entries with length …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

35
推荐指数
2
解决办法
5万
查看次数