如何从多个进程(可能来自不同的语言)使用 Apache Arrow IPC?

suv*_*ayu 7 python ipc apache-arrow pyarrow

我不知道从哪里开始,所以寻找一些指导。我正在寻找一种在一个进程中创建一些数组/表,并可以从另一个进程访问(只读)的方法。

所以我创建了一个pyarrow.Table这样的:

a1 = pa.array(list(range(3)))
a2 = pa.array(["foo", "bar", "baz"])

a1
# <pyarrow.lib.Int64Array object at 0x7fd7c4510100>
# [
#   0,
#   1,
#   2
# ]

a2
# <pyarrow.lib.StringArray object at 0x7fd7c5d6fa00>
# [
#   "foo",
#   "bar",
#   "baz"
# ]

tbl = pa.Table.from_arrays([a1, a2], names=["num", "name"])

tbl
# pyarrow.Table
# num: int64
# name: string
# ----
# num: [[0,1,2]]
# name: [["foo","bar","baz"]]
Run Code Online (Sandbox Code Playgroud)

现在我如何从不同的进程中读取它?我以为我会使用multiprocessing.shared_memory.SharedMemory,但这不太有效:

shm = shared_memory.SharedMemory(name='pa_test', create=True, size=tbl.nbytes)
with pa.ipc.new_stream(shm.buf, tbl.schema) as out:
    for batch in tbl.to_batches():
        out.write(batch)

# TypeError: Unable to read from object of type: <class 'memoryview'>
Run Code Online (Sandbox Code Playgroud)

我需要用shm.buf什么东西包裹它吗?

即使我让它工作,它看起来也很繁琐。我如何以稳健的方式做到这一点?我需要像 zmq 这样的东西吗?

我不清楚这是如何零复制的。当我写记录批次时,这不是序列化吗?我缺少什么?

在我的实际用例中,我还想与 Julia 交谈,但也许当我谈到这个问题时,这应该是一个单独的问题。

PS:我已经阅读了文档,它没有为我澄清这部分。

Wil*_*nes 7

我需要用什么东西包裹 shm.buf 吗?

是的,你可以用pa.py_buffer()它来包装它:

size = calculate_ipc_size(table)
shm = shared_memory.SharedMemory(create=True, name=name, size=size)

stream = pa.FixedSizeBufferWriter(pa.py_buffer(shm.buf))
with pa.RecordBatchStreamWriter(stream, table.schema) as writer:
   writer.write_table(table)
Run Code Online (Sandbox Code Playgroud)

另外,size您需要计算 IPC 输出的大小,该大小可能比Table.nbytes. 您可以使用的函数是:

def calculate_ipc_size(table: pa.Table) -> int:
    sink = pa.MockOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.size()
Run Code Online (Sandbox Code Playgroud)

我如何以稳健的方式做到这一点?

还不确定这部分。根据我的经验,原始进程需要在其他进程重用缓冲区时保持活动状态,但可能有一种方法可以解决这个问题。这可能与 CPython 中的此错误有关:https ://bugs.python.org/issue38119

我不清楚这是如何零复制的。当我写记录批次时,这不是序列化吗?我缺少什么?

您是正确的,Arrow 数据写入 IPC 缓冲区确实涉及副本。零拷贝部分是当其他进程从共享内存读取数据时。Arrow 表的列将引用 IPC 缓冲区的相关段,而不是副本。

  • 你确实序列化了两次——算是吧。但由于“calculate_ipc_size”正在写入“MockOutputStream”,所以第一次速度非常快。实际上没有写入任何字节,它只是计算字节数。在我的机器上,我每秒能够向共享内存中写入大约 10 GB 的数据。 (2认同)