我有一个数据框,有一行和几列.一些列是单个值,其他列是列表.所有列表列的长度都相同.我想将每个列表列拆分为一个单独的行,同时保持任何非列表列不变.
样本DF:
from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode
sqlc = SQLContext(sc)
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Run Code Online (Sandbox Code Playgroud)
我想要的是:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
Run Code Online (Sandbox Code Playgroud)
如果我只有一个列表列,只需执行以下操作即可explode:
df_exploded = df.withColumn('b', explode('b')) …Run Code Online (Sandbox Code Playgroud) 有人可以帮我解决Spark DataFrame中的这个问题吗?
当我这样做时,myFloatRDD.toDF()我收到一个错误:
TypeError:无法推断类型的模式:类型'float'
我不明白为什么......
例:
myFloatRdd = sc.parallelize([1.0,2.0,3.0])
df = myFloatRdd.toDF()
Run Code Online (Sandbox Code Playgroud)
谢谢
我需要使用
(rdd.)partitionBy(npartitions, custom_partitioner)
Run Code Online (Sandbox Code Playgroud)
DataFrame上不可用的方法.所有DataFrame方法仅引用DataFrame结果.那么如何从DataFrame数据创建RDD呢?
注意:这是从1.2.0开始的更改(在1.3.0中).
从@dpangmao的答案更新:方法是.rdd.我有兴趣了解(a)它是否公开以及(b)性能影响是什么.
那么(a)是肯定的和(b) - 你可以在这里看到有重要的性能影响:必须通过调用mapPartitions创建一个新的RDD :
在dataframe.py中(注意文件名也改变了(是sql.py):
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd
Run Code Online (Sandbox Code Playgroud) 我见过各种各样的人都认为这Dataframe.explode是一种有用的方法,但它会导致比原始数据帧更多的行,这根本不是我想要的.我只是想做Dataframe相当于非常简单:
rdd.map(lambda row: row + [row.my_str_col.split('-')])
Run Code Online (Sandbox Code Playgroud)
它看起来像:
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
Run Code Online (Sandbox Code Playgroud)
并将其转换为:
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
Run Code Online (Sandbox Code Playgroud)
我知道pyspark.sql.functions.split(),但它导致嵌套数组列而不是我想要的两个顶级列.
理想情况下,我希望这些新列也可以命名.
apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql
我想弄清楚为什么我的groupByKey返回以下内容:
[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]
Run Code Online (Sandbox Code Playgroud)
我有flatMapped值,如下所示:
[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]
Run Code Online (Sandbox Code Playgroud)
我做的很简单:
groupRDD = columnRDD.groupByKey()
Run Code Online (Sandbox Code Playgroud) 我正在尝试加载SVM文件并将其转换为一个,DataFrame因此我可以使用PipelineSpark 的ML模块(ML).我刚刚在Ubuntu 14.04上安装了一个新的Spark 1.5.0(没有spark-env.sh配置).
我my_script.py是:
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
sc = SparkContext("local", "Teste Original")
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
我正在使用: ./spark-submit my_script.py
我收到错误:
Traceback (most recent call last):
File "/home/fred-spark/spark-1.5.0-bin-hadoop2.6/pipeline_teste_original.py", line 34, in <module>
data = MLUtils.loadLibSVMFile(sc, "/home/fred-spark/svm_capture").toDF()
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'
Run Code Online (Sandbox Code Playgroud)
我无法理解的是,如果我跑:
data = MLUtils.loadLibSVMFile(sc, "/home/svm_capture").toDF()
Run Code Online (Sandbox Code Playgroud)
直接在PySpark shell中,它的工作原理.
我是Spark-DataFrame API的初学者.
我使用此代码将csv tab分隔为Spark Dataframe
lines = sc.textFile('tail5.csv')
parts = lines.map(lambda l : l.strip().split('\t'))
fnames = *some name list*
schemaData = StructType([StructField(fname, StringType(), True) for fname in fnames])
ddf = sqlContext.createDataFrame(parts,schemaData)
Run Code Online (Sandbox Code Playgroud)
假设我使用Spark从新文件创建DataFrame,并使用内置方法toPandas()将其转换为pandas,
我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)
发生这种情况时我得到的错误是:
'NoneType'对象没有attribue'追加'.
我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.
我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.
在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sa和sb.每个Row包含name,id_sa和id_sb.我的目标是从生产映射id_sa到id_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.
让我们试着用一个例子来澄清.如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)
我的目标是从生产映射a1到b2.事实上,相关的名称a1是n1,n2和n3,分别映射b1,b2和b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.
我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)
但也许我正在尝试使用错误的工具,我应该回到使用RDD.
我正在使用pyspark,使用spark-csv将大型csv文件加载到数据框中,作为预处理步骤,我需要对其中一列(包含json字符串)中可用的数据应用各种操作.这将返回X值,每个值都需要存储在各自独立的列中.
该功能将在UDF中实现.但是,我不确定如何从该UDF返回值列表并将这些值提供给单个列.下面是一个简单的例子:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
Run Code Online (Sandbox Code Playgroud)
这产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
将udf在不同的列上返回的两个值(在此示例中)存储的最佳方法是什么?现在他们被键入字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
Run Code Online (Sandbox Code Playgroud) python user-defined-functions apache-spark apache-spark-sql pyspark
apache-spark ×10
pyspark ×10
python ×8
dataframe ×2
rdd ×2
mapreduce ×1
pandas ×1
pyspark-sql ×1