我见过很多在一个类中使用Threads的Python脚本,其中很多都使用了threading.Event().例如:
class TimerClass(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.event = threading.Event()
def run(self):
while not self.event.is_set():
print "something"
self.event.wait(120)
Run Code Online (Sandbox Code Playgroud)
在while循环中,为什么他们检查条件是否没有设置self.event?
我有一个python多线程应用程序.我想在一个线程中运行一个asyncio循环,并从另一个线程发送回调和协同程序.应该很容易,但我无法理解asyncio的东西.
我提出了以下解决方案,它可以完成我想要的一半,随意评论任何事情:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t …Run Code Online (Sandbox Code Playgroud) 我是python中多线程的新手,并尝试使用线程模块学习多线程.我做了一个非常简单的多线程程序,我无法理解该threading.Thread.join方法.
这是我制作的程序的源代码
import threading
val = 0
def increment():
global val
print "Inside increment"
for x in range(100):
val += 1
print "val is now {} ".format(val)
thread1 = threading.Thread(target=increment, args=())
thread2 = threading.Thread(target=increment, args=())
thread1.start()
#thread1.join()
thread2.start()
#thread2.join()
Run Code Online (Sandbox Code Playgroud)
如果我使用它会有什么不同
thread1.join()
thread2.join()
Run Code Online (Sandbox Code Playgroud)
我在上面的代码中评论了哪些?我运行了两个源代码(一个带注释,另一个没有注释),但输出相同.
我希望能够做的是使用输入询问用户问题.例如:
print('some scenario')
prompt = input("You have 10 seconds to choose the correct answer...\n")
Run Code Online (Sandbox Code Playgroud)
然后如果时间流逝打印出类似的东西
print('Sorry, times up.')
Run Code Online (Sandbox Code Playgroud)
任何指导我正确方向的帮助将不胜感激.
我正在尝试keras使用多个线程(和tensorflow后端)训练具有不同参数值的多个模型.我已经看到了在多个线程中使用相同模型的一些示例,但在这种特殊情况下,我遇到了有关冲突图等的各种错误.这是我希望能够做的一个简单示例:
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
sess = tf.Session()
def example_model(size):
model = Sequential()
model.add(Dense(size, input_shape=(5,)))
model.add(Dense(1))
model.compile(optimizer='sgd', loss='mse')
return model
if __name__ == '__main__':
K.set_session(sess)
X = np.random.random((10, 5))
y = np.random.random((10, 1))
models = [example_model(i) for i in range(5, 10)]
e = ThreadPoolExecutor(4)
res_list = [e.submit(model.fit, X, y) for model in models]
for …Run Code Online (Sandbox Code Playgroud) concurrency multithreading python-multithreading keras tensorflow
我正在编写一个应用程序,它将行添加到多个线程的同一文件中.
我有一个问题,其中一些行没有新行.
对此有何解决方案?
class PathThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def printfiles(self, p):
for path, dirs, files in os.walk(p):
for f in files:
print(f, file=output)
def run(self):
while True:
path = self.queue.get()
self.printfiles(path)
self.queue.task_done()
pathqueue = Queue.Queue()
paths = getThisFromSomeWhere()
output = codecs.open('file', 'a')
# spawn threads
for i in range(0, 5):
t = PathThread(pathqueue)
t.setDaemon(True)
t.start()
# add paths to queue
for path in paths:
pathqueue.put(path)
# wait for queue to get empty
pathqueue.join()
Run Code Online (Sandbox Code Playgroud) 如果我有一个整数i,i += 1在多个线程上做是不安全的:
>>> i = 0
>>> def increment_i():
... global i
... for j in range(1000): i += 1
...
>>> threads = [threading.Thread(target=increment_i) for j in range(10)]
>>> for thread in threads: thread.start()
...
>>> for thread in threads: thread.join()
...
>>> i
4858 # Not 10000
Run Code Online (Sandbox Code Playgroud)
但是,如果我有一个列表l,l += [1]在多个线程上做似乎是安全的:
>>> l = []
>>> def extend_l():
... global l
... for j in range(1000): l += [1]
... …Run Code Online (Sandbox Code Playgroud) 我即将编写一些计算密集型的Python代码,几乎可以肯定它的大部分时间都花在了numpy线性代数函数中.
手头的问题令人尴尬地平行.简而言之,对我来说最简单的方法就是使用多线程.主要障碍几乎肯定是全球翻译锁(GIL).
为了帮助设计这个,有一个心理模型是有用的,numpy可以期望操作在其持续时间内释放GIL.为此,我会欣赏任何经验法则,注意事项,注意事项等.
如果它很重要,我在Linux上使用64位Python 2.7.1,使用numpy1.5.1和scipy0.9.0rc2,使用英特尔MKL 10.3.1构建.
我喜欢将函数转换为线程而没有不必要的行来定义类的能力.我知道_thread,但看起来你不应该使用_thread.对于python 3,是否有一个类似于thread.start_new_thread的良好实践?
我正在尝试做这个Matasano加密挑战,涉及对人为减慢字符串比较功能的服务器进行定时攻击.它说使用"你选择的web框架",但我不想安装Web框架,所以我决定使用模块中内置的HTTPServer类http.server.
我想出了一些工作,但它是非常缓慢的,所以我尝试使用内置的(不良的记录)线程池来加速这一过程multiprocessing.dummy.它的速度要快得多,但我注意到一些奇怪的事情:如果我同时发出8个或更少的请求,它就可以了.如果我有更多,它会工作一段时间,并在看似随机的时间给我错误.这些错误似乎是不一致的,而不是总是相同的,但他们通常有Connection refused, invalid argument,OSError: [Errno 22] Invalid argument,urllib.error.URLError: <urlopen error [Errno 22] Invalid argument>,BrokenPipeError: [Errno 32] Broken pipe,或urllib.error.URLError: <urlopen error [Errno 61] Connection refused>在其中.
服务器可以处理的连接数有限制吗?我不认为线程本身的数量是问题,因为我编写了一个简单的函数,它在没有运行Web服务器的情况下进行了减慢的字符串比较,并使用500个并发线程调用它,并且它工作正常.我不认为只是从那么多线程发出请求就是问题所在,因为我已经使用了超过100个线程的爬虫(都同时向同一个网站发出请求)并且它们工作正常.看起来HTTPServer可能并不是为了可靠地托管能够获得大量流量的生产网站,但我很惊讶它很容易让它崩溃.
我尝试逐渐从我的代码中删除看起来与问题无关的东西,正如我在诊断这样的神秘错误时通常所做的那样,但在这种情况下这并不是很有帮助.看起来我正在删除看似无关的代码,服务器可以处理的连接数量逐渐增加,但没有明确的崩溃原因.
有谁知道如何增加我一次可以提出的请求数量,或者至少为什么会发生这种情况?
我的代码很复杂,但我想出了这个简单的程序来演示这个问题:
#!/usr/bin/env python3
import os
import random
from http.server import BaseHTTPRequestHandler, HTTPServer
from multiprocessing.dummy import Pool as ThreadPool
from socketserver import ForkingMixIn, ThreadingMixIn
from threading import Thread
from time import sleep
from urllib.error import HTTPError …Run Code Online (Sandbox Code Playgroud) python ×9
python-3.x ×3
concurrency ×1
gil ×1
httpserver ×1
input ×1
keras ×1
numpy ×1
python-2.7 ×1
tensorflow ×1
timer ×1
urllib ×1