小编Sir*_*pat的帖子

如何删除已发布的主题

我是 MQTT 协议的新手。当我通读文档时,我看不到任何删除已发布主题的功能。我的目的是允许发布者删除已发布的主题。我错过了 mqtt 文档中的某些内容吗?有什么建议吗?谢谢 !

python publisher broker mqtt iot

2
推荐指数
2
解决办法
7786
查看次数

带过滤器的pyspark-df窗口功能

我有一个包含列的数据集:id,timestamp,x,y

[id],[时间戳],[x],[y]

0,1443489380,100,1

0,1443489390,200,0

0,1443489400,300,0

0,1443489410,400,1

我定义了一个窗口规范: w = Window.partitionBy("id").orderBy("timestamp")

我想做这样的事情.创建一个新列,将当前行的x与下一行的x相加.

如果sum> = 500则设置new column = BIG else SMALL.

df = df.withColumn("newCol", 
                   when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                   .otherwise("SMALL") )
Run Code Online (Sandbox Code Playgroud)

但是,我想在这之前过滤数据而不影响原始df.

[只有y = 1的行才会应用上面的代码]

因此,将在代码上方应用的数据仅为这两行.

0,1443489380,100,1

0,1443489410,400,1

我这样做了,但实在太糟糕了.

df2 = df.filter(df.y == 1)
df2 = df2.withColumn("newCol", 
                     when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                     .otherwise("SMALL") )
df = df.join(df2, ["id","timestamp"], "outer")
Run Code Online (Sandbox Code Playgroud)

我想做类似的事情,但这是不可能的,因为它会导致AttributeError:'DataFrame'对象没有属性'when'

df = df.withColumn("newCol", df.filter(df.y == 1)
                   .when(df.x + lag(df.x,-1).over(w) >= 500 , "BIG")
                   .otherwise("SMALL") )
Run Code Online (Sandbox Code Playgroud)

总之,我只想对sum …

pyspark spark-dataframe pyspark-sql

1
推荐指数
1
解决办法
2860
查看次数

标签 统计

broker ×1

iot ×1

mqtt ×1

publisher ×1

pyspark ×1

pyspark-sql ×1

python ×1

spark-dataframe ×1