优化:将JSON从Streaming API转储到Mongo

Sag*_*kar 8 python json mongodb pymongo http-streaming

背景: 我有一个python模块设置为从流API中获取JSON对象,并使用pymongo将它们(一次25个批量插入)存储在MongoDB中.为了便于比较,我也有一个bash命令curl从相同的流API和pipemongoimport.这两种方法都将数据存储在单独的集合中

我会定期监控count()收藏品,以检查收藏情况.

到目前为止,我看到该python模块滞后于该curl | mongoimport方法背后的大约1000个JSON对象.

问题: 如何优化python模块以便与 curl | mongoimport?同步?

我无法使用,tweetstream因为我没有使用Twitter API,而是使用第三方流媒体服务.

有人可以帮帮我吗?

Python 模块:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()
Run Code Online (Sandbox Code Playgroud)

谢谢阅读.

Sag*_*kar 1

摆脱了 StringIO 库。在本例中,由于每行都会调用WRITEFUNCTION回调handle_data,因此只需直接加载即可JSON。然而,有时数据中可能JSON包含两个对象。抱歉,我无法发布curl我使用的命令,因为它包含我们的凭据。但是,正如我所说,这是适用于任何流 API 的普遍问题。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    
Run Code Online (Sandbox Code Playgroud)