标签: spark-dataframe

Spark sql Dataframe - 导入sqlContext.implicits._

我有main创建spark上下文:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
Run Code Online (Sandbox Code Playgroud)

然后创建数据帧并对数据帧进行过滤和验证.

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
    // record length cannot be < 2 
    .na.drop(3)
    // round to hours
    .withColumn("time",convertToHourly($"time"))
Run Code Online (Sandbox Code Playgroud)

这非常有效.

但是当我尝试通过发送数据帧将我的验证移动到另一个文件时

function ValidateAndTransform(df: DataFrame) : DataFrame = {...}
Run Code Online (Sandbox Code Playgroud)

获取Dataframe并进行验证和转换:似乎我需要

 import sqlContext.implicits._
Run Code Online (Sandbox Code Playgroud)

为了避免错误:"value $不是StringContext的成员"在线发生:.withColumn("time",convertToHourly($ "time"))

但要使用 import sqlContext.implicits._ 我还需要sqlContext在新文件中定义,如下所示:

val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Run Code Online (Sandbox Code Playgroud)

或发送给

function ValidateAndTransform(df: DataFrame) : DataFrame = …
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql spark-dataframe

14
推荐指数
1
解决办法
2万
查看次数

Pyspark数据框LIKE运算符

Pyspark中LIKE运算符的等价物是什么?例如,我想做:

SELECT * FROM table WHERE column LIKE "*somestring*";
Run Code Online (Sandbox Code Playgroud)

寻找像这样简单的东西(但这不起作用):

df.select('column').where(col('column').like("*s*")).show()
Run Code Online (Sandbox Code Playgroud)

pyspark spark-dataframe

14
推荐指数
6
解决办法
4万
查看次数

如何针对另一个数据帧过滤一个spark数据帧

我正在尝试将一个数据帧与另一个数据帧进行过滤:

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
Run Code Online (Sandbox Code Playgroud)

现在我想过滤df1并返回一个包含df1中所有行的数据帧,其中user_id在df2("valid_id")中.换句话说,我想要df1中的所有行,其中user_id是2,3,4,5或6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);  
Run Code Online (Sandbox Code Playgroud)

另一方面,当我尝试对函数进行过滤时,一切看起来都很棒:

scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]
Run Code Online (Sandbox Code Playgroud)

为什么我收到此错误?我的语法有问题吗?

以下评论我试图做左外连接:

scala> df1.show
+-------+------------------+-------+
|   name|             score|user_id|
+-------+------------------+-------+
| user 1|             0.123|      1|
| user 2|             0.246|      2|
| user …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

13
推荐指数
1
解决办法
2万
查看次数

Python API中是否提供Spark SQL UDAF(用户定义的聚合函数)?

从Spark 1.5.0开始,似乎可以编写自己的UDAF用于DataFrames上的自定义聚合: Spark 1.5 DataFrame API要点:日期/时间/字符串处理,时间间隔和UDAF

但是,我不清楚Python API是否支持此功能?

apache-spark apache-spark-sql spark-dataframe

13
推荐指数
1
解决办法
5532
查看次数

Apache spark处理case语句

我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!

基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.

     case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark spark-dataframe pyspark-sql

13
推荐指数
2
解决办法
3万
查看次数

在pySpark上执行连接时"已解决的属性缺失"

我有以下两个pySpark数据帧:

> df_lag_pre.columns
['date','sku','name','country','ccy_code','quantity','usd_price','usd_lag','lag_quantity']

> df_unmatched.columns
['alt_sku', 'alt_lag_quantity', 'country', 'ccy_code', 'name', 'usd_price']
Run Code Online (Sandbox Code Playgroud)

现在我想在常用列上加入它们,所以我尝试以下方法:

> df_lag_pre.join(df_unmatched, on=['name','country','ccy_code','usd_price'])
Run Code Online (Sandbox Code Playgroud)

我收到以下错误消息:

AnalysisException: u'resolved attribute(s) price#3424 missing from country#3443,month#801,price#808,category#803,subcategory#804,page#805,date#280,link#809,name#806,quantity#807,ccy_code#3439,sku#3004,day#802 in operator !EvaluatePython PythonUDF#<lambda>(ccy_code#3439,price#3424), pythonUDF#811: string;'
Run Code Online (Sandbox Code Playgroud)

显示此错误的某些列(例如price)df_lag是构建于其中的另一个数据框的一部分.我找不到有关如何解释此消息的任何信息,因此任何帮助将不胜感激.

apache-spark pyspark spark-dataframe

13
推荐指数
1
解决办法
7329
查看次数

pySpark Dataframe上的多个聚合标准

我有一个pySpark数据框,如下所示:

+-------------+----------+
|          sku|      date|
+-------------+----------+
|MLA-603526656|02/09/2016|
|MLA-603526656|01/09/2016|
|MLA-604172009|02/10/2016|
|MLA-605470584|02/09/2016|
|MLA-605502281|02/10/2016|
|MLA-605502281|02/09/2016|
+-------------+----------+
Run Code Online (Sandbox Code Playgroud)

我想通过sku分组,然后计算最小和最大日期.如果我这样做:

df_testing.groupBy('sku') \
    .agg({'date': 'min', 'date':'max'}) \
    .limit(10) \
    .show()
Run Code Online (Sandbox Code Playgroud)

行为与Pandas相同,我只获取skumax(date)列.在Pandas我通常会做以下事情来得到我想要的结果:

df_testing.groupBy('sku') \
    .agg({'day': ['min','max']}) \
    .limit(10) \
    .show()
Run Code Online (Sandbox Code Playgroud)

但是在pySpark上这不起作用,我得到一个java.util.ArrayList cannot be cast to java.lang.String错误.谁能指点我正确的语法?

谢谢.

apache-spark pyspark spark-dataframe

13
推荐指数
1
解决办法
1万
查看次数

如何使用Spark(pyspark)编写镶木地板文件?

我是Spark的新手,我一直在尝试将一个Dataframe转换为Spark中的镶木地板文件,但我还没有成功.该文件说,我可以使用write.parquet函数来创建该文件.但是,当我运行脚本时它向我显示:AttributeError:'RDD'对象没有属性'write'

from pyspark import SparkContext
sc = SparkContext("local", "Protob Conversion to Parquet ")

# spark is an existing SparkSession
df = sc.textFile("/temp/proto_temp.csv")

# Displays the content of the DataFrame to stdout
df.write.parquet("/output/proto.parquet")
Run Code Online (Sandbox Code Playgroud)

你知道怎么做这个吗?

我正在使用的spark版本是为Hadoop 2.7.3构建的Spark 2.0.1.

python pyspark spark-dataframe

13
推荐指数
2
解决办法
5万
查看次数

Pyspark:使用字符串格式的正则表达式过滤数据帧?

我已经阅读了几篇关于使用"like"运算符来过滤火花数据帧的帖子,条件是包含一个字符串/表达式,但是想知道以下是否是在所需条件下使用%s的"最佳实践"如下:

input_path = <s3_location_str>
my_expr = "Arizona.*hot"  # a regex expression
dx = sqlContext.read.parquet(input_path)  # "keyword" is a field in dx

# is the following correct?
substr = "'%%%s%%'" %my_keyword  # escape % via %% to get "%"
dk = dx.filter("keyword like %s" %substr)

# dk should contain rows with keyword values such as "Arizona is hot."
Run Code Online (Sandbox Code Playgroud)

注意

我正在尝试获取包含表达式my_keyword的dx中的所有行.否则,对于完全匹配,我们不需要周围百分号'%'.

regex apache-spark-sql pyspark spark-dataframe pyspark-sql

13
推荐指数
3
解决办法
2万
查看次数

火花访问前n行 - 采取vs限制

我想访问火花数据帧的前100行,并将结果写回CSV文件.

为什么take(100)基本上是即时的,而

df.limit(100)
      .repartition(1)
      .write
      .mode(SaveMode.Overwrite)
      .option("header", true)
      .option("delimiter", ";")
      .csv("myPath")
Run Code Online (Sandbox Code Playgroud)

需要永远.我不想获得每个分区的前100条记录,而只需要获得100条记录.

limit apache-spark apache-spark-sql spark-dataframe

13
推荐指数
2
解决办法
1万
查看次数