mro*_*rok 2 python multithreading udp twisted
I am writing an application that collects UDP messages and processes them once every second.
The application prototype looks like this:
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    messages = []

    # addr is a (host, port) tuple; tuple parameters in the signature are Python 2 only
    def datagramReceived(self, data, addr):
        self.messages.append(data)

class Messenger(threading.Thread):
    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.messages
            length = len(recivedMessages)
            messagesToProccess = recivedMessages[0:length]
            #doSomethingWithMessages(messagesToProccess)
            del self.listener.messages[0:length]
            print(length)

listener = UdpListener()
messenger = Messenger()
messenger.listener = listener
messenger.start()
reactor.listenUDP(5556, listener)
reactor.run()
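I am not sure whether I can safely delete the leading values from the list (del self.listener.messages[0:length]) without the risk of an incoming message modifying the list and crashing the application.
Update: version with a lock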
class Messenger(threading.Thread):
    listener = None
    lock = threading.Lock()

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.messages
            self.lock.acquire()
            try:
                length = len(recivedMessages)
                messagesToProccess = recivedMessages[0:length]
                del self.listener.messages[0:length]
            except Exception as e:
                raise e
            finally:
                self.lock.release()
            #doSomethingWithMessages(messagesToProccess)
            print(length)
No, your code is not thread-safe. You need to lock access to messages.
However, you do not need a thread here at all. Why not do this instead?
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor

class UdpListener(DatagramProtocol):
    callingLater = False
    messages = []

    def process(self):
        # doSomethingWithMessages is the application's own processing function
        doSomethingWithMessages(self.messages)
        self.messages = []
        self.callingLater = False

    # addr is a (host, port) tuple; tuple parameters in the signature are Python 2 only
    def datagramReceived(self, data, addr):
        self.messages.append(data)
        if not self.callingLater:
            # Schedule a single process() call one second after the first message of a batch
            reactor.callLater(1.0, self.process)
            self.callingLater = True

listener = UdpListener()
reactor.listenUDP(5556, listener)
reactor.run()
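For comparison, the same single-threaded batching idea can be sketched with the standard library's asyncio instead of Twisted. This is only an illustration under assumptions: the class name BatchingUdpProtocol, the handle_batch callback, and the delay parameter are made up for this example and are not part of the code above. Because everything runs on one event loop, the buffer needs no lock.

```python
import asyncio


class BatchingUdpProtocol(asyncio.DatagramProtocol):
    """Buffers datagrams and hands them over in batches (hypothetical name)."""

    def __init__(self, handle_batch, delay=1.0):
        self.handle_batch = handle_batch  # caller-supplied callback (assumption)
        self.delay = delay
        self.messages = []                # per-instance buffer
        self.flush_scheduled = False

    def datagram_received(self, data, addr):
        self.messages.append(data)
        if not self.flush_scheduled:
            # Schedule one flush `delay` seconds after the first message of a batch.
            asyncio.get_running_loop().call_later(self.delay, self.flush)
            self.flush_scheduled = True

    def flush(self):
        # Take the current batch and start a fresh buffer before processing.
        batch, self.messages = self.messages, []
        self.flush_scheduled = False
        self.handle_batch(batch)


async def main(port=5556):
    loop = asyncio.get_running_loop()
    await loop.create_datagram_endpoint(
        lambda: BatchingUdpProtocol(lambda batch: print(len(batch))),
        local_addr=("0.0.0.0", port),
    )
    await asyncio.Event().wait()  # keep serving until cancelled
```

Calling asyncio.run(main()) would start the listener on port 5556 and print the size of each batch.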
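Update: here is how the original version could be made to work with locking, for educational purposes only. Note that this is less efficient and more error-prone. Edit: all of the message logic now lives in UdpListener, so classes that use it do not need to know its messy internal details.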
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
import threading
import time

class UdpListener(DatagramProtocol):
    message_lock = threading.Lock()
    messages = []

    # addr is a (host, port) tuple; tuple parameters in the signature are Python 2 only
    def datagramReceived(self, data, addr):
        with self.message_lock:
            self.messages.append(data)

    def getAndClearMessages(self):
        # Swap in a fresh list under the lock; the caller then owns the old one
        with self.message_lock:
            res = self.messages
            self.messages = []
        return res

class Messenger(threading.Thread):
    listener = None

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            time.sleep(1)
            recivedMessages = self.listener.getAndClearMessages()
            length = len(recivedMessages)
            #doSomethingWithMessages(recivedMessages)
            print(length)

listener = UdpListener()
messenger = Messenger()
messenger.listener = listener
messenger.start()
reactor.listenUDP(5556, listener)
reactor.run()
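To try the swap-under-lock pattern without Twisted, here is a minimal standard-library sketch. MessageBuffer and run_demo are hypothetical names introduced only for this illustration; the producer threads stand in for incoming datagrams, and the main thread plays the role of Messenger. Both sides go through the same lock, so every appended message should be drained exactly once.

```python
import threading


class MessageBuffer:
    """Producers append, a consumer drains (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []

    def append(self, message):
        with self._lock:
            self._messages.append(message)

    def get_and_clear(self):
        # Swap in a fresh list while holding the lock; the old list is
        # then owned by the caller and no producer can touch it.
        with self._lock:
            drained, self._messages = self._messages, []
        return drained


def run_demo(producers=4, per_producer=2000):
    """Run several producers against one consumer; return how many items were drained."""
    buffer = MessageBuffer()
    drained_total = 0

    def produce():
        for i in range(per_producer):
            buffer.append(i)

    threads = [threading.Thread(target=produce) for _ in range(producers)]
    for t in threads:
        t.start()
    # Drain repeatedly while producers are still appending.
    while any(t.is_alive() for t in threads):
        drained_total += len(buffer.get_and_clear())
    for t in threads:
        t.join()
    drained_total += len(buffer.get_and_clear())  # pick up anything left over
    return drained_total
```

With the default arguments, run_demo() should return producers * per_producer, meaning no message was lost or counted twice.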