ju.*_*ju. 19 python windows multithreading multiprocessing
我正在尝试编写一个扫描文件夹的python脚本并收集更新的SQL脚本,然后自动为SQL脚本提取数据.在代码中,while循环是扫描新的SQL文件,并发送到数据拉取功能.我无法理解如何使用while循环创建动态队列,但也有多进程来运行队列中的任务.
下面的代码有一个问题,即while循环迭代在移动到下一次迭代之前将在长作业上工作并收集其他作业以填充空闲处理器.
更新:
感谢@pbacterio捕获错误,现在错误消息消失了.更改代码后,python代码可以在一次迭代中获取所有作业脚本,并将脚本分发到四个处理器.但是,如果要进行下一次迭代,扫描并提交新添加的作业脚本,它将会很长时间.知道如何重建代码吗?
我终于找到了解决方案,请参阅下面的答案.事实证明我正在寻找的是
the_queue = Queue()
the_pool = Pool(4,worker_main,(the_queue,))
对于那些偶然发现类似想法的人来说,以下是这个自动化脚本的整个架构,它将共享驱动器转换为"用于SQL拉动的服务器"或任何其他作业队列"服务器".
一个.python脚本auto_data_pull.py如答案所示.您需要添加自己的工作职能.
湾 一个'批处理脚本',其中包括:
启动C:\ Anaconda2\python.exe C:\ Users\bin\auto_data_pull.py
C.添加由启动计算机触发的任务,运行"批处理脚本"即可.有用.
Python代码:
from glob import glob
import os, time
import sys
import CSV
import re
import subprocess
import pandas as PD
import pypyodbc
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = compute(func, args)
output.put(result)
#
# Function used to compute result
#
def compute(func, args):
result = func(args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
def query_sql(sql_file): #test func
#jsl file processing and SQL querying, data table will be saved to csv.
fo_name = os.path.splitext(sql_file)[0] + '.csv'
fo = open(fo_name, 'w')
print sql_file
fo.write("sql_file {0} is done\n".format(sql_file))
return "Query is done for \n".format(sql_file)
def check_files(path):
"""
arguments -- root path to monitor
returns -- dictionary of {file: timestamp, ...}
"""
sql_query_dirs = glob(path + "/*/IDABox/")
files_dict = {}
for sql_query_dir in sql_query_dirs:
for root, dirs, filenames in os.walk(sql_query_dir):
[files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for
filename in filenames if filename.endswith('.jsl')]
return files_dict
##### working in single thread
def single_thread():
path = "Y:/"
before = check_files(path)
sql_queue = []
while True:
time.sleep(3)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
# print sql_queue
for sql_file in sql_queue:
try:
query_sql(sql_file)
except:
pass
##### not working in queue
def multiple_thread():
NUMBER_OF_PROCESSES = 4
path = "Y:/"
sql_queue = []
before = check_files(path) # get the current dictionary of sql_files
task_queue = Queue()
done_queue = Queue()
while True: #while loop to check the changes of the files
time.sleep(5)
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
TASKS = [(query_sql, sql_file) for sql_file in sql_queue]
# Create queues
#submit task
for task in TASKS:
task_queue.put(task)
for i in range(NUMBER_OF_PROCESSES):
p = Process(target=worker, args=(task_queue, done_queue)).start()
# try:
# p = Process(target=worker, args=(task_queue))
# p.start()
# except:
# pass
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# single_thread()
if __name__ == '__main__':
# freeze_support()
multiple_thread()
Run Code Online (Sandbox Code Playgroud)
参考:
我已经弄清楚了。感谢您的回复激发了我的想法。现在,脚本可以运行 while 循环来监视文件夹中是否有新更新/添加的 SQL 脚本,然后将数据拉取分发到多个线程。解决方案来自queue.get()和queue.put()。我假设队列对象自己负责通信。
这是最终的代码——
from glob import glob
import os, time
import sys
import pypyodbc
from multiprocessing import Process, Queue, Event, Pool, current_process, freeze_support
def query_sql(sql_file): #test func
#jsl file processing and SQL querying, data table will be saved to csv.
fo_name = os.path.splitext(sql_file)[0] + '.csv'
fo = open(fo_name, 'w')
print sql_file
fo.write("sql_file {0} is done\n".format(sql_file))
return "Query is done for \n".format(sql_file)
def check_files(path):
"""
arguments -- root path to monitor
returns -- dictionary of {file: timestamp, ...}
"""
sql_query_dirs = glob(path + "/*/IDABox/")
files_dict = {}
try:
for sql_query_dir in sql_query_dirs:
for root, dirs, filenames in os.walk(sql_query_dir):
[files_dict.update({(root + filename): os.path.getmtime(root + filename)}) for
filename in filenames if filename.endswith('.jsl')]
except:
pass
return files_dict
def worker_main(queue):
print os.getpid(),"working"
while True:
item = queue.get(True)
query_sql(item)
def main():
the_queue = Queue()
the_pool = Pool(4, worker_main,(the_queue,))
path = "Y:/"
before = check_files(path) # get the current dictionary of sql_files
while True: #while loop to check the changes of the files
time.sleep(5)
sql_queue = []
after = check_files(path)
added = [f for f in after if not f in before]
deleted = [f for f in before if not f in after]
overlapped = list(set(list(after)) & set(list(before)))
updated = [f for f in overlapped if before[f] < after[f]]
before = after
sql_queue = added + updated
if sql_queue:
for jsl_file in sql_queue:
try:
the_queue.put(jsl_file)
except:
print "{0} failed with error {1}. \n".format(jsl_file, str(sys.exc_info()[0]))
pass
else:
pass
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)