我使用以下代码来获得薪水大于某个阈值的人的平均年龄.
dataframe.filter(df['salary'] > 100000).agg({"avg": "age"})
Run Code Online (Sandbox Code Playgroud)
列的年龄是数字(浮点数),但我仍然收到此错误.
py4j.protocol.Py4JJavaError: An error occurred while calling o86.agg.
: scala.MatchError: age (of class java.lang.String)
Run Code Online (Sandbox Code Playgroud)
您是否知道在不使用groupBy函数和SQL查询的情况下获得平均等的任何其他方法.
我正在尝试从Spark 1.6.1迁移到Spark 2.0.0,并且在尝试将csv文件读入SparkSQL时遇到了一个奇怪的错误.以前,当我在pyspark中从本地磁盘读取文件时,我会这样做:
Spark 1.6
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.option('header', 'true') \
.load('file:///C:/path/to/my/file.csv', schema=mySchema)
Run Code Online (Sandbox Code Playgroud)
在最新版本中,我认为它应该是这样的:
Spark 2.0
spark = SparkSession.builder \
.master('local[*]') \
.appName('My App') \
.getOrCreate()
df = spark.read \
.format('csv') \
.option('header', 'true') \
.load('file:///C:/path/to/my/file.csv', schema=mySchema)
Run Code Online (Sandbox Code Playgroud)
但无论我尝试调整路径的方式有多少,我都会收到此错误:
IllegalArgumentException: 'java.net.URISyntaxException: Relative path in
absolute URI: file:/C:/path//to/my/file/spark-warehouse'
Run Code Online (Sandbox Code Playgroud)
不确定这只是Windows的一个问题,还是我缺少的东西.我很高兴spark-csv软件包现在已成为Spark的一部分开箱即用,但我似乎无法再阅读任何本地文件了.有任何想法吗?
我是Spark SQL的新手,我正在尝试将字符串转换为spark数据框中的时间戳.我有一个'2017-08-01T02:26:59.000Z'名为time_string的列中的字符串
我将此字符串转换为时间戳的代码是
CAST (time_string AS Timestamp)
Run Code Online (Sandbox Code Playgroud)
但这给了我一个时间戳 2017-07-31 19:26:59
为什么要改变时间?有没有办法在不改变时间的情况下做到这一点?
谢谢你的帮助!
我需要在pyspark数据帧中转动多个列.示例数据框,
>>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100| 1| 23| 10|
|100| 2| 45| 11|
|100| 3| 67| 12|
|100| 4| 78| 13|
|101| 1| 23| 10|
|101| 2| 45| 13|
|101| 3| 67| 14|
|101| 4| 78| 15|
|102| 1| 23| 10|
|102| 2| 45| 11|
|102| 3| 67| 16|
|102| 4| 78| 18|
+---+---+-----+-----+
Run Code Online (Sandbox Code Playgroud)
现在,如果我需要根据日期将每个id的价格列放到一行,那么我可以使用pivot方法,
>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100| 23| 45| 67| 78| …Run Code Online (Sandbox Code Playgroud) 我是新来的火花,sparkSql当我遇到这两个命令时,我正在尝试使用python中的一些命令:
createOrReplaceTempView()和registerTempTable().
这两个命令有什么区别?它们似乎具有相同的功能集.
我的数据原则上是一个表,除了其他"数据"之外,它还包含一列ID和一列GROUP_ID.
在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板.第二步做了很多的groupBy('GROUP_ID')和Window.partitionBy('GROUP_ID').orderBy('ID').
现在的目标是 - 为了避免在第二步骤洗牌 - 在第一步骤中有效地加载数据,因为这是一定时器.
问题第1部分: AFAIK,Spark在从镶木地板加载时保留了分区(这实际上是任何"优化写入考虑"的基础) - 对吗?
我提出了三种可能性:
df.orderBy('ID').write.partitionBy('TRIP_ID').parquet('/path/to/parquet')df.orderBy('ID').repartition(n, 'TRIP_ID').write.parquet('/path/to/parquet')df.repartition(n, 'TRIP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')我会设置n个别镶木地板文件大约100MB.
问第2部分:它是正确的,这三个选项的目标(避免在第2步洗牌)方面产生"相同" /类似的结果?如果没有,有什么区别?哪一个'更好'?
问题第3部分:关于步骤1,三个选项中哪一个表现更好?
感谢您分享您的知识!
编辑2017-07-24
在做了一些测试(写入和读取镶木地板)后,似乎Spark 在第二步中默认无法恢复partitionBy和orderBy信息.分区的数量(从df.rdd.getNumPartitions()似乎由核心数量和/或spark.default.parallelism(如果设置)确定,但不是由镶木地板分区的数量决定.因此问题1的答案是错误的,问题2和3将是无关紧要.
因此,原来的真正的问题是:有没有办法告诉星火,该数据已经被列分区X和列进行排序ÿ?
我正在寻找一种方法来在pyspark中选择我的数据帧的列.对于第一行,我知道我可以使用df.first()但不确定列,因为它们没有列名.
我有5列,想要遍历每一列.
+--+---+---+---+---+---+---+
|_1| _2| _3| _4| _5| _6| _7|
+--+---+---+---+---+---+---+
|1 |0.0|0.0|0.0|1.0|0.0|0.0|
|2 |1.0|0.0|0.0|0.0|0.0|0.0|
|3 |0.0|0.0|1.0|0.0|0.0|0.0|
Run Code Online (Sandbox Code Playgroud) 我用databrick csv包启动了shell
#../spark-1.6.1-bin-hadoop2.6/bin/pyspark --packages com.databricks:spark-csv_2.11:1.3.0
Run Code Online (Sandbox Code Playgroud)
然后我读了一个csv文件做了一些groupby操作并将其转储到csv.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load(path.csv') ####it has columns and df.columns works fine
type(df) #<class 'pyspark.sql.dataframe.DataFrame'>
#now trying to dump a csv
df.write.format('com.databricks.spark.csv').save('path+my.csv')
#it creates a directory my.csv with 2 partitions
### To create single file i followed below line of code
#df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("path+file_satya.csv") ## this creates one partition in directory of csv name
#but in both cases no columns information(How to add column names to that csv …Run Code Online (Sandbox Code Playgroud) 一个pyspark.sql.DataFrame混乱的显示DataFrame.show()- 行换行而不是滚动.
我试过这些选择
import IPython
IPython.auto_scroll_threshold = 9999
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from IPython.display import display
Run Code Online (Sandbox Code Playgroud)
但没有运气.虽然在Atom编辑器中使用jupyter插件时滚动工作:
假设我有一个包含数字1-10的numpy数组a.所以a是[1 2 3 4 5 6 7 8 9 10].
现在,我还有一个Python Spark数据帧,我想要添加我的numpy数组a.我认为一列文字将完成这项工作.所以我做了以下事情:
df = df.withColumn("NewColumn", F.lit(a))
Run Code Online (Sandbox Code Playgroud)
这不起作用.错误是"不支持的文字类型类java.util.ArrayList".
现在,如果我只尝试数组中的一个元素,如下所示,它可以工作.
df = df.withColumn("NewColumn", F.lit(a[0]))
Run Code Online (Sandbox Code Playgroud)
有没有办法可以做我正在尝试的事情?我一直在努力完成我想完成的任务,这是我最接近完成它的任务.我查看了所有相关的Stack Overflow问题,但我没有得到我想要的答案.任何帮助表示赞赏.谢谢.
pyspark-sql ×10
apache-spark ×9
pyspark ×9
python ×4
ipython ×1
literals ×1
pandas ×1
sparkr ×1
sql ×1
windows ×1