10m*_*mjg 7 python multithreading exception-handling tweepy
我正在使用 tweepy 来处理大型 twitter 流(超过 4,000 个帐户)。我添加到流中的帐户越多,我就越有可能收到此错误:
Traceback (most recent call last):
File "myscript.py", line 2103, in <module>
main()
File "myscript.py", line 2091, in main
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
self._start(async)
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
self._run()
File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
raise exception
requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))
Run Code Online (Sandbox Code Playgroud)
显然,这是一种粘稠的流水-经验显然,这是太粗处理。基于在 stackoverflow 上研究此错误以及“我添加的帐户越多,此异常发生的速度越快”的经验趋势,我的假设是这是“我的错”。我处理每条推文的时间太长和/或我的消防水管太粗。我明白了。
但是尽管有这样的设置,我仍然有两个问题似乎无法找到可靠的答案。
1. 有没有办法简单地“处理”这个异常,接受我会错过一些推文,但保持脚本运行?我想可能它错过了一条推文(或许多推文,但如果我可以在没有我想要的 100% 的推文的情况下生活,那么脚本/流仍然可以继续,随时准备捕捉下一条推文。
我已经尝试过这种异常处理,这是在一个关于 stackoverflow 的类似问题中推荐的: from urllib3.exceptions import ProtocolError
while True:
try:
twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
except ProtocolError:
continue
Run Code Online (Sandbox Code Playgroud)
但不幸的是,对我来说,(也许我错误地实施了它,但我认为我没有做到),这不起作用。我得到了与之前在使用或不使用推荐的异常处理代码的情况下得到的完全相同的错误。
我可以在一个线程上将推文写入 - 以原始 - 预处理 - 到内存,或数据库或其他东西吗?然后,是否有第二个线程准备好处理这些推文,一旦准备就绪?我认为,至少,它将我对推文的后处理排除在等式之外,作为我正在阅读的消防水带带宽的限制因素。然后,如果我仍然遇到错误,我可以减少我关注的人等。
我看过一些线程教程,但我认为可能值得询问它是否“适用于”......这个 tweepy/twitter/etc/complex。我对我对我遇到的问题或线程可能如何帮助的理解没有信心,所以我想我可以征求建议,看看这是否确实对我有帮助。
如果这个想法是有效的,是否有一种简单的示例代码可以帮助我指出正确的方向?
我想我通过最终完成我的第一个队列/线程实现来解决这个问题。我没有足够的知识来知道这样做的最佳方法,但我认为这种方法确实有效。使用下面的代码,我现在建立了一个新推文队列,并可以在队列中按照我的意愿处理它们,而不是落后并失去与 tweepy 的连接。
from Queue import Queue
from threading import Thread
class My_Parser(tweepy.StreamListener):
def __init__(self, q = Queue()):
num_worker_threads = 4
self.q = q
for i in range(num_worker_threads):
t = Thread(target=self.do_stuff)
t.daemon = True
t.start()
def on_data(self, data):
self.q.put(data)
def do_stuff(self):
while True:
do_whatever(self.q.get())
self.q.task_done()
Run Code Online (Sandbox Code Playgroud)
我确实继续挖掘了一段时间关于 IncompleteRead 错误,我尝试了更多使用 url libs 和 http libs 的异常处理解决方案,但我对此很挣扎。而且我认为除了保持连接之外,排队的东西可能还有一些好处(一方面,不会丢失数据)。
希望这对某人有帮助。哈哈。