我有main创建spark上下文:
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
Run Code Online (Sandbox Code Playgroud)
然后创建数据帧并对数据帧进行过滤和验证.
val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")
val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
// record length cannot be < 2
.na.drop(3)
// round to hours
.withColumn("time",convertToHourly($"time"))
Run Code Online (Sandbox Code Playgroud)
这非常有效.
但是当我尝试通过发送数据帧将我的验证移动到另一个文件时
function ValidateAndTransform(df: DataFrame) : DataFrame = {...}
Run Code Online (Sandbox Code Playgroud)
获取Dataframe并进行验证和转换:似乎我需要
import sqlContext.implicits._
Run Code Online (Sandbox Code Playgroud)
为了避免错误:"value $不是StringContext的成员"在线发生:.withColumn("time",convertToHourly($ "time"))
但要使用 import sqlContext.implicits._
我还需要sqlContext在新文件中定义,如下所示:
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Run Code Online (Sandbox Code Playgroud)
或发送给
function ValidateAndTransform(df: DataFrame) : DataFrame = …Run Code Online (Sandbox Code Playgroud) Pyspark中LIKE运算符的等价物是什么?例如,我想做:
SELECT * FROM table WHERE column LIKE "*somestring*";
Run Code Online (Sandbox Code Playgroud)
寻找像这样简单的东西(但这不起作用):
df.select('column').where(col('column').like("*s*")).show()
Run Code Online (Sandbox Code Playgroud) 我正在尝试将一个数据帧与另一个数据帧进行过滤:
scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
Run Code Online (Sandbox Code Playgroud)
现在我想过滤df1并返回一个包含df1中所有行的数据帧,其中user_id在df2("valid_id")中.换句话说,我想要df1中的所有行,其中user_id是2,3,4,5或6
scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);
Run Code Online (Sandbox Code Playgroud)
另一方面,当我尝试对函数进行过滤时,一切看起来都很棒:
scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]
Run Code Online (Sandbox Code Playgroud)
为什么我收到此错误?我的语法有问题吗?
以下评论我试图做左外连接:
scala> df1.show
+-------+------------------+-------+
| name| score|user_id|
+-------+------------------+-------+
| user 1| 0.123| 1|
| user 2| 0.246| 2|
| user …Run Code Online (Sandbox Code Playgroud) 从Spark 1.5.0开始,似乎可以编写自己的UDAF用于DataFrames上的自定义聚合: Spark 1.5 DataFrame API要点:日期/时间/字符串处理,时间间隔和UDAF
但是,我不清楚Python API是否支持此功能?
我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!
基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.
case
when (e."a" Like 'a%' Or e."b" Like 'b%')
And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'
when (e."a" Like 'b%' Or e."b" Like 'a%')
And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'
else
'CallitC'
Run Code Online (Sandbox Code Playgroud) 我有以下两个pySpark数据帧:
> df_lag_pre.columns
['date','sku','name','country','ccy_code','quantity','usd_price','usd_lag','lag_quantity']
> df_unmatched.columns
['alt_sku', 'alt_lag_quantity', 'country', 'ccy_code', 'name', 'usd_price']
Run Code Online (Sandbox Code Playgroud)
现在我想在常用列上加入它们,所以我尝试以下方法:
> df_lag_pre.join(df_unmatched, on=['name','country','ccy_code','usd_price'])
Run Code Online (Sandbox Code Playgroud)
我收到以下错误消息:
AnalysisException: u'resolved attribute(s) price#3424 missing from country#3443,month#801,price#808,category#803,subcategory#804,page#805,date#280,link#809,name#806,quantity#807,ccy_code#3439,sku#3004,day#802 in operator !EvaluatePython PythonUDF#<lambda>(ccy_code#3439,price#3424), pythonUDF#811: string;'
Run Code Online (Sandbox Code Playgroud)
显示此错误的某些列(例如price)df_lag是构建于其中的另一个数据框的一部分.我找不到有关如何解释此消息的任何信息,因此任何帮助将不胜感激.
我有一个pySpark数据框,如下所示:
+-------------+----------+
| sku| date|
+-------------+----------+
|MLA-603526656|02/09/2016|
|MLA-603526656|01/09/2016|
|MLA-604172009|02/10/2016|
|MLA-605470584|02/09/2016|
|MLA-605502281|02/10/2016|
|MLA-605502281|02/09/2016|
+-------------+----------+
Run Code Online (Sandbox Code Playgroud)
我想通过sku分组,然后计算最小和最大日期.如果我这样做:
df_testing.groupBy('sku') \
.agg({'date': 'min', 'date':'max'}) \
.limit(10) \
.show()
Run Code Online (Sandbox Code Playgroud)
行为与Pandas相同,我只获取sku和max(date)列.在Pandas我通常会做以下事情来得到我想要的结果:
df_testing.groupBy('sku') \
.agg({'day': ['min','max']}) \
.limit(10) \
.show()
Run Code Online (Sandbox Code Playgroud)
但是在pySpark上这不起作用,我得到一个java.util.ArrayList cannot be cast to java.lang.String错误.谁能指点我正确的语法?
谢谢.
我是Spark的新手,我一直在尝试将一个Dataframe转换为Spark中的镶木地板文件,但我还没有成功.该文件说,我可以使用write.parquet函数来创建该文件.但是,当我运行脚本时它向我显示:AttributeError:'RDD'对象没有属性'write'
from pyspark import SparkContext
sc = SparkContext("local", "Protob Conversion to Parquet ")
# spark is an existing SparkSession
df = sc.textFile("/temp/proto_temp.csv")
# Displays the content of the DataFrame to stdout
df.write.parquet("/output/proto.parquet")
Run Code Online (Sandbox Code Playgroud)
你知道怎么做这个吗?
我正在使用的spark版本是为Hadoop 2.7.3构建的Spark 2.0.1.
我已经阅读了几篇关于使用"like"运算符来过滤火花数据帧的帖子,条件是包含一个字符串/表达式,但是想知道以下是否是在所需条件下使用%s的"最佳实践"如下:
input_path = <s3_location_str>
my_expr = "Arizona.*hot" # a regex expression
dx = sqlContext.read.parquet(input_path) # "keyword" is a field in dx
# is the following correct?
substr = "'%%%s%%'" %my_keyword # escape % via %% to get "%"
dk = dx.filter("keyword like %s" %substr)
# dk should contain rows with keyword values such as "Arizona is hot."
Run Code Online (Sandbox Code Playgroud)
注意
我正在尝试获取包含表达式my_keyword的dx中的所有行.否则,对于完全匹配,我们不需要周围百分号'%'.
我想访问火花数据帧的前100行,并将结果写回CSV文件.
为什么take(100)基本上是即时的,而
df.limit(100)
.repartition(1)
.write
.mode(SaveMode.Overwrite)
.option("header", true)
.option("delimiter", ";")
.csv("myPath")
Run Code Online (Sandbox Code Playgroud)
需要永远.我不想获得每个分区的前100条记录,而只需要获得100条记录.
spark-dataframe ×10
apache-spark ×6
pyspark ×6
pyspark-sql ×2
limit ×1
python ×1
rdd ×1
regex ×1
scala ×1