How to set/get a pandas DataFrame into/out of Redis using pyarrow

Mer*_*lin 5 python redis pandas pyarrow py-redis

Using

dd = {'ID': ['H576','H577','H578','H600', 'H700'],
      'CD': ['AAAAAAA', 'BBBBB', 'CCCCCC','DDDDDD', 'EEEEEEE']}
df = pd.DataFrame(dd)

Before pandas 0.25, the following worked:

set:  redisConn.set("key", df.to_msgpack(compress='zlib'))
get:  pd.read_msgpack(redisConn.get("key"))

Now there are deprecation warnings:

FutureWarning: to_msgpack is deprecated and will be removed in a future version.
It is recommended to use pyarrow for on-the-wire transmission of pandas objects.

The read_msgpack is deprecated and will be removed in a future version.
It is recommended to use pyarrow for on-the-wire transmission of pandas objects.

How does pyarrow serialization work, and how do I get pyarrow objects into and out of Redis?

Reference: How to set/get pandas.DataFrame to/from Redis?

sha*_*adi 39

Here is a full example of using pyarrow to serialize a pandas DataFrame for storage in redis:

apt-get install python3 python3-pip redis-server
pip3 install pandas pyarrow redis

Then in python:

import pandas as pd
import pyarrow as pa
import redis

df = pd.DataFrame({'A': [1, 2, 3]})
r = redis.Redis(host='localhost', port=6379, db=0)

context = pa.default_serialization_context()
r.set("key", context.serialize(df).to_buffer().to_pybytes())
context.deserialize(r.get("key"))
# output:
#    A
# 0  1
# 1  2
# 2  3

I just submitted PR 28494 to pandas to include this pyarrow example in the docs.


  • This is really nice. I assume a defensive programmer should check the DataFrame's size before pushing it to Redis, since as far as I know the 512MB limit on a single value still applies (see the sketch after these comments). https://github.com/antirez/redis/issues/757 (4 upvotes)
  • @BrifordWylie: I use the `bz2` package to compress the data before pushing it to Redis. (2 upvotes)
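
Not from the answer itself, but a minimal sketch of what the two comments above suggest: guard against Redis's 512MB single-value cap and compress with `bz2` before setting the key. The helper names, the limit constant, and the reuse of `default_serialization_context` are assumptions for illustration:

import bz2

import pandas as pd
import pyarrow as pa
import redis

# Redis caps a single value at 512MB (see the issue linked above)
REDIS_VALUE_LIMIT = 512 * 1024 * 1024

r = redis.Redis(host='localhost', port=6379, db=0)

def set_df_checked(key, df):
    # Same (since-deprecated) serialization context as in the answer above,
    # with bz2 compression applied to the serialized bytes
    context = pa.default_serialization_context()
    payload = bz2.compress(context.serialize(df).to_buffer().to_pybytes())
    if len(payload) > REDIS_VALUE_LIMIT:
        raise ValueError(f"payload is {len(payload)} bytes, over Redis's 512MB value limit")
    r.set(key, payload)

def get_df_checked(key):
    # Decompress, then deserialize back into a DataFrame
    context = pa.default_serialization_context()
    return context.deserialize(bz2.decompress(r.get(key)))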

ros*_*sco 8

If you want to compress the data stored in Redis, you can use pandas' built-in support for parquet and gzip:

import io

import pandas as pd
import redis

REDIS_HOST = 'localhost'  # adjust for your deployment
REDIS_PORT = 6379

def openRedisCon():
    pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=0)
    r = redis.Redis(connection_pool=pool)
    return r

def storeDFInRedis(alias, df):
    """Store the dataframe object in Redis
    """
    buffer = io.BytesIO()
    df.to_parquet(buffer, compression='gzip')
    buffer.seek(0)  # rewind to the start so the full parquet payload is read
    r = openRedisCon()
    res = r.set(alias, buffer.read())

def loadDFFromRedis(alias, useStale: bool = False):
    """Load the named key from Redis into a DataFrame and return the DF object
    """
    r = openRedisCon()
    try:
        buffer = io.BytesIO(r.get(alias))
        buffer.seek(0)
        df = pd.read_parquet(buffer)
        return df
    except Exception:  # e.g. the key is missing and r.get() returned None
        return None

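For reference (not from the answer), a quick usage sketch of these helpers; note that `df.to_parquet` needs a parquet engine such as pyarrow installed, and the alias and data here are made up:

df = pd.DataFrame({'ID': ['H576', 'H577'], 'CD': ['AAAAAAA', 'BBBBB']})
storeDFInRedis('question_df', df)
print(loadDFFromRedis('question_df'))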


小智 6

Here is my approach. Since default_serialization_context has been deprecated, things have become simpler:

import pyarrow as pa
import redis

pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)

def storeInRedis(alias, df):
    df_compressed = pa.serialize(df).to_buffer().to_pybytes()
    res = r.set(alias, df_compressed)
    if res:
        print(f'{alias} cached')

def loadFromRedis(alias):
    data = r.get(alias)
    try:
        return pa.deserialize(data)
    except Exception:  # e.g. the key was missing and data is None
        print("No data")


# assumes locdf is an existing DataFrame
storeInRedis('locations', locdf)

loadFromRedis('locations')

  • It looks like pyarrow deprecated this in 2.0.0: https://arrow.apache.org/blog/2020/10/22/2.0.0-release/ (3 upvotes); see the sketch below for what replaced it.
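
Not part of the original answer: since `pa.serialize`/`pa.deserialize` were deprecated in pyarrow 2.0.0 (and later removed), the Arrow IPC stream format is the usual replacement. A minimal sketch under that assumption; the function names here are made up for illustration:

import pandas as pd
import pyarrow as pa
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def storeInRedisIPC(alias, df):
    # Convert the DataFrame to an Arrow table and write it in the IPC stream format
    table = pa.Table.from_pandas(df)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    r.set(alias, sink.getvalue().to_pybytes())

def loadFromRedisIPC(alias):
    data = r.get(alias)
    if data is None:
        return None
    # open_stream accepts the raw bytes; read_pandas converts back to a DataFrame
    reader = pa.ipc.open_stream(data)
    return reader.read_pandas()

storeInRedisIPC('locations', pd.DataFrame({'A': [1, 2, 3]}))
loadFromRedisIPC('locations')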