流式传输数据为pandas df

Leb*_*Leb 8 python streaming pandas

我正在尝试模拟使用pandas来访问不断变化的文件.

我有一个文件读取一个csv文件,添加一行,然后休眠一段时间来模拟批量输入.

import pandas as pd
from time import sleep
import random

df2 = pd.DataFrame(data = [['test','trial']], index=None)

while True:
    df = pd.read_csv('data.csv', header=None)
    df.append(df2)
    df.to_csv('data.csv', index=False)
    sleep(random.uniform(0.025,0.3))
Run Code Online (Sandbox Code Playgroud)

第二个文件是通过输出数据帧的形状来检查数据的变化:

import pandas as pd

while True:
    df = pd.read_csv('data.csv', header=None, names=['Name','DATE'])
    print(df.shape)
Run Code Online (Sandbox Code Playgroud)

问题是当我得到DF的正确形状时,有一些时候它会输出(0x2).

即:

...
(10x2)
(10x2)
...
(10x2)
(0x2)
(11x2)
(11x2)
...
Run Code Online (Sandbox Code Playgroud)

这确实发生在一些但不是每次形状变化之间(文件添加到dataframe).

知道这种情况发生在第一个脚本打开文件以添加数据,而第二个脚本无法访问它时,因此(0x2),这会发生任何数据丢失吗?

我无法直接访问流,只能输出文件.或者还有其他可能的解决方案吗?

编辑

这样做的目的是仅加载新数据(我有一个代码可以做到这一点),并"即时"进行分析.一些分析将包括输出/秒,图形(类似于流图),以及一些其他数值计算.

最大的问题是我只能访问csv文件,我需要能够分析数据,而不会丢失或延迟.

小智 3

其中一个脚本正在读取文件,而另一个脚本正在尝试写入文件。两个脚本无法同时访问该文件。就像 Padraic Cunningham 在评论中所说的那样,您可以实现一个锁定文件来解决这个问题。

有一个 python 包可以完成名为lockfile 的任务,并附有文档。

这是您的第一个实现了锁定文件包的脚本:

import pandas as pd
from time import sleep
import random
from lockfile import FileLock

df2 = pd.DataFrame(data = [['test','trial']], index=None)
lock = FileLock('data.lock')

while True:
    with lock:
        df = pd.read_csv('data.csv', header=None)
        df.append(df2)
        df.to_csv('data.csv', index=False)
    sleep(random.uniform(0.025,0.3))
Run Code Online (Sandbox Code Playgroud)

这是实现了锁定文件包的第二个脚本:

import pandas as pd
from time import sleep
from lockfile import FileLock

lock = FileLock('data.lock')

while True:
    with lock:
        df = pd.read_csv('data.csv', header=None, names=['Name','DATE'])
    print(df.shape)
    sleep(0.100)
Run Code Online (Sandbox Code Playgroud)

我添加了 100 毫秒的等待时间,以便减慢控制台的输出速度。

这些脚本将在访问“data.csv”文件之前创建一个名为“data.lock”的文件,并在访问“data.csv”文件后删除该文件“data.lock”。在任一脚本中,如果“data.lock”存在,则脚本将等待,直到“data.lock”文件不再存在。