标签: pyspark

Pyspark:将多个数组列拆分为行

我有一个数据框,有一行和几列.一些列是单个值,其他列是列表.所有列表列的长度都相同.我想将每个列表列拆分为一个单独的行,同时保持任何非列表列不变.

样本DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Run Code Online (Sandbox Code Playgroud)

我想要的是:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+
Run Code Online (Sandbox Code Playgroud)

如果我只有一个列表列,只需执行以下操作即可explode:

df_exploded = df.withColumn('b', explode('b')) …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

50
推荐指数
3
解决办法
3万
查看次数

创建Spark DataFrame.无法推断类型的模式:<type'flora'>

有人可以帮我解决Spark DataFrame中的这个问题吗?

当我这样做时,myFloatRDD.toDF()我收到一个错误:

TypeError:无法推断类型的模式:类型'float'

我不明白为什么......

例:

myFloatRdd = sc.parallelize([1.0,2.0,3.0])
df = myFloatRdd.toDF()
Run Code Online (Sandbox Code Playgroud)

谢谢

python dataframe apache-spark apache-spark-sql pyspark

49
推荐指数
1
解决办法
5万
查看次数

如何在pyspark中将DataFrame转换回普通RDD?

我需要使用

(rdd.)partitionBy(npartitions, custom_partitioner)
Run Code Online (Sandbox Code Playgroud)

DataFrame上不可用的方法.所有DataFrame方法仅引用DataFrame结果.那么如何从DataFrame数据创建RDD呢?

注意:这是从1.2.0开始的更改(在1.3.0中).

从@dpangmao的答案更新:方法是.rdd.我有兴趣了解(a)它是否公开以及(b)性能影响是什么.

那么(a)是肯定的和(b) - 你可以在这里看到有重要的性能影响:必须通过调用mapPartitions创建一个新的RDD :

dataframe.py中(注意文件名也改变了(是sql.py):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

48
推荐指数
3
解决办法
8万
查看次数

将Spark Dataframe字符串列拆分为多个列

我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:

rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)

它看起来像:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg
Run Code Online (Sandbox Code Playgroud)

并将其转换为:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg
Run Code Online (Sandbox Code Playgroud)

我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.

理想情况下,我希望这些新列也可以命名.

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

47
推荐指数
3
解决办法
7万
查看次数

PySpark groupByKey返回pyspark.resultiterable.ResultIterable

我想弄清楚为什么我的groupByKey返回以下内容:

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
Run Code Online (Sandbox Code Playgroud)

我有flatMapped值,如下所示:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
Run Code Online (Sandbox Code Playgroud)

我做的很简单:

groupRDD = columnRDD.groupByKey()
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

46
推荐指数
2
解决办法
5万
查看次数

'PipelinedRDD'对象在PySpark中没有属性'toDF'

我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).

my_script.py是:

from pyspark.mllib.util import MLUtils
from pyspark import SparkContext

sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

我正在使用: ./spark-submit my_script.py

我收到错误:

Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)

我无法理解的是,如果我跑:

data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)

直接在PySpark shell中,它的工作原理.

python apache-spark rdd apache-spark-sql pyspark

45
推荐指数
1
解决办法
4万
查看次数

什么是Spark DataFrame方法`toPandas`实际上在做什么?

我是Spark-DataFrame API的初学者.

我使用此代码将csv tab分隔为Spark Dataframe

lines = sc.textFile('tail5.csv')
parts = lines.map(lambda l : l.strip().split('\t'))
fnames = *some name list*
schemaData = StructType([StructField(fname, StringType(), True) for fname in fnames])
ddf = sqlContext.createDataFrame(parts,schemaData)
Run Code Online (Sandbox Code Playgroud)

假设我使用Spark从新文件创建DataFrame,并使用内置方法toPandas()将其转换为pandas,

  • 它是否将Pandas对象存储到本地内存?
  • Pandas低级计算是否由Spark处理?
  • 它是否暴露了所有pandas数据帧功能?(我想是的)
  • 我可以将它转换为潘达斯,只是完成它,没有那么多的触摸DataFrame API?

python pandas apache-spark pyspark

44
推荐指数
2
解决办法
5万
查看次数

使用Apache Spark将键值对减少为键列表对

我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)

发生这种情况时我得到的错误是:

'NoneType'对象没有attribue'追加'.

我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.

python mapreduce apache-spark rdd pyspark

42
推荐指数
4
解决办法
8万
查看次数

在Spark DataFrame中查找每个组的最大行数

我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.

在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sasb.每个Row包含name,id_said_sb.我的目标是从生产映射id_said_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.

让我们试着用一个例子来澄清.如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)

我的目标是从生产映射a1b2.事实上,相关的名称a1n1,n2n3,分别映射b1,b2b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.

我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)

但也许我正在尝试使用错误的工具,我应该回到使用RDD.

apache-spark apache-spark-sql pyspark

42
推荐指数
2
解决办法
5万
查看次数

Apache Spark - 将UDF的结果分配给多个数据帧列

我正在使用pyspark,使用spark-csv将大型csv文件加载到数据框中,作为预处理步骤,我需要对其中一列(包含json字符串)中可用的数据应用各种操作.这将返回X值,每个值都需要存储在各自独立的列中.

该功能将在UDF中实现.但是,我不确定如何从该UDF返回值列表并将这些值提供给单个列.下面是一个简单的例子:

(...)
from pyspark.sql.functions import udf
def udf_test(n):
    return [n/2, n%2]

test_udf=udf(udf_test)


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
Run Code Online (Sandbox Code Playgroud)

这产生以下结果:

+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

将udf在不同的列上返回的两个值(在此示例中)存储的最佳方法是什么?现在他们被键入字符串:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions apache-spark apache-spark-sql pyspark

41
推荐指数
1
解决办法
2万
查看次数