我有以下表结构
start|end
09:00|11:00
13:00|14:00
Run Code Online (Sandbox Code Playgroud)
我知道
SELECT ARRAY_AGG(start), ARRAY_AGG(end)
Run Code Online (Sandbox Code Playgroud)
将会导致
start|end
[09:00,13:00]|[11:00,14:00]
Run Code Online (Sandbox Code Playgroud)
但我怎样才能得到下面的结果呢?结果
[09:00,11:00,13:00,14:00]
Run Code Online (Sandbox Code Playgroud)
顺便说一句,我正在使用 Postgres
测试数据
addEmployee(EmplID, Name1, Name2, TypeOfWork, Salary, TxnDate)
addEmployee("tjb1998", "eva", "mcdowell", "ra", 55000, 20).
addEmployee("tjb1987x", "ben", "xena", "cdt", 68000, q50).
addEmployee("tjb2112", "ryoko", "hakubi", "ra", 63000, 60).
addEmployee("tjb1987", "ben", "croshaw", "cdt", 68000, 90).
addEmployee("tjb3300m", "amane", "mauna", "ma", 61000, 105).
Run Code Online (Sandbox Code Playgroud)
我想根据工作类型和特定工作类型的员工数量对员工进行分组。例如:
ra 4
cdt 2
ma 1
Run Code Online (Sandbox Code Playgroud)
下面是我正在尝试运行的查询
employee(TOW) :- addEmployee(_,_,_,TOW,_,_).
nmbrEmployeesOfSameType (N) :- N = #count { employee(TOW) }.
Run Code Online (Sandbox Code Playgroud)
请指教,我是 Clingo 的初级水平
我正在尝试将 hql 脚本转换为 pyspark。我正在努力如何在 groupby 子句之后的聚合中实现 case when 语句的总和。例如。
dataframe1 = dataframe0.groupby(col0).agg(
SUM(f.when((col1 == 'ABC' | col2 == 'XYZ'), 1).otherwise(0)))
Run Code Online (Sandbox Code Playgroud)
pyspark 中可以吗?我在执行此类语句时遇到错误。谢谢
以下是我的数据:
name id junk date time value value2
abc 1 1 1/1/2017 18:07:54 5 10
abc 1 2 1/1/2017 19:07:54 10 15
abc 2 3 2/1/2017 20:07:54 15 20
abc 2 4 2/1/2017 21:07:54 20 25
def 3 5 3/1/2017 22:07:54 25 30
def 3 6 3/1/2017 23:07:54 30 35
def 4 7 4/1/2017 12:07:54 35 40
def 4 8 4/1/2017 13:07:54 40 45
Run Code Online (Sandbox Code Playgroud)
我想根据三列删除重复项,name和id并date取第一个值。我尝试了以下命令:
data.drop_duplicates(subset=['name', 'id', 'date'],keep = 'first')
Run Code Online (Sandbox Code Playgroud)
我还想将这三列分组并取value和value2 …
我有一个混合的pd.DataFrame:
import pandas as pd
import numpy as np
df = pd.DataFrame({ 'A' : 1.,
'B' : pd.Timestamp('20130102'),
'C' : pd.Timestamp('20180101'),
'D' : np.random.rand(10),
'F' : 'foo' })
df
Out[12]:
A B C D F
0 1.0 2013-01-02 2018-01-01 0.592533 foo
1 1.0 2013-01-02 2018-01-01 0.819248 foo
2 1.0 2013-01-02 2018-01-01 0.298035 foo
3 1.0 2013-01-02 2018-01-01 0.330128 foo
4 1.0 2013-01-02 2018-01-01 0.371705 foo
5 1.0 2013-01-02 2018-01-01 0.541246 foo
6 1.0 2013-01-02 2018-01-01 0.976108 foo
7 …Run Code Online (Sandbox Code Playgroud) 我有一些 SQL 查询,它返回某些字段的总和,并且有一个与这些值关联的标志列,“Y”或“N”。
如果任何一条记录包含“Y”,是否有一个聚合函数将在此标志列上返回真/假?
我有一个清单:
lst = [('a', 1), ('b', 2), ('c', 3), ('a', 4), ('c', 5)]
Run Code Online (Sandbox Code Playgroud)
我想按元组的第一个元素进行分组并附加第二个元素:
group = {'a': [1, 4], 'b': [2], 'c': [3, 5]}
Run Code Online (Sandbox Code Playgroud)
所以我的代码如下所示:
group = dict()
for e1, e2 in lst:
if e1 in group:
group[e1].append(e2)
else:
group[e1] = [e2]
Run Code Online (Sandbox Code Playgroud)
我不喜欢这段代码的是,我在组字典中查找一个键两次,一次用于命令e1 in group,两次用于命令group[e1] = ...
如果找到键,是否有更好的方法来保留“指针”并且不必再次查看来设置该键的值?
另外,如果有更好的使用库的解决方案,请告诉我。
我正在尝试使用mapGroupsWithState方法对传入的数据流进行有状态结构化流处理。但我面临的问题是,我为groupByKey选择的键使我的状态太大太快。明显的出路是更改密钥,但我希望在更新方法中应用的业务逻辑要求密钥与我现在拥有的密钥完全相同,或者如果可能的话,访问所有密钥的GroupState 。
例如,我有来自各个组织的数据流,通常组织包含 userId、personId 等。请参阅下面的代码:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation", s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
Run Code Online (Sandbox Code Playgroud)
案例类别:
case class User(
orgId: Long,
profileId: Long,
userId: Long)
case class UserStatistic(
orgId: Long,
known: Long,
uknown: Long,
userSeq: Seq[User])
Run Code Online (Sandbox Code Playgroud)
更新方法:
def updateUserStatistic(
orgId: Long,
newEvents: Iterator[User],
oldState: GroupState[UserStatistic]): UserStatistic = { …Run Code Online (Sandbox Code Playgroud) aggregate apache-spark spark-streaming spark-structured-streaming
我遇到的情况是,列中可能有空值,需要在组中求和。
如果我在组中遇到空值,我希望该组的总和为空。但 PySpark 默认情况下似乎会忽略空行并对其余非空值求和。
例如:
dataframe = dataframe.groupBy('dataframe.product', 'dataframe.price') \
.agg(f.sum('price'))
Run Code Online (Sandbox Code Playgroud)
预期输出是:
但我得到:
aggregate ×10
python ×3
sql ×3
apache-spark ×2
pandas ×2
postgresql ×2
pyspark ×2
clingo ×1
dictionary ×1
group-by ×1
inner-join ×1
key ×1
null ×1
oracle ×1
oracle12c ×1
python-2.7 ×1
python-3.x ×1