Utility
import concurrent.futures

def exec_multiprocessing(self, method, args):
    # Each element of args is passed to method as a single argument.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(method, args)
    return results
clone.py
def clone_vm(self, name, first_run, host, ip):
    # clone stuff
    pass
invoke.py
exec_args = [(name, first_run, host, ip) for host, ip in zip(hosts, ips)]
results = self.util.exec_multiprocessing(self.clone.clone_vm, exec_args)
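The code above raises a pickling error. I found that this happens because we are passing an instance method, so the instance method apparently has to be unwrapped before it is handed to the pool. However, I could not get that to work.
Note: I cannot create a top-level function to avoid this. I have to use instance methods.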
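One way to narrow this down is to round-trip the bound method through pickle before giving it to the pool. The sketch below assumes Python 3, where a bound method can be pickled as long as its instance can; the `Clone` class and its `prefix` attribute are hypothetical stand-ins, not the asker's real code. It also unpacks the task tuple inside the method, because `map` passes each tuple as one argument.

```python
import pickle


class Clone:
    """Hypothetical stand-in for the class that owns clone_vm."""

    def __init__(self, prefix):
        self.prefix = prefix

    def clone_vm(self, task):
        # map() hands over one tuple per task, so unpack it here.
        name, first_run, host, ip = task
        return f"{self.prefix}:{name}@{host}/{ip} first_run={first_run}"


clone = Clone("lab")

# If this round trip fails, the pool would fail for the same reason.
restored_method = pickle.loads(pickle.dumps(clone.clone_vm))
result = restored_method(("vm1", True, "host-a", "10.0.0.5"))
print(result)
```

If the round trip fails, the culprit is usually an attribute on the instance (a connection, a lock, an open file) rather than the method itself.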

RQ explicitly states that I can enqueue an object's instance method, so I have been trying to do that, but I get a PicklingError:
q.enqueue(some_obj.some_func, some_data)
*** PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
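Really, I only need access to a SQL connection inside my method, so I tried turning it into a function that explicitly accepts the SQL connection. That failed too: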
q.enqueue(some_func, sql_sess, some_data)
*** PicklingError: Can't pickle <class 'sqlalchemy.orm.session.Session'>: it's not the same object as sqlalchemy.orm.session.Session
How can I fix this? Am I doing something wrong, or is the library broken?
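A common way around this kind of error is to pass only plain, picklable data to the job and open the connection inside the job itself. The sketch below is an illustration under assumptions: the standard-library `sqlite3` module stands in for the SQLAlchemy session, and the `items` table and `db_path` argument are hypothetical.

```python
import pickle
import sqlite3


def some_func(db_path, some_data):
    """Job body: open the connection inside the worker, not in the caller."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS items (value TEXT)")
        conn.execute("INSERT INTO items (value) VALUES (?)", (some_data,))
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    finally:
        conn.close()


# A live connection object cannot be pickled...
try:
    pickle.dumps(sqlite3.connect(":memory:"))
    connection_picklable = True
except (TypeError, pickle.PicklingError):
    connection_picklable = False

# ...but the arguments needed to create one can.
job_args = (":memory:", "hello")
payload = pickle.dumps(job_args)
count = some_func(*pickle.loads(payload))
```

With a queue, the call would keep the shape used in the question, for example `q.enqueue(some_func, db_path, some_data)`, with the connection string taking the place of the session object.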
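
I want to use multiprocessing in Python to run a computationally expensive function in reasonable time and collect the returned results in a list. Here is my function: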
import time

def _heavy_func(value):
    a, b = 0, 1
    for item in range(value):
        a, b = b, a + b
    time.sleep(1.3)  # simulate an expensive computation, once per call
    return a
Then I call _heavy_func in a non-parallel way:
In [1]: print [ _heavy_func(i) for i in range(10)]
The output and timing are:
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
IPython CPU timings (estimated):
User : 13.01 s.
System : 0.00 s.
Wall time: 13.01 s.
Now I have added decorator-based parallelism to my function, for example:
from functools import wraps
from multiprocessing import Pool
def parallel_decor(n_procs=None):
    def _parallel_decor(function): …

When using the multiprocessing package, I get this error in a unit test:
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
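The following code works when run from a regular Python file, but when it is placed inside a unittest it raises the pickling error posted above. This is the simplest example I could come up with.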
# The following, works.
import multiprocessing
def hello(number):
    print "hello"
number_processes = 2
pool = multiprocessing.Pool(number_processes)
total_tasks = 2
tasks = range(total_tasks)
results = pool.map(hello, tasks)
pool.close()
pool.join()
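The likely difference between a module-level `hello` and one defined inside a test method is that pickle stores functions by their importable name, and a nested function has none. The sketch below checks this without starting any processes; it is written for Python 3, and `make_nested` and `can_pickle` are hypothetical helper names.

```python
import pickle


def hello(number):
    """Module-level function: pickle stores it by its importable name."""
    return "hello %d" % number


def make_nested():
    def nested_hello(number):
        """Defined inside another function, like a helper inside a test method."""
        return "hello %d" % number
    return nested_hello


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False if it refuses."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


top_level_ok = can_pickle(hello)
nested_ok = can_pickle(make_nested())
print(top_level_ok, nested_ok)
```

Moving the worker function out of the test method to module level is therefore the usual fix when a pool is used inside a test case.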
The next code block:
# This does not work; I'm running it via the unittest runner
import multiprocessing
import unittest

class Testing123(unittest.TestCase):
    def test_1(self):
        def hello(number):
            print "hello"
        number_processes = 2
        pool = multiprocessing.Pool(number_processes)
        total_tasks = 2
        tasks = range(total_tasks) …

Python 2.7 on Windows: I added a MySQL connection to a class and used multiprocessing, which raises an error.
self.ispop and self.match_var return dictionaries.
sprawn_self_calcu() and unwrap_self_f() are proxies for the Map_class functions.
The Map_class functions need the self variable.
My code looks like this:
from analysis_conf.pop_config import pop_config
import datetime
import multiprocessing
from functools import partial
from sqlalchemy import create_engine
from multiprocessing import Pool as threadpool
def sprawn_self_calcu(arg, **kwarg):
    return Map.mapCin(*arg, **kwarg)

def unwrap_self_f(arg, **kwarg):
    return Map.mappalg(*arg, **kwarg)

partial_unwrap = partial(unwrap_self_f)
partial_sprawn = partial(sprawn_self_calcu)

class Map:
    def __init__(self):
        self.ispop = pop_config()
        self.match_var = self.ispop.pop_match_var()

    def CreateSqlalchemyEngine(self, config):
        sigma = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8' % (
            config['user'], config['passwd'],
            config['ipaddr'], config['port'], config['dbname'],
        )
        return create_engine(sigma, pool_recycle=10, pool_timeout=10800)

    def …

My original problem is that I am trying to do the following:
def submit_decoder_process(decoder, input_line):
    decoder.process_line(input_line)
    return decoder
self.pool = Pool(processes=num_of_processes)
self.pool.apply_async(submit_decoder_process, [decoder, input_line]).get()
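The decoder is a bit involved to describe here, but the important point is that it is an object initialized with a PyParsing expression that calls setParseAction(). This breaks the pickling that multiprocessing relies on, which in turn breaks the code above.
Here is the pickle/PyParsing problem, isolated and simplified. The following code produces an error message because pickling fails.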
import pickle
from pyparsing import *
def my_pa_func():
    pass
pickle.dumps(Word(nums).setParseAction(my_pa_func))
Error message:
pickle.PicklingError: Can't pickle <function wrapper at 0x00000000026534A8>: it's not found as pyparsing.wrapper
If you remove the call to .setParseAction(my_pa_func), it works without any problem:
pickle.dumps(Word(nums))
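How can I get around this? Multiprocessing uses pickle, so I guess I cannot avoid it. The pathos package, which uses dill, is said to be not mature enough; at the very least, I had problems installing it on 64-bit Windows. I am really scratching my head here.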
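One possible workaround, offered as a sketch rather than a PyParsing-specific fix, is to keep the unpicklable parser out of the pickled state and rebuild it after unpickling via `__getstate__` and `__setstate__`. The `Decoder` class below is a hypothetical stand-in: a regular expression wrapped in a lambda plays the role of the expression with a parse action attached.

```python
import pickle
import re


class Decoder:
    """Hypothetical stand-in for the question's decoder object."""

    def __init__(self, pattern):
        self.pattern = pattern
        self.lines_seen = 0
        self._build_parser()

    def _build_parser(self):
        regex = re.compile(self.pattern)
        # The lambda stands in for the wrapped parse action: it cannot be pickled.
        self._parse = lambda line: regex.findall(line)

    def process_line(self, line):
        self.lines_seen += 1
        return self._parse(line)

    def __getstate__(self):
        # Drop the unpicklable parser; keep only plain data.
        state = self.__dict__.copy()
        del state["_parse"]
        return state

    def __setstate__(self, state):
        # Restore the plain data, then rebuild the parser from the pattern.
        self.__dict__.update(state)
        self._build_parser()


decoder = Decoder(r"\d+")
decoder.process_line("a 1 b 22")
restored = pickle.loads(pickle.dumps(decoder))
numbers = restored.process_line("x 333 y 4")
```

The same idea applies when the object crosses a process boundary in both directions, as it does when `submit_decoder_process` returns the decoder.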
python pickle pyparsing python-multithreading python-multiprocessing

I am trying to create a pipeline with sklearn.compose.ColumnTransformer to transform categorical and continuous input data:
import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_transformer
from sklearn.impute import SimpleImputer
df = pd.DataFrame(
    {
        'a': [1, 'a', 1, np.nan, 'b'],
        'b': [1, 2, 3, 4, 5],
        'c': list('abcde'),
        'd': list('aaabb'),
        'e': [0, 1, 1, 0, 1],
    }
)
for col in df.select_dtypes('object'):
    df[col] = df[col].astype(str)
categorical_columns = list('acd')
continuous_columns = list('be')
categorical_transformer = OneHotEncoder(sparse=False, handle_unknown='ignore')
continuous_transformer = 'passthrough'
column_transformer = …

Is there any way to use map over a list with one fixed argument, without a for loop? For example:
def addx(x, y):
    return x + y
print map(addx, 10, [10,20])
The output should be 20 and 30.
Thanks.
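Two standard-library approaches fit this request: binding the fixed argument with `functools.partial`, or supplying it repeatedly with `itertools.repeat`. The sketch below uses Python 3, where `map` returns an iterator, so the results are wrapped in `list`.

```python
from functools import partial
from itertools import repeat


def addx(x, y):
    return x + y


# Bind x=10 once, then map the one-argument function over the list.
with_partial = list(map(partial(addx, 10), [10, 20]))

# Or feed 10 as the first argument for every element; map stops at the shorter input.
with_repeat = list(map(addx, repeat(10), [10, 20]))

print(with_partial, with_repeat)
```

The `partial` form also works with `multiprocessing.Pool.map`, which accepts only a single iterable of arguments.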