suv*_*ayu 7 python ipc apache-arrow pyarrow
我不知道从哪里开始,所以寻找一些指导。我正在寻找一种在一个进程中创建一些数组/表,并可以从另一个进程访问(只读)的方法。
所以我创建了一个pyarrow.Table这样的:
a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])
a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
# 0,
# 1,
# 2
# ]
a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
# "foo",
# "bar",
# "baz"
# ]
tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])
tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]
Run Code Online (Sandbox Code Playgroud)
现在我如何从不同的进程中读取它?我以为我会使用multiprocessing.shared_memory.SharedMemory,但这不太有效:
shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out:
for batch in tbl.to_batches():
out.write(batch)
# TypeError: Unable to read from object of type: <class 'memoryview'>
Run Code Online (Sandbox Code Playgroud)
我需要用shm.buf什么东西包裹它吗?
即使我让它工作,它看起来也很繁琐。我如何以稳健的方式做到这一点?我需要像 zmq 这样的东西吗?
我不清楚这是如何零复制的。当我写记录批次时,这不是序列化吗?我缺少什么?
在我的实际用例中,我还想与 Julia 交谈,但也许当我谈到这个问题时,这应该是一个单独的问题。
PS:我已经阅读了文档,它没有为我澄清这部分。
我需要用什么东西包裹 shm.buf 吗?
是的,你可以用pa.py_buffer()它来包装它:
size = calculate_ipc_size(table)
shm = shared_memory.SharedMemory(create=True, name=name, size=size)
stream = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
with pa.RecordBatchStreamWriter(stream, table.schema) as writer:
writer.write_table(table)
Run Code Online (Sandbox Code Playgroud)
另外,size您需要计算 IPC 输出的大小,该大小可能比Table.nbytes. 您可以使用的函数是:
def calculate_ipc_size(table: pa.Table) -> int:
sink = pa.MockOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.size()
Run Code Online (Sandbox Code Playgroud)
我如何以稳健的方式做到这一点?
还不确定这部分。根据我的经验,原始进程需要在其他进程重用缓冲区时保持活动状态,但可能有一种方法可以解决这个问题。这可能与 CPython 中的此错误有关:https ://bugs.python.org/issue38119
我不清楚这是如何零复制的。当我写记录批次时,这不是序列化吗?我缺少什么?
您是正确的,将Arrow 数据写入 IPC 缓冲区确实涉及副本。零拷贝部分是当其他进程从共享内存读取数据时。Arrow 表的列将引用 IPC 缓冲区的相关段,而不是副本。
| 归档时间: |
|
| 查看次数: |
1050 次 |
| 最近记录: |