我想通过使用EF在Row_number上分区加载数据.
SELECT *
FROM (
SELECT sf.SerialFlowsId
,sf.GoodsSerialId
,d.FormTypeId
, d.GoodsId
,ROW_NUMBER() OVER (PARTITION BY d.GoodsId, sf.GoodsSerialId ORDER BY sf.Date DESC)row
FROM sam.SerialFlows sf
INNER JOIN sam.Detail d ON d.DetailId = sf.DetailId
)z
WHERE z.row =1
AND z.FormTypeId=7
AND z.GoodsId=51532
Run Code Online (Sandbox Code Playgroud)
这个问题是我的期待.
我尝试使用这个表达式,但遗憾的是Zip扩展方法无法在ef中识别
var goodsSerials = context.SerialFlows.OrderByDescending(x => x.Date).GroupBy(x => new { x.Detail.GoodsID, x.Date })
.Select(g => new {g})
.SelectMany(z => z.g.Select(c => c)).Zip(m, (j, i) => new { GoodSerial=j,j.Detail.FormTypeID,j.Detail.GoodsID,rn=i })
.Where(x => x.rn== 1 && x.GoodsID== goodsId && x.FormTypeID==7).Select(x => …Run Code Online (Sandbox Code Playgroud) 我的问题是由计算spark数据帧中连续行之间差异的用例触发的.
例如,我有:
>>> df.show()
+-----+----------+
|index| col1|
+-----+----------+
| 0.0|0.58734024|
| 1.0|0.67304325|
| 2.0|0.85154736|
| 3.0| 0.5449719|
+-----+----------+
Run Code Online (Sandbox Code Playgroud)
如果我选择使用"Window"函数计算它们,那么我可以这样做:
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index| col1| diffs_col1|
+-----+----------+-----------+
| 0.0|0.58734024|0.085703015|
| 1.0|0.67304325| 0.17850411|
| 2.0|0.85154736|-0.30657548|
| 3.0| 0.5449719| null|
+-----+----------+-----------+
Run Code Online (Sandbox Code Playgroud)
问题:我在一个分区中明确地划分了数据帧.这会对性能产生什么影响,如果存在,为什么会这样,我怎么能避免它呢?因为当我没有指定分区时,我收到以下警告:
16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Run Code Online (Sandbox Code Playgroud) partitioning window-functions apache-spark apache-spark-sql pyspark
我需要计算表中各种尺寸的百分比.我想通过使用窗口函数来计算分母来简化事情,但是我遇到了问题,因为分子也必须是聚合.
举个简单的例子,请看下表:
create temp table test (d1 text, d2 text, v numeric);
insert into test values ('a','x',5), ('a','y',5), ('a','y',10), ('b','x',20);
Run Code Online (Sandbox Code Playgroud)
如果我只想计算d1中每一行的份额,那么窗口函数可以正常工作:
select d1, d2, v/sum(v) over (partition by d1)
from test;
"b";"x";1.00
"a";"x";0.25
"a";"y";0.25
"a";"y";0.50
Run Code Online (Sandbox Code Playgroud)
但是,我需要做的是计算d1中d2之和的总份额.我正在寻找的输出是这样的:
"b";"x";1.00
"a";"x";0.25
"a";"y";0.75
Run Code Online (Sandbox Code Playgroud)
所以我试试这个:
select d1, d2, sum(v)/sum(v) over (partition by d1)
from test
group by d1, d2;
Run Code Online (Sandbox Code Playgroud)
但是,现在我收到一个错误:
ERROR: column "test.v" must appear in the GROUP BY clause or be used in an aggregate function
Run Code Online (Sandbox Code Playgroud)
我假设这是因为它抱怨在分组子句中没有考虑窗口函数,但是无论如何窗口函数都不能放在分组子句中.
这是使用Greenplum 4.1,它是Postgresql 8.4的一个分支,并共享相同的窗口函数.请注意,Greenplum无法执行相关子查询.
sql postgresql aggregate-functions window-functions greenplum
在PostgreSQL 9.4中,窗口函数具有a的新选项,FILTER用于选择窗口框架的子集以进行处理.文档提到了它,但没有提供样本.在线搜索会产生一些样本,包括来自2ndQuadrant的样本,但我发现的所有样本都是具有常量表达式的相当简单的例子.我要找的是一个包含当前行值的过滤器表达式.
假设我有一堆包含一堆列的表,其中一列是date类型:
col1 | col2 | dt ------------------------ 1 | a | 2015-07-01 2 | b | 2015-07-03 3 | c | 2015-07-10 4 | d | 2015-07-11 5 | e | 2015-07-11 6 | f | 2015-07-13 ...
date在整个表上处理的窗口定义很简单:WINDOW win AS (ORDER BY dt)
我有兴趣知道在当前行(包括)之前的4天中存在多少行.所以我想生成这个输出:
col1 | col2 | dt | count -------------------------------- 1 | a | 2015-07-01 | 1 2 | b | 2015-07-03 | 2 3 …
我在Spark SQL,Scala中查看Spark DataFrame的窗口滑动功能.
我有一个数据框,列Col1,Col1,Col1,日期.
Col1 Col2 Col3 date volume new_col
201601 100.5
201602 120.6 100.5
201603 450.2 120.6
201604 200.7 450.2
201605 121.4 200.7`
Run Code Online (Sandbox Code Playgroud)
现在我想添加一个名为(new_col)的新列,其中一行向下滑动,如上所示.
我试过下面的选项来使用窗口功能.
val windSldBrdrxNrx_df = df.withColumn("Prev_brand_rx", lag("Prev_brand_rx",1))
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮助我如何做到这一点.
我有一个由时间戳列和美元列组成的数据集.我想找到每行的平均美元数,在每行的时间戳结束.我最初看的是pyspark.sql.functions.window函数,但是按周分类数据.
这是一个例子:
%pyspark
import datetime
from pyspark.sql import functions as F
df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))
w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()
Run Code Online (Sandbox Code Playgroud)
这导致两条记录:
| start | end | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|
Run Code Online (Sandbox Code Playgroud)
窗口函数将时间序列数据分类,而不是执行滚动平均值.
有没有办法执行滚动平均值,我会回到每行的每周平均值,时间段结束于行的timestampGMT?
编辑:
张的答案接近我想要的,但不完全是我想看到的.
这是一个更好的例子来展示我想要得到的东西:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", …Run Code Online (Sandbox Code Playgroud) 我想知道Spark是否知道镶木地板文件的分区键,并使用此信息来避免随机播放.
语境:
运行Spark 2.0.1运行本地SparkSession.我有一个csv数据集,我将其保存为我的磁盘上的镶木地板文件,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
Run Code Online (Sandbox Code Playgroud)
我按列创建了42个分区numerocarte.这应该将多个组分组numerocarte到同一个分区.我write当时不想做partitionBy("numerocarte"),因为我不希望每张卡分区一个.它将是数百万.
之后在另一个脚本中,我读了这个SomeFile.parquet镶木地板文件并对其进行了一些操作.特别是我正在运行window function它,其中分区是在镶木地板文件被重新分区的同一列上完成的.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
Run Code Online (Sandbox Code Playgroud)
在read我看到repartition按预期工作后,DataFrame df2有42个分区,每个分区都有不同的卡.
问题:
df2是按列分区的numerocarte?是否可以将多个窗口函数应用于同一分区?(如果我没有使用正确的词汇,请纠正我)
例如,你可以做到
SELECT name, first_value() over (partition by name order by date) from table1
Run Code Online (Sandbox Code Playgroud)
但有没有办法做一些事情:
SELECT name, (first_value() as f, last_value() as l (partition by name order by date)) from table1
Run Code Online (Sandbox Code Playgroud)
我们在同一个窗口上应用两个函数的位置?
参考:http: //postgresql.ro/docs/8.4/static/tutorial-window.html
使用PostgreSQL 9.0.
比方说,我有一个包含字段的表:company,profession和year.我想返回一个包含独特公司和专业的结果,但基于数字序列聚合(到一个数组中很好)年份:
示例表:
+-----------------------------+
| company | profession | year |
+---------+------------+------+
| Google | Programmer | 2000 |
| Google | Sales | 2000 |
| Google | Sales | 2001 |
| Google | Sales | 2002 |
| Google | Sales | 2004 |
| Mozilla | Sales | 2002 |
+-----------------------------+
Run Code Online (Sandbox Code Playgroud)
我对一个输出类似于以下行的查询感兴趣:
+-----------------------------------------+
| company | profession | year |
+---------+------------+------------------+
| Google | Programmer | [2000] |
| …Run Code Online (Sandbox Code Playgroud) 我有一个PostgreSQL 9.1数据库,其中包含一个包含时间戳和测量值的表
'2012-10-25 01:00' 2
'2012-10-25 02:00' 5
'2012-10-25 03:00' 12
'2012-10-25 04:00' 7
'2012-10-25 05:00' 1
... ...
Run Code Online (Sandbox Code Playgroud)
我需要在每小时8小时的范围内平均值.换句话说,我需要平均1h-8h,2h-9h,3h-10h等.
我不知道如何进行这样的查询.我到处寻找,但也不知道要寻找什么功能.
我发现的关闭是每小时/每日平均值或平均值(例如1小时-8小时,9小时-16小时等).但在这些情况下,时间戳只是使用date_trunc()函数转换(如下例所示),这对我没用.
我认为我正在寻找的是与此类似的功能
SELECT date_trunc('day', timestamp), max(value)
FROM table_name
GROUP BY date_trunc('day', timestamp);
Run Code Online (Sandbox Code Playgroud)
但是在分组条款中每小时使用一些8小时的范围.这甚至可能吗?
window-functions ×10
sql ×6
postgresql ×5
apache-spark ×4
partitioning ×2
pyspark ×2
datetime ×1
greenplum ×1
plpgsql ×1
scala ×1