我有一个简单的python多处理脚本,它设置一个工作池,试图将工作输出附加到Manager列表.该脚本有3个调用堆栈: - 主调用f1,它产生几个调用另一个函数g1的工作进程.当一个人试图调试脚本时(偶然在Windows 7/64 bit/VS 2010/PyTools上),脚本会运行到嵌套的进程创建循环中,从而产生无数个进程.谁能确定原因?我确定我错过了很简单的事情.这是有问题的代码: -
import multiprocessing
import logging
manager = multiprocessing.Manager()
results = manager.list()
def g1(x):
y = x*x
print "processing: y = %s" % y
results.append(y)
def f1():
logger = multiprocessing.log_to_stderr()
logger.setLevel(multiprocessing.SUBDEBUG)
pool = multiprocessing.Pool(processes=4)
for (i) in range(0,15):
pool.apply_async(g1, [i])
pool.close()
pool.join()
def main():
f1()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
PS:尝试添加multiprocessing.freeze_support()
到主要无济于事.
我创建了一个池类,当被询问时,查找或初始化一个项目并将其返回给调用者.我使用a ConcurrentQueue<T>
作为底层集合,但我不确定它是否是正确的类型.
我不需要任何特定顺序的项目,我需要的是一个线程安全的集合,我可以推送和弹出..NET是否有用于此目的的快速集合类?
编辑:我ConcurrentBag<T>
在花钱回答后使用:
public sealed class Pool<T>
{
private readonly Func<T> initializer;
private readonly ConcurrentBag<T> bag;
public Pool(Func<T> initializer)
{
if (initializer == null)
throw new ArgumentNullException("initializer");
this.initializer = initializer;
this.bag = new ConcurrentBag<T>();
}
public Pool(Func<T> initializer, IEnumerable<T> collection)
{
if (initializer == null)
throw new ArgumentNullException("initializer");
if (collection == null)
throw new ArgumentNullException("initializer");
this.initializer = initializer;
this.bag = new ConcurrentBag<T>(collection);
}
public Pool(Func<T> initializer, int allocationCount)
: this(initializer)
{
if (allocationCount < 0) …
Run Code Online (Sandbox Code Playgroud) 我有一个应用程序,它使用Spring MessageListenerContainers和ActiveMQ.我已将其配置为使用PooledConnectionFactory,遵循activemq的文档.
我的方案如下:
一切似乎都运行正常,但偶尔会将消息重新传递给消费者,这会导致应用程序错误.
在日志之后我怀疑这个错误是由Spring的PooledConnectionFactory bean定义中的一些错误配置引起的,该定义关闭甚至使用的连接然后强制在当前会话上回滚并因此重新传递消息
池的spring配置如下:
<bean id="AMQconnFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${activemq_url}?initialReconnectDelay=100&timeout=3000&jms.prefetchPolicy.all=1&jms.redeliveryPolicy.initialRedeliveryDelay=300000
</value>
</property>
</bean>
<bean id="sharedConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" abstract="true">
<property name="connectionFactory" ref="AMQconnFactory" />
<property name="maxConnections" value="30" />
</bean>
<bean id="MotionJMSConnectionFactory" parent="sharedConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop" />
<bean id="sharedListenersProps" abstract="true" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="connectionFactory" ref="MotionJMSConnectionFactory" />
<property name="sessionTransacted" value="true" />
<property name="transactionTimeout" value="300"/>
</bean>
Run Code Online (Sandbox Code Playgroud)
指向我的方向的日志如下:
[TRACE] (ActiveMQMessageConsumer.java:494) (09:48:16,412) - ID:bbiovx01.bglobal.bcorp-56155-1365791814916-1:3222:1:1 received message: MessageDispatch {commandId = 0, responseRequired …
Run Code Online (Sandbox Code Playgroud) 是否CDI
允许以某种方式汇集?因为我认为这是EJB
bean的一个特性,但是Adam Bien在这个截屏视频中说容器选择是通过反射创建新的类实例还是使用现有的实例.所以,如果我有这两个bean
@RequestScoped
public class RequestBean {
public void doIt() {
}
}
@SessionScoped
public class SessionBean {
@Inject
private RequestBean bean;
public void doSomething() {
bean.doIt();
}
}
Run Code Online (Sandbox Code Playgroud)
问题是 - 是否总是RequestBean
在调用时创建新实例doSomething
或者CDI
容器以某种方式管理池中的实例?
我有一个令人尴尬的可并行化的问题,包括一堆彼此独立解决的任务.解决每个任务是相当漫长的,因此这是多处理的主要候选者.
问题是解决我的任务需要创建一个非常耗时的特定对象,但可以重用于所有任务(想想需要启动的外部二进制程序),所以在串行版本中我做了一些事情像这样:
def costly_function(task, my_object):
solution = solve_task_using_my_object
return solution
def solve_problem():
my_object = create_costly_object()
tasks = get_list_of_tasks()
all_solutions = [costly_function(task, my_object) for task in tasks]
return all_solutions
Run Code Online (Sandbox Code Playgroud)
当我尝试使用多处理并行化该程序时,my_object
由于多种原因(它不能被腌制,并且它不应该同时运行多个任务)而不能作为参数传递,所以我不得不求助于创建一个每个任务的单独对象实例:
def costly_function(task):
my_object = create_costly_object()
solution = solve_task_using_my_object
return solution
def psolve_problem():
pool = multiprocessing.Pool()
tasks = get_list_of_tasks()
all_solutions = pool.map_async(costly_function, tasks)
return all_solutions.get()
Run Code Online (Sandbox Code Playgroud)
但是创建多个实例的额外成本my_object
使得此代码仅略微快于序列化代码.
如果我可以my_object
在每个进程中创建一个单独的实例,然后将其重用于在该进程中运行的所有任务,那么我的时间将显着提高.有关如何做到这一点的任何指示?
例子:
from multiprocessing.dummy import Pool as ThreadPool
def testfunc(string):
print string
def main():
strings = ['one', 'two', 'three', ...]
pool = ThreadPool(10)
results = pool.map(testfunc, strings)
pool.close()
pool.join()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
这不会给我们提供清晰的结果,一行中只有一个结果:
one
two
three
Run Code Online (Sandbox Code Playgroud)
但是网格,有随机的换行符,比如
one
two
three
four
five
...
Run Code Online (Sandbox Code Playgroud)
为什么会发生这种情况?我可以在每次函数调用时使用一个换行符输出数据吗?
PS 有时我什至没有换行符甚至空格!PPS在windows下工作
我有一个简单的代码,如下所示。第一个进程保留队列,因此没有一个进程完成。
我希望能够在 AsyncResult 超过 .get() 超时时杀死它,以便我的池队列可以继续前进。但是,如果不修改“myfunc”,我找不到任何简单的方法来做到这一点。有谁知道如何实现这一点?
import multiprocessing
import time
def myf(x):
if x == 0:
time.sleep(100)
else:
time.sleep(2)
return 'done'
pool = multiprocessing.Pool(processes=1)
results = []
for x in range(8):
results.append(pool.apply_async(myf,args=[x]))
pool.close()
for res in results:
try:
print res.get(3)
except Exception as e:
print 'time out'
Run Code Online (Sandbox Code Playgroud) 我尝试在 python 中使用多处理来读取文件。这是一个小例子:
import multiprocessing
from time import *
class class1():
def function(self, datasheetname):
#here i start reading my datasheet
if __name__ == '__main__':
#Test with multiprosessing
pool = multiprocessing.Pool(processes=4)
pool.map(class1("Datasheetname"))
pool.close()
Run Code Online (Sandbox Code Playgroud)
现在我收到以下错误:
类型错误:map() 缺少 1 个必需的位置参数:“可迭代”
在该板的另一个线程中,我得到了使用 ThreadPool 执行此操作的提示,但我不知道该怎么做。有任何想法吗?
我想知道我的请求是否被网站停止,我需要设置一个代理。我首先尝试关闭 http 的连接,但我失败了。我也尝试测试我的代码,但现在似乎没有输出。Mybe 我使用了代理一切都会好吗?这是代码。
import requests
from urllib.parse import urlencode
import json
from bs4 import BeautifulSoup
import re
from html.parser import HTMLParser
from multiprocessing import Pool
from requests.exceptions import RequestException
import time
def get_page_index(offset, keyword):
#headers = {'User-Agent':'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50'}
data = {
'offset': offset,
'format': 'json',
'keyword': keyword,
'autoload': 'true',
'count': 20,
'cur_tab': 1
}
url = 'http://www.toutiao.com/search_content/?' + urlencode(data)
try:
response = requests.get(url, headers={'Connection': 'close'})
response.encoding …
Run Code Online (Sandbox Code Playgroud) 我有一个简单的代码:
path = [filepath1, filepath2, filepath3]
def umap_embedding(filepath):
file = np.genfromtxt(filepath,delimiter=' ')
if len(file) > 20000:
file = file[np.random.choice(file.shape[0], 20000, replace=False), :]
neighbors = len(file)//200
if neighbors >= 2:
neighbors = neighbors
else:
neighbors = 2
embedder = umap.UMAP(n_neighbors=neighbors,
min_dist=0.1,
metric='correlation', n_components=2)
embedder.fit(file)
embedded = embedder.transform(file)
name = 'file'
np.savetxt(name,embedded,delimiter=",")
if __name__ == '__main__':
p = Pool(processes = 20)
start = time.time()
for filepath in path:
p.apply_async(umap_embedding, [filepath])
p.close()
p.join()
print("Complete")
end = time.time()
print('total time (s)= ' + str(end-start)) …
Run Code Online (Sandbox Code Playgroud)