我正在尝试使用现有列集上的groupby聚合在Pyspark中创建新列表.下面提供了一个示例输入数据框:
------------------------
id | date | value
------------------------
1 |2014-01-03 | 10
1 |2014-01-04 | 5
1 |2014-01-05 | 15
1 |2014-01-06 | 20
2 |2014-02-10 | 100
2 |2014-03-11 | 500
2 |2014-04-15 | 1500
Run Code Online (Sandbox Code Playgroud)
预期的产出是:
id | value_list
------------------------
1 | [10, 5, 15, 20]
2 | [100, 500, 1500]
Run Code Online (Sandbox Code Playgroud)
列表中的值按日期排序.
我尝试使用collect_list如下:
from pyspark.sql import functions as F
ordered_df = input_df.orderBy(['id','date'],ascending = True)
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value"))
Run Code Online (Sandbox Code Playgroud)
但即使我在聚合之前按日期对输入数据框进行排序,collect_list也不保证顺序.
有人可以通过保留基于第二个(日期)变量的订单来帮助如何进行聚合吗?
我们以mpg数据集为例,特别是class和cyl列.我可以看到每个单元有多少条目class,并根据cyl值区分填充颜色:
library(ggplot2)
p <- ggplot(mpg)
p <- p + geom_bar(mapping=aes(x=class, fill=factor(cyl)), position=position_dodge())
print(p)
Run Code Online (Sandbox Code Playgroud)
但是,我想看到的是每个不同值的平均条目数(每个class)cyl.基本上,如果你看一下上面的图,我想要每个类一个单独的条,其高度应该是该类的彩色条的平均高度.
我可以通过预处理数据框来获得这个结果,例如:
df <- aggregate(formula=cyl~class, data=mpg, FUN=function(x) { length(x) / length(unique(x)) })
p <- ggplot(df)
p <- p + geom_bar(mapping=aes(x=class, y=cyl), stat='identity')
p <- p + ylab('average count')
Run Code Online (Sandbox Code Playgroud)
这给了我想要的输出:
但是,考虑到ggplot2有多强大,我想知道这是否可以通过ggplot函数实现.我想这涉及使用特定的stat(可能与group=cyl?),但我无法做到.
使用dplyr(优选地),我试图计算每个观察的组平均值,同时从该组中排除该观察结果.
看来,这应该是可行的与组合rowwise()和group_by(),但是这两种功能不能同时使用.
鉴于此数据框架:
df <- data_frame(grouping = rep(LETTERS[1:5], 3),
value = 1:15) %>%
arrange(grouping)
df
#> Source: local data frame [15 x 2]
#>
#> grouping value
#> (chr) (int)
#> 1 A 1
#> 2 A 6
#> 3 A 11
#> 4 B 2
#> 5 B 7
#> 6 B 12
#> 7 C 3
#> 8 C 8
#> 9 C 13
#> 10 D 4
#> 11 D 9 …Run Code Online (Sandbox Code Playgroud) 我有一个pandas DataFrame,其中一列包含以下元素:
[2,2.5,3,2,2.6,10,10.3,10,10.1,10.3,10],
Run Code Online (Sandbox Code Playgroud)
是否有一个python函数可以检测从该列表2.6到10的突然变化?我已经阅读了一点,R可以做到这一点.python中有类似的功能吗?
我想否定(除了因子变量的每个级别的给定行值之外的所有值)并总结剩余的数据.举个简单的例子,我有一个带有两列的数据框DF.
>DF
Category Value
A 5
B 2
C 3
A 1
C 1
Run Code Online (Sandbox Code Playgroud)
如果dplyr可以否定选择它会看起来像这样(可以吗?).
> DF %>% group_by(!Category) %>% summarise(avg = mean(Value))
!Category avg
A 2.00 #average of all rows where category isn't A
B 2.50
C 2.67
Run Code Online (Sandbox Code Playgroud) 我目前有一个数据框,其中一列的类型为"abcde ...".将此列命名为"col4"
我想通过拆分col4的元素将一行拆分成多行,保留所有其他列的值.
因此,例如,给定单行的df:
col1 [0] | col2 [0] | col3 [0] | abc |
我希望输出为:
col1 [0] | col2 [0] | col3 [0] | a |
col1 [0] | col2 [0] | col3 [0] | b |
col1 [0] | col2 [0] | col3 [0] | c |
使用split和explode函数,我尝试了以下方法:
d = COMBINED_DF.select(col1, col2, col3, explode(split(my_fun(col4), " ")))
Run Code Online (Sandbox Code Playgroud)
但是,这会产生以下输出:
col1 [0] | col2 [0] | col3 [0] | abc |
col1 [0] | col2 [0] | col3 [0] | abc …
我需要将两列Dataframe转换为按其中一列分组的列表.我在熊猫中成功完成了它:
expertsDF = expertsDF.groupby('session', as_index=False).agg(lambda x: x.tolist())
Run Code Online (Sandbox Code Playgroud)
但是现在我想在pySpark中做同样的事情如下:
expertsDF = df.groupBy('session').agg(lambda x: x.collect())
Run Code Online (Sandbox Code Playgroud)
我收到错误:
all exprs should be Column
Run Code Online (Sandbox Code Playgroud)
我已经尝试了几个命令,但我根本无法做到正确.并且spark dokumentation不包含类似的东西.
它的示例输入是数据帧:
session name
1 a
1 b
2 v
2 c
Run Code Online (Sandbox Code Playgroud)
输出:
session name
1 [a, b....]
2 [v, c....]
Run Code Online (Sandbox Code Playgroud) 我最近开始玩弄Luigi,我想知道如何使用它来不断地将新数据附加到现有的目标文件中。
想象一下,我每分钟 ping 一个 api 以检索新数据。因为 aTask仅在Target不存在时运行,所以一种天真的方法是通过当前datetime. 这是一个简单的例子:
import luigi
import datetime
class data_download(luigi.Task):
date = luigi.DateParameter(default = datetime.datetime.now())
def requires(self):
return []
def output(self):
return luigi.LocalTarget("data_test_%s.json" % self.date.strftime("%Y-%m-%d_%H:%M"))
def run(self):
data = download_data()
with self.output().open('w') as out_file:
out_file.write(data + '\n')
if __name__ == '__main__':
luigi.run()
Run Code Online (Sandbox Code Playgroud)
如果我安排这个任务每分钟运行一次,它会执行,因为当前时间的目标文件还不存在。但它每分钟创建 60 个文件。我想要做的,而不是,是确保所有在同一个文件中的新数据结束了最后。实现这一目标的可扩展方法是什么?欢迎任何想法,建议!
我正在做一个新闻推荐系统,我需要为用户和他们阅读的新闻建立一个表格。我的原始数据是这样的:
001436800277225 ["9161492","9161787","9378531"]
009092130698762 ["9394697"]
010003000431538 ["9394697","9426473","9428530"]
010156461231357 ["9350394","9414181"]
010216216021063 ["9173862","9247870"]
010720006581483 ["9018786"]
011199797794333 ["9017977","9091134","9142852","9325464","9331913"]
011337201765123 ["9161294","9198693"]
011414545455156 ["9168185","9178348","9182782","9359776"]
011425002581540 ["9083446","9161294","9309432"]
Run Code Online (Sandbox Code Playgroud)
我使用spark-SQL爆炸并进行了一次热编码,
df = getdf()
df1 = df.select('uuid',explode('news').alias('news'))
stringIndexer = StringIndexer(inputCol="news", outputCol="newsIndex")
model = stringIndexer.fit(df1)
indexed = model.transform(df1)
encoder = OneHotEncoder(inputCol="newsIndex", outputCol="newsVec")
encoded = encoder.transform(indexed)
encoded.show(20,False)
Run Code Online (Sandbox Code Playgroud)
之后,我的数据变为:
+---------------+-------+---------+----------------------+
|uuid |news |newsIndex|newsVec |
+---------------+-------+---------+----------------------+
|014324000386050|9398253|10415.0 |(105721,[10415],[1.0])|
|014324000386050|9428530|70.0 |(105721,[70],[1.0]) |
|014324000631752|654112 |1717.0 |(105721,[1717],[1.0]) |
|014324000674240|730531 |2282.0 |(105721,[2282],[1.0]) |
|014324000674240|694306 |1268.0 |(105721,[1268],[1.0]) |
|014324000674240|712016 |4766.0 |(105721,[4766],[1.0]) |
|014324000674240|672307 |7318.0 |(105721,[7318],[1.0]) |
|014324000674240|698073 |1241.0 …Run Code Online (Sandbox Code Playgroud) python machine-learning apache-spark apache-spark-sql pyspark-sql
当使用facet_grid时,如何根据数据中的因子对geom_segments进行着色?我的方法失败了,因为颜色的分配是错误的.
这是一些数据:
visual_data=data.frame(Values = 10:1, Words = c("yeah","what","is","up","and","how", "are", "things","for", "you"), group = c("a","b","a","b","a","b","a","b","a","b"), importance=c("#EF2A2A","#EF2A2A", "#E4FA11", "#E4FA11", "#E4FA11", "#E4FA11","#EF2A2A","#EF2A2A","#EF2A2A", "#E4FA11"))
Run Code Online (Sandbox Code Playgroud)
此代码创建一个图:
graphic=ggplot(visual_data, aes(xend=Values, x=0, y=reorder(Words, Values), yend=reorder(Words, Values))) +
geom_text(aes(x=Values, label=Values, hjust=-0.3), color="#389912",family="sans") +
geom_segment(size=4,colour=visual_data$importance) +
theme(axis.text=element_text(size=10,family="sans"),axis.title=element_text(size=13,face="bold",family="sans"),strip.text.y = element_text(size=12,family="sans"), plot.title=element_text(size=14,face="bold",family="sans")) +
facet_grid(group~., scales = "free")+
theme_bw()
graphic
Run Code Online (Sandbox Code Playgroud)
可以看出,"是"和"什么",例如,不共享相同的条形颜色,尽管它们应该根据我的数据规范.
有没有人解决这个问题?
python ×6
apache-spark ×4
r ×4
pyspark ×3
dataframe ×2
dplyr ×2
ggplot2 ×2
pandas ×2
group-by ×1
luigi ×1
plot ×1
pyspark-sql ×1
python-2.7 ×1