标签: apache-spark-sql

通过以字符串格式减去两个日期时间列来计算持续时间

我有一个Spark Dataframe,其中包含一系列日期:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                                    ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                                    ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                                    ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                                    ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想做的是duration通过减去EndDateTime和找到StartDateTime.我想我会尝试使用函数执行此操作:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
5万
查看次数

Spark窗口函数 - rangeBetween日期

我有一个DataFrame带有数据的Spark SQL ,我想要得到的是给定日期范围内当前行之前的所有行.因此,例如,我希望将7天之前的所有行放在给定行之前.我想我需要使用Window Function像:

Window \
    .partitionBy('id') \
    .orderBy('start')
Run Code Online (Sandbox Code Playgroud)

这就是问题所在.我想要有rangeBetween7天的时间,但是我在这个文件中找不到任何内容.Spark甚至提供这样的选择吗?现在我只是得到前面的所有行:

.rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)

但想要实现以下目标:

.rangeBetween("7 days", 0)
Run Code Online (Sandbox Code Playgroud)

如果有人能帮助我,我将非常感激.提前致谢!

sql window-functions apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
2万
查看次数

Spark:如何在Dataframe API中翻译count(distinct(value))

我正在尝试比较不同的方式来聚合我的数据.

这是我的输入数据,包含2个元素(页面,访问者):

(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Run Code Online (Sandbox Code Playgroud)

使用以下代码将SQL命令用于Spark SQL:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
                              """select page
                                       ,count(distinct visitor) as visitor
                                   from logs
                               group by page
                              """)
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Run Code Online (Sandbox Code Playgroud)

现在,我想使用Dataframes和他们的API获得相同的结果,但我无法获得相同的输出:

import sqlContext.implicits._
case class Log(page: String, visitor: …
Run Code Online (Sandbox Code Playgroud)

count distinct dataframe apache-spark apache-spark-sql

27
推荐指数
2
解决办法
5万
查看次数

Spark,Hadoop和Cassandra之间的关系是什么?

我的理解是Spark是Hadoop的替代品.但是,在尝试安装Spark时,安装页面会要求安装现有的Hadoop.我无法找到任何澄清这种关系的东西.

其次,Spark显然与Cassandra和Hive有良好的连接.两者都有sql风格的界面.但是,Spark有自己的sql.为什么人们会使用Cassandra/Hive而不是Spark的原生sql?假设这是一个没有现有安装的全新项目?

hadoop cassandra apache-spark apache-spark-sql

27
推荐指数
2
解决办法
1万
查看次数

SparkSQL:如何处理用户定义函数中的空值?

给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.

必不可少的是将null值保留在"y"列中.

表1(数据帧df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

表2(数据帧df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )
Run Code Online (Sandbox Code Playgroud)

并且工作,处理空值是不可能的.

尽管如此,我可以做类似的事情

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != …
Run Code Online (Sandbox Code Playgroud)

scala nullable user-defined-functions apache-spark apache-spark-sql

27
推荐指数
3
解决办法
4万
查看次数

na().drop()和filter(col.isNotNull)之间的区别(Apache Spark)

是否有语义之间的任何差异df.na().drop(),并df.filter(df.col("onlyColumnInOneColumnDataFrame").isNotNull() && !df.col("onlyColumnInOneColumnDataFrame").isNaN())在那里dfApache的火花 Dataframe

或者我认为它是一个错误,如果第一个null不在后面返回(不是一个String null,而只是一个null值)在列中onlyColumnInOneColumnDataFrame,第二个没有?

编辑:也添加!isNaN().这onlyColumnInOneColumnDataFrame是给定的唯一列Dataframe.让我们说它的类型是Integer.

apache-spark apache-spark-sql

27
推荐指数
1
解决办法
4万
查看次数

火花数据帧中滤波的多个条件

我有一个包含四个字段的数据框.其中一个字段名称是Status,我试图在.filter中使用OR条件来表示数据帧.我试过下面的查询,但没有运气.

df2 = df1.filter(("Status=2") || ("Status =3"))

df2 = df1.filter("Status=2" || "Status =3")
Run Code Online (Sandbox Code Playgroud)

有没有人以前用过这个.我在这里看到了关于堆栈溢出的类似问题.他们使用下面的代码来使用OR条件.但该代码适用于pyspark.

from pyspark.sql.functions import col 

numeric_filtered = df.where(
(col('LOW')    != 'null') | 
(col('NORMAL') != 'null') |
(col('HIGH')   != 'null'))
numeric_filtered.show()
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

27
推荐指数
2
解决办法
8万
查看次数

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

python user-defined-functions apache-spark apache-spark-sql pyspark

27
推荐指数
4
解决办法
2万
查看次数

如何有效地找到PySpark数据帧中每列的Null和Nan值的计数?

import numpy as np

df = spark.createDataFrame(
    [(1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float('nan')), (1, 6, float('nan'))],
    ('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)

预期产出

每列的数量为nan/null的数据帧

注意: 我在堆栈溢出中发现的先前问题仅检查null而不是nan.这就是为什么我创造了一个新问题.

我知道我可以在spark中使用isnull()函数来查找Spark列中的Null值的数量但是如何在Spark数据帧中找到Nan值?

apache-spark apache-spark-sql pyspark pyspark-sql

27
推荐指数
4
解决办法
6万
查看次数

过滤火花DataFrame上的字符串包含

我使用Spark 1.3.0Spark Avro 1.0.0.我正在使用存储库页面上的示例.以下代码运行良好

val df = sqlContext.read.avro("src/test/resources/episodes.avro")
df.filter("doctor > 5").write.avro("/tmp/output")
Run Code Online (Sandbox Code Playgroud)

但是如果我需要查看doctor字符串是否包含子字符串呢?因为我们在字符串中编写表达式.我怎么做"包含"?

scala dataframe apache-spark apache-spark-sql

26
推荐指数
1
解决办法
9万
查看次数