处理一个巨大的文件(9.1GB)并加快处理速度 - Python

cra*_*liv 11 python performance

我有一个9GB的推文文本文件,格式如下:

T      'time and date'
U      'name of user in the form of a URL'
W      Actual tweet
Run Code Online (Sandbox Code Playgroud)

总共有6,000,000个用户和超过60,000,000条推文.我使用itertools.izip()一次读取3行,然后根据名称将其写入文件.但它采取的方式太长(26小时和计数).怎么能更快?

发布完整性代码,

s='the existing folder which will have all the files'
with open('path to file') as f:
 for line1,line2,line3 in itertools.izip_longest(*[f]*3):
            if(line1!='\n' and line2!='\n' and line3!='\n'):
     line1=line1.split('\t')
     line2=line2.split('\t')
     line3=line3.split('\t')
     if(not(re.search(r'No Post Title',line1[1]))):
         url=urlparse(line3[1].strip('\n')).path.strip('/')

  if(url==''):
   file=open(s+'junk','a')
   file.write(line1[1])
   file.close()
  else:
   file=open(s+url,'a')
   file.write(line1[1])
   file.close()
Run Code Online (Sandbox Code Playgroud)

我的目标是在小文本上使用主题建模(例如,在一个用户的所有推文上运行lda,因此每个用户需要一个单独的文件),但它花费了太多时间.

更新:我使用了S.Lott用户的建议并使用了以下代码:

import re
from urlparse import urlparse
import os 
def getUser(result):
    result=result.split('\n')
    u,w=result[0],result[1]
    path=urlparse(u).path.strip('/')
    if(path==''):
        f=open('path to junk','a')
        f.write('its Junk !!')
        f.close()
    else:
        result="{0}\n{1}\n{2}\n".format(u,w,path)
        writeIntoFile(result)
def writeIntoFile(result):
    tweet=result.split('\n')
    users = {}
    directory='path to directory'
    u, w, user = tweet[0],tweet[1],tweet[2]
    if user not in users :
        if(os.path.isfile(some_directory+user)):
            if(len(users)>64):
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)
            users[user]=open(some_directory+user,'a')
            users[user].write(w+'\n')
            #users[user].flush
        elif (not(os.path.isfile(some_directory+user))):
            if len(users)>64:
                lru,aFile,u=min(users.values())
                aFile.close()
                users.pop(u)

            users[user]=open(some_directory+user,'w')
            users[user].write(w+'\n')
    for u in users:
        users[u].close()
import sys
s=open(sys.argv[1],'r')
tweet={}
for l in s:
    r_type,content=l.split('\t')
    if r_type in tweet:
    u,w=tweet.get('U',''),tweet.get('W','')
            if(not(re.search(r'No Post Title',u))):
                result="{0}{1}".format(u,w)
                getUser(result)
                tweet={}
        tweet[r_type]=content
Run Code Online (Sandbox Code Playgroud)

显然,它几乎是他所建议和善意分享的一面镜子.最初速度非常快,但随后速度变慢了.我已发布更新的代码,以便我可以获得更多关于如何更快地制作的建议.如果我是从sys.stdin读取的,那么我就无法解决导入错误.因此,为了节省时间并继续使用它,我只是使用它,希望它能正常工作.谢谢.

S.L*_*ott 22

这就是您的操作系统具有多处理管道的原因.

collapse.py sometweetfile | filter.py | user_id.py | user_split.py -d some_directory
Run Code Online (Sandbox Code Playgroud)

collapse.py

import sys
with open("source","r") as theFile:
    tweet = {}
    for line in theFile:
        rec_type, content = line.split('\t')
        if rec_type in tweet:
            t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
            result=  "{0}\t{1}\t{2}".format( t, u, w )
            sys.stdout.write( result )
            tweet= {}
        tweet[rec_type]= content
    t, u, w = tweet.get('T',''), tweet.get('U',''), tweet.get('W','')
    result=  "{0}\t{1}\t{2}".format( t, u, w )
    sys.stdout.write( result )
Run Code Online (Sandbox Code Playgroud)

filter.py

import sys
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    if 'No Post Title' in t:
        continue
    sys.stdout.write( tweet )
Run Code Online (Sandbox Code Playgroud)

user_id.py

import sys
import urllib
for tweet in sys.stdin:
    t, u, w = tweet.split('\t')
    path=urlparse(w).path.strip('/')
    result= "{0}\t{1}\t{2}\t{3}".format( t, u, w, path )
    sys.stdout.write( result )
Run Code Online (Sandbox Code Playgroud)

user_split.py

users = {}
for tweet in sys.stdin:
    t, u, w, user = tweet.split('\t')
    if user not in users:
        # May run afoul of open file limits...
        users[user]= open(some_directory+user,"w")
    users[user].write( tweet )
    users[user].flush( tweet )
for u in users:
    users[u].close()
Run Code Online (Sandbox Code Playgroud)

哇,你说.什么代码.

是.但.它在您拥有的所有处理核心之间展开,并且它们同时运行.此外,当您通过管道将stdout连接到stdin时,它实际上只是一个共享缓冲区:没有物理I/O发生.

以这种方式做事的速度惊人.这就是*Nix操作系统以这种方式工作的原因.这是你需要做的真正的速度.


LRU算法,FWIW.

    if user not in users:
        # Only keep a limited number of files open
        if len(users) > 64: # or whatever your OS limit is.
            lru, aFile, u = min( users.values() )
            aFile.close()
            users.pop(u)
        users[user]= [ tolu, open(some_directory+user,"w"), user ]
    tolu += 1
    users[user][1].write( tweet )
    users[user][1].flush() # may not be necessary
    users[user][0]= tolu
Run Code Online (Sandbox Code Playgroud)