暂停两个Python线程,而第三个线程做东西(带锁?)

ke.*_*ke. 4 python multithreading locks

我是并发编程的新手.

我想反复执行三项任务.前两个应该一直运行,第三个应该每小时运行一次.前两个任务可以并行运行,但我总是希望在第三个任务运行时暂停它们.

这是我尝试过的骨架:

import threading
import time

flock = threading.Lock()
glock = threading.Lock()

def f():
    while True:
        with flock:
            print 'f'
            time.sleep(1)

def g():
    while True:
        with glock:
            print 'g'
            time.sleep(1)

def h():
    while True:
        with flock:
            with glock:
                print 'h'
        time.sleep(5)

threading.Thread(target=f).start()
threading.Thread(target=g).start()
threading.Thread(target=h).start()
Run Code Online (Sandbox Code Playgroud)

我希望这段代码每秒打印一次f和ag,大约每五秒打一次.然而,当我运行它时,在我开始看到一些h之前需要大约12 f和12 g.它看起来像前两个线程不断释放并重新获取它们的锁,而第三个线程被排除在循环之外.

  1. 这是为什么?当第三个线程试图获取当前持有的锁,然后它被释放时,不应该立即获取而不是第一个/第二个线程立即再次获取它?我可能误解了一些事情.
  2. 什么是实现我想要的好方法?

注意:将这些time.sleep(1)调用移出with flock/glock块适用于这个简单的例子,但显然不适用于线程花费大部分时间进行实际操作的真实应用程序.当前两个线程在每次执行循环体后休眠一秒钟,并且释放锁定后,第三个任务仍然永远不会被执行.

unu*_*tbu 5

如何使用threading.Events:

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(resume,is_waiting,name):
    while True:
        if not resume.is_set():
            is_waiting.set()
            logger.debug('{n} pausing...'.format(n=name))
            resume.wait()
            is_waiting.clear()
        logger.info(name)
        time.sleep(1)

def h(resume,waiters):
    while True:
        logger.debug('halt') 
        resume.clear()
        for i,w in enumerate(waiters):
            logger.debug('{i}: wait for worker to pause'.format(i=i))
            w.wait()
        logger.info('h begin')
        time.sleep(2)
        logger.info('h end')        
        logger.debug('resume')
        resume.set()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

# set means resume; clear means halt
resume = threading.Event()
resume.set()

waiters=[]
for name in 'fg':
    is_waiting=threading.Event()
    waiters.append(is_waiting)
    threading.Thread(target=f,args=(resume,is_waiting,name)).start()    
threading.Thread(target=h,args=(resume,waiters)).start()
Run Code Online (Sandbox Code Playgroud)

产量

[07:28:55 Thread-1] f
[07:28:55 Thread-2] g
[07:28:55 Thread-3] halt
[07:28:55 Thread-3] 0: wait for worker to pause
[07:28:56 Thread-1] f pausing...
[07:28:56 Thread-2] g pausing...
[07:28:56 Thread-3] 1: wait for worker to pause
[07:28:56 Thread-3] h begin
[07:28:58 Thread-3] h end
[07:28:58 Thread-3] resume
[07:28:58 Thread-1] f
[07:28:58 Thread-2] g
[07:28:59 Thread-1] f
[07:28:59 Thread-2] g
[07:29:00 Thread-1] f
[07:29:00 Thread-2] g
[07:29:01 Thread-1] f
[07:29:01 Thread-2] g
[07:29:02 Thread-1] f
[07:29:02 Thread-2] g
[07:29:03 Thread-3] halt
Run Code Online (Sandbox Code Playgroud)

(响应注释中的问题)此代码尝试测量h-thread从其他工作线程获取每个锁所需的时间.

它似乎表明,即使h等待获取锁定,其他工作线程也可能以相当高的概率释放并重新获取锁定.没有优先考虑h因为它等待的时间更长.

David Beazley在PyCon上介绍了与线程和GIL相关的问题.这是幻灯片pdf.这是一个引人入胜的读物,也可能有助于解释这一点.

import threading
import time
import logging

logger=logging.getLogger(__name__)

def f(lock,n):
    while True:
        with lock:
            logger.info(n)
            time.sleep(1)

def h(locks):
    while True:
        t=time.time()
        for n,lock in enumerate(locks):
            lock.acquire()
            t2=time.time()
            logger.info('h acquired {n}: {d}'.format(n=n,d=t2-t))
            t=t2
        t2=time.time()
        logger.info('h {d}'.format(d=t2-t))
        t=t2
        for lock in locks:
            lock.release()
        time.sleep(5)

logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

locks=[]
N=5
for n in range(N):
    lock=threading.Lock()
    locks.append(lock)
    t=threading.Thread(target=f,args=(lock,n))
    t.start()

threading.Thread(target=h,args=(locks,)).start()
Run Code Online (Sandbox Code Playgroud)