Zac*_*she 15 python-multiprocessing aws-lambda
I have a short piece of code that uses the `multiprocessing` package and works fine on my local machine.

When I upload it to AWS Lambda and run it there, I get the following error (stacktrace trimmed):
OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
File "/var/task/recorder.py", line 41, in record
pool = multiprocessing.Pool(10)
File "/usr/lib64/python2.7/multiprocessing/__init__.py", line 232, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 138, in __init__
self._setup_queues()
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 234, in _setup_queues
self._inqueue = SimpleQueue()
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 354, in __init__
self._rlock = Lock()
File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 147, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1)
File "/usr/lib64/python2.7/multiprocessing/synchronize.py", line 75, in __init__
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 38] Function not implemented
Could it be that a part of Python's core packages is not implemented there? I have no idea what I'm running on underneath, so I can't log in and debug it.

Any ideas how I can run `multiprocessing` on Lambda?
小智 10
As far as I can tell, multiprocessing won't work on AWS Lambda because the execution environment/container is missing `/dev/shm` — see https://forums.aws.amazon.com/thread.jspa?threadID=219962 (login may be required).

No word (that I could find) on if/when Amazon will change this, so I'm looking at other libraries, e.g. https://pythonhosted.org/joblib/parallel.html, which will fall back to `/tmp` (which we know DOES exist) if it can't find `/dev/shm`.
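Since the failure comes from `multiprocessing`'s POSIX semaphores (which live in `/dev/shm`), one portable workaround is to probe for them at runtime and fall back to a thread pool. This is a sketch of my own, not from the answer above; `square` and `make_pool` are made-up names:

```python
import multiprocessing
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

def make_pool(workers=4):
    # Creating a Semaphore exercises sem_open(), the call that raises
    # "OSError: [Errno 38] Function not implemented" on AWS Lambda.
    try:
        multiprocessing.Semaphore()
        return multiprocessing.Pool(workers)  # real processes
    except OSError:
        return ThreadPool(workers)            # threads need no /dev/shm

if __name__ == "__main__":
    with make_pool() as pool:
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
```

`ThreadPool` shares `Pool`'s API, so the rest of the code does not need to know which variant it got (though CPU-bound work will not speed up under threads because of the GIL).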
小智 9
You can run routines in parallel on AWS Lambda using Python's multiprocessing module, but you cannot use Pools or Queues, as noted in the other answers. A workable solution is to use Process and Pipe, as outlined in this article: https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/

While the article did help me reach a solution (shared below), there are a few things to be aware of. First, the Process-and-Pipe-based solution is not as fast as the built-in `map` function of Pool, although I did see nearly linear speed-ups as I increased the memory/CPU resources available to my Lambda function. Second, quite a bit of bookkeeping has to be done when developing multiprocessing code this way; I suspect that is at least part of why my solution is slower than the built-in one. If anyone has suggestions for speeding it up, I'd love to hear them! Finally, while the article notes that multiprocessing is useful for offloading asynchronous processes, there are other reasons to use it, such as lots of intensive math operations, which is what I was trying to do.
Code:
# Python 3.6
import multiprocessing
from multiprocessing import Pipe, Process
from multiprocessing.connection import wait


def myWorkFunc(data, connection):
    result = None

    # Do some work and store it in result

    if result:
        connection.send([result])
    else:
        connection.send([None])


def myPipedMultiProcessFunc(iterable):
    # Get number of available logical cores
    plimit = multiprocessing.cpu_count()

    # Setup management variables
    results = []
    parent_conns = []
    pcount = 0

    for data in iterable:
        # Create the pipe for parent-child process communication
        parent_conn, child_conn = Pipe()
        # Create the process, pass data to be operated on and connection
        process = Process(target=myWorkFunc, args=(data, child_conn))
        parent_conns.append(parent_conn)
        process.start()
        pcount += 1

        if pcount == plimit:  # There is not currently room for another process
            # Wait until there are results in the Pipes
            finishedConns = wait(parent_conns)

            # Collect the results and remove the connection, as processing
            # the connection again will lead to errors
            for conn in finishedConns:
                results.append(conn.recv()[0])
                parent_conns.remove(conn)
                # Decrement pcount so we can add a new process
                pcount -= 1

    # Ensure all remaining active processes have their results collected
    for conn in parent_conns:
        results.append(conn.recv()[0])
        conn.close()

    # Process results as needed
    return results
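As a quick sanity check, the same Process-and-Pipe pattern can be exercised with a trivial work function. The `square_worker` function and the driver below are my own illustration, not from the article:

```python
from multiprocessing import Pipe, Process

def square_worker(data, connection):
    # Send the result back to the parent through the pipe
    connection.send([data * data])
    connection.close()

if __name__ == "__main__":
    parent_conns = []
    for n in range(4):
        parent_conn, child_conn = Pipe()
        Process(target=square_worker, args=(n, child_conn)).start()
        parent_conns.append(parent_conn)

    # Each connection carries exactly one result, so recv() in
    # creation order preserves the input order
    results = [conn.recv()[0] for conn in parent_conns]
    print(results)  # [0, 1, 4, 9]
```

Only `Process` and `Pipe` are used here; no Pool, Queue, or semaphore, which is why this pattern survives on Lambda.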
I ran into the same problem. This is the code I had before, which worked fine on my local machine:
import concurrent.futures


class Concurrent:

    @staticmethod
    def execute_concurrently(function, kwargs_list):
        results = []
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for _, result in zip(kwargs_list, executor.map(function, kwargs_list)):
                results.append(result)
        return results
And I replaced it with this:
import concurrent.futures


class Concurrent:

    @staticmethod
    def execute_concurrently(function, kwargs_list):
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(function, kwargs) for kwargs in kwargs_list]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
        return results
Works like a charm.

Taken from this pull request.
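A quick self-contained check of the thread-pool variant; the `double` work function is my own, purely for illustration. Note that `as_completed` yields futures in completion order, so the results may come back unordered:

```python
import concurrent.futures

def double(kwargs):
    # Each entry of kwargs_list is passed to the function as one argument
    return kwargs["n"] * 2

class Concurrent:

    @staticmethod
    def execute_concurrently(function, kwargs_list):
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(function, kwargs) for kwargs in kwargs_list]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
        return results

out = Concurrent.execute_concurrently(double, [{"n": i} for i in range(5)])
print(sorted(out))  # [0, 2, 4, 6, 8]
```

`ThreadPoolExecutor` works on Lambda because threads don't need the SemLock primitive that `ProcessPoolExecutor` (and `multiprocessing.Pool`) require.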
Views: 4700