Streaming from a pandas DataFrame using Streamz

psy*_*dia 5 pandas holoviews hvplot holoviz

Working with the streamz package, hvplot provides support for plotting streaming data using pandas dataframes.

For example, the streamz package has a convenience utility for creating a random streaming dataframe:

import hvplot.streamz
from streamz.dataframe import Random

sdf = Random(interval='200ms', freq='50ms')
sdf
# Stop the streaming with: sdf.stop()

This can then be plotted as a streaming chart with hvplot:

sdf.hvplot()

Is there a simple way to stream data from a pre-existing pandas dataframe?

For example, I'd like to be able to write something like:

import pandas as pd

df = pd.DataFrame({'a': range(0, 100), 'b': range(5, 105)})

# Wished-for call: this is what I would like to be able to write (hypothetical)
sdf = StreamingDataFrame(df, interval='200ms', freq='50ms')


I could then simply use sample data from the pre-existing pandas dataframe, rather than random sample data.

psy*_*dia 2

As far as I can tell, it goes something like this...

import pandas as pd

from streamz import Stream
from streamz.dataframe import DataFrame

from time import sleep

from datetime import datetime

#Set up a source stream
source = Stream()

#Create a sample pandas dataframe
samples = pd.DataFrame({'x':[0],'y':[0]})

#The streaming dataframe takes the source stream and sample pandas dataframe
#The sample (example) defines the dataframe schema: column names and dtypes
sdf = DataFrame(source, example=samples)

def stest(r):
    print(datetime.now())
    print(r)

#window(3).full() gives the last 3 *rows* seen so far, not the last 3 emitted items,
#so with 3 rows per emit only the most recent batch is ever shown
df = sdf.window(3).full()

#Register stest as a callback that runs each time the windowed dataframe updates
df.stream.sink(stest)


for i in range(5):

    #Push the next chunk of data into the stream
    #(this could equally be a slice of an existing dataframe, e.g. via iloc)
    source.emit(pd.DataFrame({'x': [i,i,i], 'y' :[i,i,i]}))

    #Pause for a short while...
    sleep(2)


--------
2020-01-27 19:13:06.816315
   x  y
0  0  0
1  0  0
2  0  0
2020-01-27 19:13:08.824016
   x  y
0  1  1
1  1  1
2  1  1
2020-01-27 19:13:10.829178
   x  y
0  2  2
1  2  2
2  2  2
2020-01-27 19:13:12.835948
   x  y
0  3  3
1  3  3
2  3  3
2020-01-27 19:13:14.843432
   x  y
0  4  4
1  4  4
2  4  4

....

Ah... found an example that seems to work better.

Set up the dataframe we want to stream from:

import pandas as pd
from time import sleep
from datetime import datetime

from streamz import Stream
from streamz.dataframe import DataFrame


#Set up a dummy dataframe
ddf=pd.DataFrame({'a':range(1,5),'b':range(11,15)})
print(ddf)

---
   a   b
0  1  11
1  2  12
2  3  13
3  4  14

And stream it...

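#Create a stream source
source = Stream()

#Create a dataframe model for the stream (defines the column names and dtypes)
samples = pd.DataFrame({'a':[0],'b':[0]})

#Creating the streaming dataframe
sdf = DataFrame(source, example=samples)

#That window thing again: keep the last 4 rows seen so far
df = sdf.window(4).full()

#Function to run when an item appears on the stream
def stest(r):
    print(datetime.now())
    print(r)

#Add the callback function to the stream
df.stream.sink(stest)

#This does stream from the dataframe, one row at a time.
#Note: ddf.iloc[i] is a Series, which is why the output below shows stacked a/b values;
#ddf.iloc[[i]] would emit a one-row DataFrame matching the example schema instead
for i in range(len(ddf)):
    source.emit(ddf.iloc[i])
    sleep(2)

df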
---
2020-01-27 19:28:05.503123
a     1
b    11
Name: 0, dtype: int64
2020-01-27 19:28:07.505536
a     1
b    11
a     2
b    12
dtype: int64
2020-01-27 19:28:09.508925
a     2
b    12
a     3
b    13
dtype: int64
2020-01-27 19:28:11.514117
a     3
b    13
a     4
b    14
dtype: int64

I also found another way of streaming data into HoloViews.


Then stop it with: cbdf.stop() (although that didn't seem to work for me when I just tried it...)

I still don't seem to have worked out an example of connecting streamz components to a chart myself (unless HoloViews uses streamz under the hood?)