该multiprocessing模块的文档显示了如何将队列传递给以multiprocessing.Process.开头的进程.但是,如何与异步工作进程共享队列apply_async?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
这失败了:
RuntimeError: Queue objects should only be shared between processes through inheritance.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何做我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?
python queue parallel-processing multiprocessing python-multiprocessing
两者都是用于数据分析系统的柱状(磁盘)存储格式.两者都集成在Apache Arrow(用于python的pyarrow包)中,旨在与Arrow对应作为柱状内存分析层.
两种格式有何不同?
在可能的情况下,你是否总是喜欢使用羽毛?
附录
我在这里找到了一些提示https://github.com/wesm/feather/issues/188,但考虑到这个项目的年龄,它可能有点过时了.
不是一个严肃的速度测试,因为我只是倾倒并加载一个完整的Dataframe,但如果您之前从未听说过这些格式,那么会给您一些印象:
# IPython
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.feather as feather
import pyarrow.parquet as pq
import fastparquet as fp
df = pd.DataFrame({'one': [-1, np.nan, 2.5],
'two': ['foo', 'bar', 'baz'],
'three': [True, False, True]})
print("pandas df to disk ####################################################")
print('example_feather:')
%timeit feather.write_feather(df, 'example_feather')
# 2.62 ms ± 35.8 µs per loop …Run Code Online (Sandbox Code Playgroud) 我注意到一些奇怪的行为,这些行为可能特定于也可能不是特定于我的系统。(运行 Windows 8 的联想 t430)
使用这个脚本:
import time
now = time.time()
while True:
then = now
now = time.time()
dif = now - then
print(dif)
time.sleep(0.01)
Run Code Online (Sandbox Code Playgroud)
在浏览器打开的情况下,我得到以下输出(我认为是名义上的)。
但是,如果没有打开浏览器,我会观察到严重的每个循环延迟。
显然,这是违反直觉的,因为我认为当并发进程较少时,任何人都希望获得更好的性能。
对这些结果的任何见解或简单复制将不胜感激。
编辑: 有趣的是,我观察到与此代码类似的延迟:
import time
now = time.time()
def newSleep(mark,duration):
count = 0
while time.time()-mark < duration:
count+=1
print(count)
while True:
then = now
now = time.time()
dif = now - then
print(dif)
#time.sleep(0.01)
newSleep(now,0.01)
Run Code Online (Sandbox Code Playgroud)
虽然它确实提供了额外的洞察力 - 即潜在循环的一些实例是由于缺乏处理器可用性(通过打印 0 的计数来表示) - 我仍然注意到 15ms 的行为,其中打印的计数将高达 70k.. . 和 10 毫秒的行为,计数约为 40k。
我正在阅读python 文档中对两者的描述:
产卵
父进程启动一个新的 python 解释器进程。子进程将只继承运行进程对象 run() 方法所需的资源。特别是,父进程中不必要的文件描述符和句柄将不会被继承。与使用 fork 或 forkserver 相比,使用此方法启动进程相当慢。[在 Unix 和 Windows 上可用。Windows 和 macOS 上的默认设置。]
叉子
父进程使用 os.fork() 来派生 Python 解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地分叉多线程进程是有问题的。[仅在 Unix 上可用。Unix 上的默认设置。]
我的问题是:
我努力理解之间的差异run()和start()。根据文档,run()method 调用传递给对象构造函数的可调用对象,而start()method 启动进程并且只能调用一次。
我尝试了以下示例:
def get_process_id(process_name):
print process_name, os.getpid()
p1 = multiprocessing.Process(target=get_process_id, args=('process_1',))
p2 = multiprocessing.Process(target=get_process_id, args=('process_2',))
p1.run()
p2.run()
p1.start()
p2.start()
Run Code Online (Sandbox Code Playgroud)
结果如下:
process_1 35138
process_2 35138
process_1 35141
process_2 35142
Run Code Online (Sandbox Code Playgroud)
当我使用时run(),它显示p1并p2使用相同的过程。但是当我使用时start(),他们给出了两个不同的。是不是因为调用run()与调用它的进程没有任何关系,而只是调用函数(get_process_id在这个例子中)?
我经常发现自己用Python编写程序来构造一个大的(兆字节)只读数据结构,然后使用该数据结构来分析一个非常大的(总共几百兆字节)小记录列表.每个记录都可以并行分析,因此自然模式是设置只读数据结构并将其分配给全局变量,然后创建一个multiprocessing.Pool(通过隐式将数据结构复制到每个工作进程中fork)和然后用来imap_unordered并行处理记录.这种模式的骨架看起来像这样:
classifier = None
def classify_row(row):
return classifier.classify(row)
def classify(classifier_spec, data_file):
global classifier
try:
classifier = Classifier(classifier_spec)
with open(data_file, "rt") as fp, \
multiprocessing.Pool() as pool:
rd = csv.DictReader(fp)
yield from pool.imap_unordered(classify_row, rd)
finally:
classifier = None
Run Code Online (Sandbox Code Playgroud)
我因为全局变量之间的隐式耦合的不满意这点classify和classify_row.理想情况下,我想写
def classify(classifier_spec, data_file):
classifier = Classifier(classifier_spec)
with open(data_file, "rt") as fp, \
multiprocessing.Pool() as pool:
rd = csv.DictReader(fp)
yield from pool.imap_unordered(classifier.classify, rd)
Run Code Online (Sandbox Code Playgroud)
但是这不起作用,因为Classifier对象通常包含无法pickle的对象(因为它们是由作者不关心的扩展模块定义的); 我还读过如果它确实有效会很慢,因为在每次调用绑定方法时,Classifier对象都会被复制到工作进程中.
还有更好的选择吗?我只关心3.x.
python fork multiprocessing python-3.x python-multiprocessing
在 Python 2 中有一个函数thread.interrupt_main(),KeyboardInterrupt当从子线程调用时,它会在主线程中引发异常。
这也可以_thread.interrupt_main()在 Python 3 中使用,但它是一个低级的“支持模块”,主要用于其他标准模块。
在 Python 3 中这样做的现代方法是什么,大概是通过threading模块,如果有的话?
python multithreading keyboardinterrupt python-multithreading python-3.x
语境:
concurrent.futures.process.ProcessPool来执行代码的Python 应用服务器(是的,我知道importlib.reload有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
python parallel-processing multiprocessing python-multiprocessing process-pool
我从我的老师那里复制了一个.py文件,并将其加载到Pycharm 4.04中的一个项目中.当我试图右键单击并运行它时,它显示"在'foo.py'中运行Doctests"而不是常规的"运行'foo.py'",我无法找到如何正常运行它.
我在设置搜索"文档测试",发现什么都没有,和PyCharm文档中在这里,也没什么可说要么禁用文档测试.
我在这里发现了一个类似的问题: Pycharm不允许运行文件.仅显示运行unittest选项. 并尝试了建议的解决方案,但它仍然在上下文菜单中显示"在'foo.py'中运行doctests".
假设我有一个包含随机数的巨大列表,例如
L = [random.randrange(0,25000000000) for _ in range(1000000000)]
Run Code Online (Sandbox Code Playgroud)
我需要删除此列表中的重复项
我为包含较少元素的列表编写了此代码
def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
if i not in seen:
result.append(i)
seen.add(i)
return result
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,我创建了一个集合,这样我就可以记住哪些数字已经出现在我正在处理的列表中,如果该数字不在集合中,那么我将它添加到我需要返回的结果列表中并将其保存在设置以便它不会再次添加到结果列表中
现在,对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但对于比 1000000000 个问题出现的数字,我需要使用机器上的不同内核来尝试解决问题,然后将多个结果结合起来流程
我的第一个猜测是让所有进程都可以访问一个集合,但会出现许多复杂情况一个进程如何读取而另一个进程正在添加到集合中,我什至不知道是否可以在我知道的进程之间共享一个集合我们可以使用队列或管道,但我不确定如何使用它
有人可以就解决这个问题的最佳方法给我建议吗?我对任何新想法持开放态度
python optimization duplicates multiprocessing python-multiprocessing
python ×10
python-3.x ×3
fork ×2
duplicates ×1
feather ×1
lag ×1
optimization ×1
pandas ×1
parquet ×1
process-pool ×1
pyarrow ×1
pycharm ×1
queue ×1
sleep ×1
spawn ×1
windows ×1