Python: read a huge file line by line and send the lines to multiprocessing or threading

Eva*_*exe 6 python multiprocessing readfile

I have been trying to get my code working for days now and I'm getting desperate. I have searched all over the internet but still can't find a solution.

I have a text file encoded in "latin-1" that is 9 GB in size -> 737,022,387 lines, each line containing one string.

I want to read each line and send it in an HTTP PUT request that waits for a response, returning TRUE if the response is 200 and FALSE if it is 400. A PUT request takes about 1 to 3 seconds, so to speed up the processing I want to use threading or multiprocessing.
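
A minimal sketch of what such a per-line worker could look like with the requests library (the endpoint URL and payload format below are assumptions, not part of the original setup):

import requests

def process_line(line):
    # hypothetical endpoint -- adjust the URL and payload to the real API
    response = requests.put(
        'http://example.com/endpoint',
        data=line.encode('latin-1'),
        timeout=10,
    )
    # 200 -> TRUE, 400 (or anything else) -> FALSE, as described above
    return response.status_code == 200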

To start with, I simulate the PUT request with a 3-second sleep, but I can't even get that much to work.

This code splits my string into individual characters, and I don't know why...

from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,line )
        print(res)

This one gives the error: TypeError: process_line() takes 1 positional argument but 17 were given

import multiprocessing
from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    with open(r"d:\txtFile",encoding="latin-1") as file:
        res = pool.apply(process_line,file.readline() )
        print(res)

And this one crashes the computer:

from multiprocessing import Pool
from time import sleep


def process_line(line):
   sleep(3)
   print(line)
   return True

if __name__ == "__main__":
    pool = Pool(2)
    peon =  open(r'D:\txtFile',encoding="latin-1")
    for line in peon:
        res = pool.map(process_line,peon )
        print(res)

Nit*_*wal 4

Before anything else, note that this task looks impractical: that is 737,022,387 requests to fire off! Do the math on how long a single computer would need: at roughly 2 seconds per request, running them one after another takes about 1.5 billion seconds, i.e. around 47 years, and even with 1,000 concurrent workers it would still take more than two weeks.

Still, the better way to do this is to read the file line by line in a separate thread and push the lines into a queue, then consume the queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

urls_queue = Queue()
max_process = 4

def read_urls():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            urls_queue.put(url.strip())
            print('put url: {}'.format(url.strip()))

    # put DONE to tell send_request_processor to exit
    for i in range(max_process):
        urls_queue.put("DONE")


def send_request(url):
    print('send request: {}'.format(url))
    sleep(1)
    print('recv response: {}'.format(url))


# the queue is passed in explicitly so this also works where processes
# are spawned rather than forked (e.g. on Windows)
def send_request_processor(urls_queue):
    print('start send request processor')
    while True:
        url = urls_queue.get()
        if url == "DONE":
            break
        else:
            send_request(url)


def main():
    file_reader_thread = Thread(target=read_urls)
    file_reader_thread.start()

    procs = []
    for i in range(max_process):
        p = Process(target=send_request_processor, args=(urls_queue,))
        procs.append(p)
        p.start()

    # the reader thread finishes once every line and the DONE sentinels are queued
    file_reader_thread.join()

    # each worker exits after pulling a DONE sentinel
    for p in procs:
        p.join()

    print('all done')


if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz
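
A note on the design: a multiprocessing.Queue can be shared safely between the reader thread and the worker processes, and exactly one "DONE" sentinel is queued per worker, so each consumer exits once the file is exhausted. The sleep(1) in send_request merely stands in for the real PUT; it could be replaced by the requests.put sketch from the question.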

Solution 2:

You can use the tornado asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            await q.put(url)
            print('Put %s' % url)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')
    # producer and consumer can run in parallel

IOLoop.current().run_sync(main)
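
To perform the real PUT inside consumer, tornado's own asynchronous HTTP client can replace gen.sleep; a minimal sketch, again assuming a hypothetical endpoint:

from tornado.httpclient import AsyncHTTPClient

async def send_put(line):
    client = AsyncHTTPClient()
    # raise_error=False so a 400 comes back as a normal response instead of raising
    response = await client.fetch(
        'http://example.com/endpoint',   # hypothetical endpoint
        method='PUT',
        body=line.encode('latin-1'),
        raise_error=False,
    )
    return response.code == 200   # True on 200, False on 400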