如何使用python pandas处理传入的实时数据

Mar*_* MD 57 python pandas

使用pandas处理实时传入数据的最佳/ pythonic方法是哪种?

每隔几秒钟我就会收到以下格式的数据点:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}
Run Code Online (Sandbox Code Playgroud)

我想将它附加到现有的DataFrame,然后对其进行一些分析.

问题是,只是使用DataFrame.append追加行可能导致所有复制的性能问题.

我试过的事情:

一些人建议预先分配一个大的DataFrame并在数据进入时更新它:

In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=t, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN
Run Code Online (Sandbox Code Playgroud)

另一种选择是建立一个dicts列表.只需将传入的数据附加到列表中并将其切片为较小的DataFrame即可完成工作.

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH
Run Code Online (Sandbox Code Playgroud)

或类似的东西,可能会更多地处理输入.

And*_*den 16

我会使用HDF5/pytables如下:

  1. 将数据保存为"尽可能长的"python列表.
  2. 将结果附加到该列表.
  3. 当它变得"大"时:
    • 使用pandas io(以及可附加的表)推送到HDF5商店.
    • 清除清单.
  4. 重复.

实际上,我定义的函数使用每个"键"的列表,以便您可以在同一进程中将多个DataFrame存储到HDF5存储.


我们定义一个你用每一行调用的函数d:

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()
Run Code Online (Sandbox Code Playgroud)

注意:我们使用with语句在每次写入后自动关闭存储.它可以更快地保持开放,但即便如此我们建议您定期冲洗(收盘刷新).另请注意,使用集合deque而不是列表可能更具可读性,但列表的性能在这里会稍微好一些.

要使用此功能,请调用:

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")
Run Code Online (Sandbox Code Playgroud)

注意:"df"是pytables商店中使用的存储密钥.

作业完成后,请确保store_and_clear剩余的缓存:

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)
Run Code Online (Sandbox Code Playgroud)

现在您可以通过以下方式获得完整的DataFrame:

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]
Run Code Online (Sandbox Code Playgroud)

一些评论:

  • 5000可以调整,尝试使用一些较小/较大的数字,以满足您的需求.
  • List append是O(1),DataFrame追加是O(len(df)).
  • 在您进行统计或数据修改之前,您不需要大熊猫,使用最快的.
  • 此代码适用于多个密钥(数据点).
  • 这是非常少的代码,我们留在vanilla python列表然后pandas dataframe ...

此外,要获取最新的读取,您可以定义一个get方法,读取之前存储和清除.通过这种方式,您可以获得最新的数据:

def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]
Run Code Online (Sandbox Code Playgroud)

现在,当您访问时:

df = get_latest("df")
Run Code Online (Sandbox Code Playgroud)

你会得到最新的"df".


另一种选择是稍微更复杂:在香草pytables定义自定义表,请参见教程.

注意:您需要知道字段名称以创建列描述符.


Bre*_*rne 9

您实际上是在尝试解决两个问题:捕获实时数据和分析数据.第一个问题可以通过专为此目的而设计的Python日志来解决.然后通过读取相同的日志文件可以解决另一个问题.

  • 记录专门用于帮助捕获时间序列数据.它将为您处理所有缓冲和文件管理.Pandas旨在有效地读取大数据文件.同时,它们可以解决您的数据管理问题.此外,您还拥有数据副本,以便日后仔细检查您的工作.这是一篇相关的帖子:http://stackoverflow.com/q/14262433/584846 (8认同)
  • 如果你完全关心数据,那么是的,使用日志来构建一个非常强大的系统,保证一切都得到保存.然后单独使用一个分析应用程序,每隔一段时间读取块中的日志,并将块附加到DataFrame.如果您不关心数据并且始终可以重新开始,那么只需将传入数据聚合为块,并一次追加100或1000行.如果你绝对需要追加行asap,至少附加1000个空行来分配内存,然后手动插入数据就像你刚刚拥有numpy数组一样. (3认同)
  • 我真的不明白为什么日志记录可以帮助捕获时间序列数据?你的意思是将时间序列存储为字符串,然后解析字符串?那为什么我们需要记录呢?我们不应该只是在它们传入的时候将它们转换成适当的数据结构吗? (2认同)
  • 有人可以解释一下这个Python日志解决方案的相对优缺点,关于另一个答案给出的HDF5/PyTables解决方案吗?由于其精确的索引功能,后者似乎对在线分析更有用.谢谢. (2认同)