标签: spark-dataframe

将spark DataFrame列转换为python列表

我处理一个包含两列mvv和count的数据帧.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |
Run Code Online (Sandbox Code Playgroud)

我想获得两个包含mvv值和计数值的列表.就像是

mvv = [1,2,3,4]
count = [5,9,3,1]
Run Code Online (Sandbox Code Playgroud)

所以,我尝试了以下代码:第一行应该返回一个python列表行.我想看到第一个值:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Run Code Online (Sandbox Code Playgroud)

但是我收到第二行的错误消息:

AttributeError:getInt

python apache-spark pyspark spark-dataframe

76
推荐指数
11
解决办法
12万
查看次数

更新spark中的dataframe列

查看新的spark数据帧api,目前还不清楚是否可以修改数据帧列.

我怎么会去改变行的值xy一个数据帧的?

pandas这将是df.ix[x,y] = new_value

编辑:合并下面所述的内容,您无法修改现有数据框,因为它是不可变的,但您可以返回具有所需修改的新数据框.

如果您只想根据条件替换列中的值,例如np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Run Code Online (Sandbox Code Playgroud)

如果要对列执行某些操作并创建添加到数据帧的新列:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))
Run Code Online (Sandbox Code Playgroud)

如果您希望新列与旧列具有相同的名称,则可以添加其他步骤:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark spark-dataframe

64
推荐指数
5
解决办法
10万
查看次数

在spark数据帧写入方法中覆盖特定分区

我想覆盖特定的分区而不是所有的火花.我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')
Run Code Online (Sandbox Code Playgroud)

其中df是具有要覆盖的增量数据的数据帧.

hdfs-base-path包含主数据.

当我尝试上面的命令时,它会删除所有分区,并在hdfs路径中插入df中存在的分区.

我的要求是只覆盖指定hdfs路径中df中存在的那些分区.有人可以帮我吗?

apache-spark apache-spark-sql spark-dataframe

52
推荐指数
6
解决办法
5万
查看次数

如何从spark数据帧中过滤掉null值

我使用以下模式在spark中创建了一个数据框:

root
 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)
Run Code Online (Sandbox Code Playgroud)

数据如下所示:

+----------+----------+-------+--------+----------+-----------+---------+
|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null| …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

49
推荐指数
6
解决办法
12万
查看次数

上传列表以从火花数据框中选择多个列

我有一个火花数据框df.有没有办法使用这些列的列表选择几列?

scala> df.columns
res0: Array[String] = Array("a", "b", "c", "d")
Run Code Online (Sandbox Code Playgroud)

我知道我可以做点什么df.select("b", "c").但是假设我有一个包含几个列名的列表val cols = List("b", "c"),有没有办法将它传递给df.select?df.select(cols)抛出错误.像df.select(*cols)python中的东西

apache-spark apache-spark-sql spark-dataframe

47
推荐指数
3
解决办法
5万
查看次数

将Spark Dataframe字符串列拆分为多个列

我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:

rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)

它看起来像:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg
Run Code Online (Sandbox Code Playgroud)

并将其转换为:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg
Run Code Online (Sandbox Code Playgroud)

我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.

理想情况下,我希望这些新列也可以命名.

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

47
推荐指数
3
解决办法
7万
查看次数

38
推荐指数
1
解决办法
7万
查看次数

如何在单个加载中导入多个csv文件?

考虑我有一个已定义的架构,用于在文件夹中加载10个csv文件.有没有办法使用Spark SQL自动加载表.我知道这可以通过为每个文件[下面给出]使用单独的数据帧来执行,但是可以使用单个命令自动执行而不是指向文件我可以指向文件夹吗?

df = sqlContext.read
       .format("com.databricks.spark.csv")
       .option("header", "true")
       .load("../Downloads/2008.csv")
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

38
推荐指数
5
解决办法
6万
查看次数

从spark数据帧中取n行并传递给toPandas()

我有这个代码:

l = [('Alice', 1),('Jim',2),('Sandra',3)]
df = sqlContext.createDataFrame(l, ['name', 'age'])
df.withColumn('age2', df.age + 2).toPandas()
Run Code Online (Sandbox Code Playgroud)

工作正常,做它需要的东西.假设我只想显示前n行,然后调用toPandas()返回pandas数据帧.我该怎么做?我无法调用,take(n)因为它不会返回数据帧,因此我无法将其传递给toPandas().

换句话说,如何从数据帧中获取前n行并调用toPandas()结果数据帧?想不到这很难但我无法弄清楚.

我正在使用Spark 1.6.0.

python apache-spark-sql spark-dataframe

36
推荐指数
2
解决办法
6万
查看次数

Spark中的各种连接类型有哪些?

我查看了文档,并说它支持以下连接类型:

要执行的联接类型.默认内心.必须是以下之一:inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti.

我查看了关于SQL连接的StackOverflow答案,并且顶部几个答案没有提到上面的一些连接,例如left_semileft_anti.他们在Spark中意味着什么?

scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0

36
推荐指数
2
解决办法
5万
查看次数