我想用python的多处理模块在不同的子进程之间共享几个numpy数组.我希望这些数组可以单独锁定,我希望在运行时动态确定数组的数量.这可能吗?
在这个答案中,JF Sebastian提出了一种在多处理过程中在共享内存中使用python的numpy数组的好方法.该数组是可锁定的,这就是我想要的.我想做一些非常相似的事情,除了可变数量的共享数组.数组的数量将在运行时确定.他的示例代码非常清晰,几乎完全符合我的要求,但我不清楚如何声明可变数量的此类数组,而不给每个数组一个硬编码的名称,如shared_arr_1,shared_arr_2等等.这样做的正确方法是什么?
我使用MPI(mpi4py)脚本(在单个节点上),该脚本可用于非常大的对象。为了让所有进程都可以访问该对象,我通过分发了该对象comm.bcast()。这会将对象复制到所有进程,并占用大量内存,尤其是在复制过程中。因此,我想共享诸如指针之类的东西,而不是对象本身。我发现一些memoryview有用的功能有助于增强流程中对象的工作。同样,对象的实际内存地址也可以通过memoryview对象字符串表示形式进行访问,并且可以这样分配:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank:
content_pointer = comm.bcast(root = 0)
print(rank, content_pointer)
else:
content = ''.join(['a' for i in range(100000000)]).encode()
mv = memoryview(content)
print(mv)
comm.bcast(str(mv).split()[-1][: -1], root = 0)
Run Code Online (Sandbox Code Playgroud)
打印:
<memory at 0x7f362a405048>
1 0x7f362a405048
2 0x7f362a405048
...
Run Code Online (Sandbox Code Playgroud)
这就是为什么我认为必须有一种方法可以在另一个过程中重构对象。但是,我在文档中找不到有关如何执行此操作的线索。
简而言之,我的问题是:是否可以在中同一节点上的进程之间共享对象mpi4py?
我想在多个进程之间共享numpy数组.有工作的解决方案在这里.但是它们都通过继承将数组传递给子进程,这对我来说不起作用,因为我必须事先启动一些工作进程,而且我不知道稍后我要处理多少个数组.有没有办法在进程启动后创建这样的数组并通过队列将这些数组传递给进程?
顺便说一下,我无法使用multiprocessing.Manager.
考虑使用get_user_pages(或get_page)映射来自调用进程的页面的 Linux 驱动程序。然后将页面的物理地址传递给硬件设备。进程和设备都可以读取和写入页面,直到双方决定结束通信。特别地,在调用get_user_pages返回的系统调用之后,通信可以继续使用页面。系统调用实际上是在进程和硬件设备之间建立一个共享内存区域。
我担心如果进程调用会发生什么fork(它可能来自另一个线程,并且可能在调用的系统调用get_user_pages正在进行中或稍后发生)。特别是,如果父级在fork后写入共享内存区域,我对底层物理地址了解多少(可能是因为copy-on-write而改变)?我想明白:
进程需要遵守哪些限制,以便我们的驱动程序的功能正常工作(即物理内存保持映射到父进程中的相同地址)。
exec几乎立即调用)来工作的常见情况。madvisewith MADV_DONTFORK,并且可以让内存从子进程的空间中消失,但它不适用于堆栈分配的缓冲区。我愿意被指出文档或源代码。我特别查看了Linux Device Drivers,但没有发现这个问题得到解决。即使只是应用于内核源代码的相关部分的 RTFS 也有点让人不知所措。
内核版本不是完全固定的,而是最近的版本(比如 ??2.6.26)。如果重要的话,我们只针对 Arm 平台(目前是单处理器,但多核即将到来)。
我想就数据共享的设计实现征求您的意见。
我正在 Linux 嵌入式设备(mips 200 Mhz)上工作,我希望在多个进程之间进行某种数据共享,这些进程可以一次读取或写入多个参数。
该数据包含大约 200 个每秒更新的字符串参数。进程可以在 1 秒内访问大约 10 次数据。
我非常想尝试使设计高效(CPU / Mem)。
此数据不需要是持久的,每次重新启动都会重新创建。
目前,我正在考虑两种选择:
对于任一选项,我将提供一个 C 接口库,它将执行数据库操作的所有逻辑。
对于 SHM,这意味着锁定/解锁信号量并访问可以称为索引数组的参数。
对于 SQLite,我的库将是 SQLite 接口库的包装器,因此该过程不必知道 SQL 语法,(应该对查询和回复进行一些解析)。
我相信共享内存效率更高:
无需使用和解析SQL,以数组的形式访问。
话虽如此,使用 SQLite 也有一些优点:已经可以工作和调试(DB 级别)。增加灵活性。在许多嵌入式系统中广泛使用。
进入正题,
性能方面,我没有使用 SQLite 的经验,如果您能分享您的意见和经验,我将不胜感激。
谢谢
我目前正在使用隔离将查询并行发送到数据库服务器。我有一个连接器对象来建立到数据库的连接,我想在所有隔离中共享它,所以我不必为每个隔离创建单独的连接。
到目前为止,我似乎只能在隔离之间共享特殊的可序列化对象。我正在使用发送和接收端口进行消息传递。对于其他对象(例如我的连接器对象),dart-vm 产生错误:
Illegal argument(s): Illegal argument in isolate message : (object extends NativeWrapper)
Run Code Online (Sandbox Code Playgroud)
您知道在多个隔离之间共享通用对象实例的任何方法吗?还是我必须为每个隔离创建一个单独的实例?
谢谢!
佩德罗
我正在尝试使用我在网上找到的示例和文档来创建共享内存区域。我的目标是 IPC ,所以我可以让不同的进程相互通信。
这是我的 C 文件
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
int main (int argc, char *argv[])
{
struct stat sb;
off_t len;
char *p;
int fd;
fd = shm_open("test", O_RDWR | O_CREAT); //,S_IRUSR | S_IWUSR);
if (fd == -1) {
perror("open");
return 1;
}
if (fstat(fd, &sb)==-1){
perror("fstat");
return 1;
}
/*if (!S_ISREG(sb.st_mode)){
fprintf(stderr, "%s is not a file\n",fileName);
return 1;
}*/
p = mmap(0, sb.st_size, PROT_WRITE, MAP_SHARED, fd, 0); …Run Code Online (Sandbox Code Playgroud) 我正在使用 ftok/shmget/shmat/shmdt 函数在 Linux 上创建、写入和读取共享段。
如果我在一个程序中写入该段然后退出,然后稍后从另一个程序读取该段,我会惊讶地发现数据仍然存在。
我原以为当共享一个段的最后一个进程执行 shmdt 时,该段将被释放。
我可以依赖这种行为吗?还是类似于在 free() 之后继续使用指针?
我有一个大约 200MB 的泡菜对象(分类器)。我尝试使用静态成员变量来初始化对象并运行 celery 任务。Celery 创建了大约 8 个新进程,每个进程将对象读入内存并冻结我的计算机。是否有任何共享对象解决方案或解决此问题的任何解决方案。
我正在尝试使用此对象对文章进行分类,并将其保存在 db 中作为后台任务。
编辑。这是我的分类器任务的代码
from model import Article
import cPickle as pkl
from classifierclasses import Remover, Classifier, ItemSelector, PreProcessor
from celery import Celery
app = Celery('tasks', backend='redis://localhost/')
class ClassiferTask(app.Task):
classifier = pkl.load(open('clf.p', 'rb')) #loading classifier object initially (huge file)
def classify(self, article_id, text):
self.prediction = ClassiferTask.classifier.predict([text])[0]
Article.update(prediction=self.prediction, is_analyzed=1).where(Article.id == article_id).execute()
def run(self, id, *args, **kwargs):
self.classify(id, args[0])
Run Code Online (Sandbox Code Playgroud)
当我将此作业提交给 celery 时,它创建了新进程,每个进程都试图读入内存并占用大内存 ~200MB * 8
我怎么解决这个问题?这个问题有没有更好的解决办法。是否有可以共享单个(只读)对象的 celery 任务。
每次我遇到诸如“进程 0 执行x任务”之类的内容时,我都倾向于认为它们是指处理器。
After reading a bit more about it, I find that there are two memory classifications, shared memory and distributed memory: A shared memory executes something like a thread (implying same data is available to all processors- hence it makes sense to call it a process) However, even for distributed memory it is called a process instead of a processor. For example: "Process 0 is computing the partial dot product" …
shared-memory ×10
python ×3
c ×2
mpi ×2
numpy ×2
c++ ×1
celery ×1
dart ×1
fork ×1
ipc ×1
linux ×1
linux-kernel ×1
macos ×1
memoryview ×1
mmap ×1
mpi4py ×1
openmp ×1
pickle ×1
python-3.x ×1
scikit-learn ×1
sqlite ×1