使用Tweepy收听流并搜索推文.如何停止以前的搜索并只收听新流?

stt*_*mas 18 python tweepy flask

我正在使用Flask和Tweepy来搜索实时推文.在前端,我有一个用户文本输入,以及一个名为"搜索"的按钮.理想情况下,当用户在输入中提供搜索词并单击"搜索"按钮时,Tweepy应该侦听新的搜索词并停止前一个搜索词流.单击"搜索"按钮时,它将执行此功能:

@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
    search_term = request.form['tweet']
    search_term_hashtag = '#' + search_term
    # instantiate listener
    listener = StdOutListener()
    # stream object uses listener we instantiated above to listen for data
    stream = tweepy.Stream(auth, listener)

    if stream is not None:
        print "Stream disconnected..."
        stream.disconnect()

    stream.filter(track=[search_term or search_term_hashtag], async=True)
    redirect('/stream') # execute '/stream' sse
    return render_template('index.html')
Run Code Online (Sandbox Code Playgroud)

/stream上面代码中第二行到最后一行执行的路由如下:

@app.route('/stream')
def stream():
    # we will use Pub/Sub process to send real-time tweets to client
    def event_stream():
        # instantiate pubsub
        pubsub = red.pubsub()
        # subscribe to tweet_stream channel
        pubsub.subscribe('tweet_stream')
        # initiate server-sent events on messages pushed to channel
        for message in pubsub.listen():
            yield 'data: %s\n\n' % message['data']
    return Response(stream_with_context(event_stream()), mimetype="text/event-stream")
Run Code Online (Sandbox Code Playgroud)

我的代码工作正常,因为它启动一个新流并在每次单击"搜索"按钮时搜索给定的术语,但它不会停止先前的搜索.例如,如果我的第一个搜索词是"NYC",然后我想搜索一个不同的词,比如说"洛杉矶",它会给我"纽约"和"洛杉矶"的结果,这不是我的意思想.我想要"洛杉矶"进行搜索.我该如何解决?换句话说,如何停止上一个流?我查看了其他以前的线程,我知道我必须使用stream.disconnect(),但我不知道如何在我的代码中实现它.任何帮助或输入将不胜感激.非常感谢!!

Mat*_*ttL 4

下面是一些在创建新流时取消旧流的代码。它的工作原理是将新流添加到全局列表中,然后stream.disconnect()在创建新流时调用列表中的所有流。

diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
 app = Flask(__name__)
 red = redis.StrictRedis()

+# Add a place to keep track of current streams
+streams = []

 @app.route('/')
 def index():
@@ -32,12 +34,18 @@ def index():
 @app.route('/search', methods=['POST'])
 # gets search-keyword and starts stream
 def streamTweets():
+        # cancel old streams
+        for stream in streams:
+            stream.disconnect()
+
        search_term = request.form['tweet']
        search_term_hashtag = '#' + search_term
        # instantiate listener
        listener = StdOutListener()
        # stream object uses listener we instantiated above to listen for data
        stream = tweepy.Stream(auth, listener)
+        # add this stream to the global list
+        streams.append(stream)
        stream.filter(track=[search_term or search_term_hashtag],
                async=True) # make sure stream is non-blocking
        redirect('/stream') # execute '/stream' sse
Run Code Online (Sandbox Code Playgroud)

这并没有解决会话管理的问题。根据您当前的设置,一名用户的搜索将影响所有用户的搜索。通过为您的用户提供一些标识符并将他们的流与标识符一起存储,可以避免这种情况。最简单的方法可能是使用 Flask 的会话支持。您也可以requestId按照皮埃尔的建议使用 a 来完成此操作。无论哪种情况,您还需要代码来通知用户何时关闭页面并关闭其流。