标签: pyspark

在Spark RDD和/或Spark DataFrame中重新整形/透视数据

我有以下格式的数据(RDD或Spark DataFrame):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])

# convert to a Spark DataFrame                    
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想做的是'重塑'数据,将Country(特别是美国,英国和CA)中的某些行转换为列:

ID    Age  US  UK  CA  
'X01'  41  3   1   2  
'X02'  72  4   6   7   
Run Code Online (Sandbox Code Playgroud)

从本质上讲,我需要Python的pivot工作流程:

categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', 
                                                  columns = 'Country',
                                                  values = 'Score')
Run Code Online (Sandbox Code Playgroud)

我的数据集相当大,所以我不能真正地collect()将数据摄取到内存中来进行Python本身的重塑.有没有办法 …

python pivot apache-spark apache-spark-sql pyspark

25
推荐指数
3
解决办法
2万
查看次数

如何使用PySpark加载IPython shell

我想加载IPython shell(不是IPython笔记本),我可以通过命令行使用PySpark.那可能吗?我安装了Spark-1.4.1.

python ipython apache-spark pyspark

25
推荐指数
5
解决办法
2万
查看次数

如何在Python中排除Spark数据帧中的多个列

我发现PySpark有一个调用的方法,drop但它似乎一次只能删除一列.有关如何同时删除多个列的任何想法?

df.drop(['col1','col2'])
Run Code Online (Sandbox Code Playgroud)
TypeError                                 Traceback (most recent call last)
<ipython-input-96-653b0465e457> in <module>()
----> 1 selectedMachineView = machineView.drop([['GpuName','GPU1_TwoPartHwID']])

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.pyc in drop(self, col)
   1257             jdf = self._jdf.drop(col._jc)
   1258         else:
-> 1259             raise TypeError("col should be a string or a Column")
   1260         return DataFrame(jdf, self.sql_ctx)
   1261 

TypeError: col should be a string or a Column
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql pyspark

25
推荐指数
3
解决办法
3万
查看次数

在PySpark中爆炸

我想从包含单词列表的DataFrame转换为DataFrame,每个单词都在自己的行中.

如何在DataFrame中的列上进行爆炸?

下面是我的一些尝试示例,您可以在其中取消注释每个代码行并获取以下注释中列出的错误.我在Python 2.7中使用PySpark和Spark 1.6.1.

from pyspark.sql.functions import split, explode
DF = sqlContext.createDataFrame([('cat \n\n elephant rat \n rat cat', )], ['word'])
print 'Dataset:'
DF.show()
print '\n\n Trying to do explode: \n'
DFsplit_explode = (
 DF
 .select(split(DF['word'], ' '))
#  .select(explode(DF['word']))  # AnalysisException: u"cannot resolve 'explode(word)' due to data type mismatch: input to function explode should be array or map type, not StringType;"
#   .map(explode)  # AttributeError: 'PipelinedRDD' object has no attribute 'show'
#   .explode()  # AttributeError: 'DataFrame' object has no …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

25
推荐指数
2
解决办法
5万
查看次数

如何使用pyspark在Spark 2.0中构建sparkSession?

我刚接触到spark 2.0; 到目前为止,我一直在使用spark 1.6.1.有人可以帮我用pyspark(python)设置sparkSession吗?我知道在线提供的scala示例类似(这里),但我希望能直接使用python语言.

我的具体情况:我在一个zeppelin spark笔记本中加载来自S3的avro文件.然后构建df并运行各种pyspark和sql查询.我所有的旧查询都使用sqlContext.我知道这是不好的做法,但我开始使用我的笔记本

sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().

我可以在avros中阅读

mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...

并构建没有问题的数据帧.但是一旦我开始查询dataframes/temp表,我就会收到"java.lang.NullPointerException"错误.我认为这表明存在转换错误(例如,旧查询在1.6.1中工作但需要针对2.0进行调整).无论查询类型如何,都会发生错误.所以我假设

1.)sqlContext别名是个坏主意

2.)我需要正确设置sparkSession.

因此,如果有人能告诉我这是如何完成的,或者可能解释他们所知道的不同版本的火花之间的差异,我将非常感激.如果我需要详细说明这个问题,请告诉我.如果它令人费解,我道歉.

python sql apache-spark pyspark

25
推荐指数
3
解决办法
6万
查看次数

我如何阅读从Spark编写的PySpark中的镶木地板?

我在分析中使用两个Jupyter笔记本来做不同的事情.在我的Scala笔记本中,我将一些已清理的数据写入镶木地板:

partitionedDF.select("noStopWords","lowerText","prediction").write.save("swift2d://xxxx.keystone/commentClusters.parquet")
Run Code Online (Sandbox Code Playgroud)

然后我去我的Python笔记本读取数据:

df = spark.read.load("swift2d://xxxx.keystone/commentClusters.parquet")
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

AnalysisException: u'Unable to infer schema for ParquetFormat at swift2d://RedditTextAnalysis.keystone/commentClusters.parquet. It must be specified manually;'
Run Code Online (Sandbox Code Playgroud)

我查看了spark文档,我认为不应该要求我指定一个模式.有没有人碰到这样的事情?我保存/加载时应该做些什么吗?数据将在对象存储中登陆.

编辑:我在读取和写入时都会唱出spark 2.0.

edit2:这是在Data Science Experience的一个项目中完成的.

python scala apache-spark pyspark data-science-experience

25
推荐指数
2
解决办法
4万
查看次数

我似乎无法在Spark上使用--py文件来工作

我在Spark上使用Python时遇到了问题.我的应用程序有一些依赖项,例如numpy,pandas,astropy等.我不能使用virtualenv来创建一个包含所有依赖项的环境,因为除了HDFS之外,集群上的节点没有任何公共mountpoint或文件系统.因此我坚持使用spark-submit --py-files.我将site-packages的内容打包到一个ZIP文件中并像使用--py-files=dependencies.zip选项一样提交作业(最简单的方法是在Spark执行器节点上安装Python依赖项?).但是,群集上的节点似乎仍然没有看到内部的模块,并且ImportError在导入numpy时它们会抛出这样的内容.

File "/path/anonymized/module.py", line 6, in <module>
    import numpy
File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in <module>   
File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in <module>
    #
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in <module>
ImportError: cannot import name multiarray
Run Code Online (Sandbox Code Playgroud)

当我切换到virtualenv并使用本地pyspark shell时,一切正常,所以依赖都在那里.有谁知道,什么可能导致这个问题,以及如何解决它?

谢谢!

python apache-spark pyspark

24
推荐指数
3
解决办法
3万
查看次数

Pyspark:解析一列json字符串

我有一个pyspark数据框,由一列调用json,其中每一行都是一个json的unicode字符串.我想解析每一行并返回一个新的数据帧,其中每一行都是解析的json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
Run Code Online (Sandbox Code Playgroud)

我已尝试使用以下方法映射每一行json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()
Run Code Online (Sandbox Code Playgroud)

但这会返回一个 TypeError: expected string or buffer

我怀疑问题的一部分是,当从a转换为a dataframerdd,架构信息会丢失,所以我也尝试手动输入架构信息:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
Run Code Online (Sandbox Code Playgroud)

但我也是这样TypeError.

看看这个答案,看起来平坦化行flatMap可能在这里很有用,但我也没有成功:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd …
Run Code Online (Sandbox Code Playgroud)

python json apache-spark pyspark

24
推荐指数
4
解决办法
4万
查看次数

将DataFrame保存为CSV时指定文件名

假设我有一个Spark DF,我想将其保存到磁盘CSV文件.在Spark 2.0.0+中,可以转换DataFrame(DataSet[Rows])为a DataFrameWriter并使用该.csv方法来编写文件.

该功能定义为

def csv(path: String): Unit
    path : the location/folder name and not the file name.
Run Code Online (Sandbox Code Playgroud)

Spark将csv文件存储在指定位置,方法是创建名称为part - *.csv的CSV文件.

有没有办法用指定的文件名而不是部分保存CSV - *.csv?或者可以指定前缀而不是part-r?

代码:

df.coalesce(1).write.csv("sample_path")
Run Code Online (Sandbox Code Playgroud)

电流输出:

sample_path
|
+-- part-r-00000.csv
Run Code Online (Sandbox Code Playgroud)

期望的输出:

sample_path
|
+-- my_file.csv
Run Code Online (Sandbox Code Playgroud)

注意: coalesce函数用于输出单个文件,执行程序有足够的内存来收集DF而没有内存错误.

csv scala apache-spark pyspark

24
推荐指数
1
解决办法
3万
查看次数

使用pyspark获取列的数据类型

我们正在从MongoDB读取数据Collection.Collection列有两个不同的值(例如:) (bson.Int64,int) (int,float).

我试图使用pyspark获取数据类型.

我的问题是有些列有不同的数据类型.

假设quantity并且weight是列

quantity           weight
---------          --------
12300              656
123566000000       789.6767
1238               56.22
345                23
345566677777789    21
Run Code Online (Sandbox Code Playgroud)

实际上我们没有为mongo集合的任何列定义数据类型.

当我从中查询计数时 pyspark dataframe

dataframe.count()
Run Code Online (Sandbox Code Playgroud)

我这样的例外

"Cannot cast STRING into a DoubleType (value: BsonString{value=&apos;200.0&apos;})"
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark databricks

24
推荐指数
4
解决办法
5万
查看次数