我有一个Spark Dataframe,其中包含一系列日期:
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd
rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
StructField('EndDateTime', StringType(), True),
StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)
我想做的是duration通过减去EndDateTime和找到StartDateTime.我想我会尝试使用函数执行此操作:
# Function to calculate time delta
def time_delta(y,x):
end = pd.to_datetime(y)
start = pd.to_datetime(x)
delta = (end-start)
return delta
# create new RDD and add new column 'Duration' by applying …Run Code Online (Sandbox Code Playgroud) 我有一个DataFrame带有数据的Spark SQL ,我想要得到的是给定日期范围内当前行之前的所有行.因此,例如,我希望将7天之前的所有行放在给定行之前.我想我需要使用Window Function像:
Window \
.partitionBy('id') \
.orderBy('start')
Run Code Online (Sandbox Code Playgroud)
这就是问题所在.我想要有rangeBetween7天的时间,但是我在这个文件中找不到任何内容.Spark甚至提供这样的选择吗?现在我只是得到前面的所有行:
.rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)
但想要实现以下目标:
.rangeBetween("7 days", 0)
Run Code Online (Sandbox Code Playgroud)
如果有人能帮助我,我将非常感激.提前致谢!
我正在尝试比较不同的方式来聚合我的数据.
这是我的输入数据,包含2个元素(页面,访问者):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Run Code Online (Sandbox Code Playgroud)
使用以下代码将SQL命令用于Spark SQL:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
Run Code Online (Sandbox Code Playgroud)
我得到这个输出:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Run Code Online (Sandbox Code Playgroud)
现在,我想使用Dataframes和他们的API获得相同的结果,但我无法获得相同的输出:
import sqlContext.implicits._
case class Log(page: String, visitor: …Run Code Online (Sandbox Code Playgroud) 我的理解是Spark是Hadoop的替代品.但是,在尝试安装Spark时,安装页面会要求安装现有的Hadoop.我无法找到任何澄清这种关系的东西.
其次,Spark显然与Cassandra和Hive有良好的连接.两者都有sql风格的界面.但是,Spark有自己的sql.为什么人们会使用Cassandra/Hive而不是Spark的原生sql?假设这是一个没有现有安装的全新项目?
给定表1,其中一列为"x",类型为String.我想创建表2,其中列为"y",它是"x"中给出的日期字符串的整数表示形式.
必不可少的是将null值保留在"y"列中.
表1(数据帧df1):
+----------+
| x|
+----------+
|2015-09-12|
|2015-09-13|
| null|
| null|
+----------+
root
|-- x: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
表2(数据帧df2):
+----------+--------+
| x| y|
+----------+--------+
| null| null|
| null| null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
|-- x: string (nullable = true)
|-- y: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
用于将列"x"中的值转换为列"y"的用户定义函数(udf)为:
val extractDateAsInt = udf[Int, String] (
(d:String) => d.substring(0, 10)
.filterNot( "-".toSet)
.toInt )
Run Code Online (Sandbox Code Playgroud)
并且工作,处理空值是不可能的.
尽管如此,我可以做类似的事情
val extractDateAsIntWithNull = udf[Int, String] (
(d:String) =>
if (d != …Run Code Online (Sandbox Code Playgroud) scala nullable user-defined-functions apache-spark apache-spark-sql
是否有语义之间的任何差异df.na().drop(),并df.filter(df.col("onlyColumnInOneColumnDataFrame").isNotNull() && !df.col("onlyColumnInOneColumnDataFrame").isNaN())在那里df是Apache的火花 Dataframe?
或者我认为它是一个错误,如果第一个null不在后面返回(不是一个String null,而只是一个null值)在列中onlyColumnInOneColumnDataFrame,第二个没有?
编辑:也添加!isNaN().这onlyColumnInOneColumnDataFrame是给定的唯一列Dataframe.让我们说它的类型是Integer.
我有一个包含四个字段的数据框.其中一个字段名称是Status,我试图在.filter中使用OR条件来表示数据帧.我试过下面的查询,但没有运气.
df2 = df1.filter(("Status=2") || ("Status =3"))
df2 = df1.filter("Status=2" || "Status =3")
Run Code Online (Sandbox Code Playgroud)
有没有人以前用过这个.我在这里看到了关于堆栈溢出的类似问题.他们使用下面的代码来使用OR条件.但该代码适用于pyspark.
from pyspark.sql.functions import col
numeric_filtered = df.where(
(col('LOW') != 'null') |
(col('NORMAL') != 'null') |
(col('HIGH') != 'null'))
numeric_filtered.show()
Run Code Online (Sandbox Code Playgroud) 我有这个在pandas数据帧中本地运行的python代码:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)
我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.
我尝试过以下方法:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
Run Code Online (Sandbox Code Playgroud)
返回
KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)
我推测因为'A'不再是一列而我找不到x.name的等价物.
然后
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
Run Code Online (Sandbox Code Playgroud)
但是得到以下错误:
AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)
任何建议将非常感谢!
python user-defined-functions apache-spark apache-spark-sql pyspark
import numpy as np
df = spark.createDataFrame(
[(1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float('nan')), (1, 6, float('nan'))],
('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)
预期产出
每列的数量为nan/null的数据帧
注意: 我在堆栈溢出中发现的先前问题仅检查null而不是nan.这就是为什么我创造了一个新问题.
我知道我可以在spark中使用isnull()函数来查找Spark列中的Null值的数量但是如何在Spark数据帧中找到Nan值?
我使用Spark 1.3.0和Spark Avro 1.0.0.我正在使用存储库页面上的示例.以下代码运行良好
val df = sqlContext.read.avro("src/test/resources/episodes.avro")
df.filter("doctor > 5").write.avro("/tmp/output")
Run Code Online (Sandbox Code Playgroud)
但是如果我需要查看doctor字符串是否包含子字符串呢?因为我们在字符串中编写表达式.我怎么做"包含"?
apache-spark ×10
apache-spark-sql ×10
pyspark ×4
dataframe ×2
scala ×2
cassandra ×1
count ×1
distinct ×1
hadoop ×1
nullable ×1
pyspark-sql ×1
python ×1
sql ×1