并行文件匹配,Python

use*_*896 4 python string parallel-processing multithreading gil

我正在尝试改进扫描文件中的恶意代码的脚本.我们在文件中有一个正则表达式模式列表,每行一个模式.这些正则表达式适用于grep,因为我们当前的实现基本上是一个bash脚本find\grep combo.bash脚本在我的基准测试目录上需要358秒.我能够编写一个在72秒内执行此操作的python脚本,但希望进一步提高.首先,我将发布基本代码然后调试我尝试过:

import os, sys, Queue, threading, re

fileList = []
rootDir = sys.argv[1]

class Recurser(threading.Thread):

    def __init__(self, queue, dir):
    self.queue = queue
    self.dir = dir
    threading.Thread.__init__(self)

    def run(self):
    self.addToQueue(self.dir)

    ## HELPER FUNCTION FOR INTERNAL USE ONLY
    def addToQueue(self,  rootDir):
      for root, subFolders, files in os.walk(rootDir):
    for file in files:
       self.queue.put(os.path.join(root,file))
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)
      self.queue.put(-1)

class Scanner(threading.Thread):

    def __init__(self, queue, patterns):
    self.queue = queue
    self.patterns = patterns
    threading.Thread.__init__(self)

    def run(self):
    nextFile = self.queue.get()
    while nextFile is not -1:
       #print "Trying " + nextFile
       self.scanFile(nextFile)
       nextFile = self.queue.get()


    #HELPER FUNCTION FOR INTERNAL UES ONLY
    def scanFile(self, file):
       fp = open(file)
       contents = fp.read()
       i=0
       #for patt in self.patterns:
       if self.patterns.search(contents):
      print "Match " + str(i) + " found in " + file

############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################
############MAIN MAIN MAIN MAIN##################


fileQueue = Queue.Queue()

#Get the shell scanner patterns
patterns = []
fPatt = open('/root/patterns')
giantRE = '('
for line in fPatt:
   #patterns.append(re.compile(line.rstrip(), re.IGNORECASE))
   giantRE = giantRE + line.rstrip() + '|'

giantRE = giantRE[:-1] + ')'
giantRE = re.compile(giantRE, re.IGNORECASE)

#start recursing the directories
recurser = Recurser(fileQueue,rootDir)
recurser.start()

print "starting scanner"
#start checking the files
for scanner in xrange(0,8):
   scanner = Scanner(fileQueue, giantRE)
   scanner.start()
Run Code Online (Sandbox Code Playgroud)

这显然是调试\丑陋的代码,不介意百万queue.put(-1),我会稍后清理它.有些缩进没有正确显示,特别是在scanFile中.

无论如何,我注意到了一些事情.使用1,4或甚至8个线程(对于xrange(0,???):)中的扫描程序没有什么区别.无论如何,我仍然得到~72秒.我认为这是由于python的GIL.

与制作巨型正则表达式相反,我尝试将每行(模式)作为一个compilex RE放在一个列表中,并在我的scanfile函数中迭代这个列表.这导致更长的执行时间.

为了避免python的GIL,我尝试将每个线程fork分配给grep,如下所示:

#HELPER FUNCTION FOR INTERNAL UES ONLY
def scanFile(self, file):
      s = subprocess.Popen(("grep", "-El", "--file=/root/patterns", file), stdout = subprocess.PIPE)
      output = s.communicate()[0]
      if output != '':
         print 'Matchfound in ' + file
Run Code Online (Sandbox Code Playgroud)

这导致更长的执行时间.

有关提高绩效的任何建议.

:::::::::::::编辑::::::::

我无法发布自己问题的答案,但以下是几个问题的答案:

@David Nehme - 只是为了让人们知道我知道我有一百万队列.输出(-1)的

@Blender - 标记队列的底部.我的扫描程序线程一直保持dequeing,直到它们达到底部的-1(而nextFile不是-1 :).处理器内核为8但是由于GIL使用1个线程,4个线程或8个线程没有区别.产生8个子过程导致代码明显变慢(142秒对72)

@ed - 是的,它和find\grep组合一样慢,实际上更慢,因为它不加选择地greps文件不需要

@Ron - 无法升级,这必须是通用的.你认为这会加速> 72秒吗?bash grepper做了358秒.我的python巨型RE方法做了72秒w\1-8个线程.po1方法w\8 thrads(8个子过程)在142秒运行.到目前为止,只有巨型RE python方法才是明显的赢家

@intuted

这是我们当前发现的\ grep combo(不是我的脚本)的内容.这很简单.还有一些额外的东西,比如ls,但没有什么可以导致5倍的减速.即使grep -r效率稍高,5x也是一个巨大的减速.

 find "${TARGET}" -type f -size "${SZLIMIT}" -exec grep -Eaq --file="${HOME}/patterns" "{}" \; -and -ls | tee -a "${HOME}/found.txt"
Run Code Online (Sandbox Code Playgroud)

python代码效率更高,我不知道为什么,但我通过实验测试了它.我更喜欢在python中这样做.我已经用python实现了5倍的加速,我想让它加速更多.

::::::::::::: WINNER WINNER WINNER ::::::::::::::::::

看起来我们有一个胜利者.

intued的shell脚本以34秒排在第二位,但@ steveha以24秒排在第一位.由于我们的许多盒子没有python2.6,我不得不cx_freeze它.我可以编写一个shell脚本包装来生成tar并解压缩它.然而,我喜欢简单的.

谢谢你所有的帮助,我现在有一个有效的系统管理工具

int*_*ted 5

关于你的Python脚本最终比find/grep组合更快,我有点困惑.如果你想以grep类似于Ron Smith在他的回答中建议的方式使用,你可以做类似的事情

find -type f | xargs -d \\n -P 8 -n 100 grep --file=/root/patterns
Run Code Online (Sandbox Code Playgroud)

启动grep将在退出之前处理100个文件的进程,在任何时候最多保持8个此类进程处于活动状态.让它们处理100个文件应该使每个文件的进程启动开销时间可以忽略不计.

注意:-d \\n选项xargs是GNU扩展,不适用于所有POSIX-ish系统.它指定文件名之间的*d*elimiter是换行符.虽然技术上文件名可以包含换行符,但实际上没有人这样做并保留他们的工作.为了与非GNU兼容,xargs您需要将-print0选项添加到find并使用-0而不是-d \\n使用xargs.这将安排空字节\0(十六进制0x00用作分隔符均由)findxargs.

您还可以采用先计算要grepped的文件数的方法

NUMFILES="$(find -type f | wc -l)";
Run Code Online (Sandbox Code Playgroud)

然后使用该数字在8个进程中进行均匀分割(假设bash为shell)

find -type f | xargs -d \\n -P 8 -n $(($NUMFILES / 8 + 1)) grep --file=/root/patterns
Run Code Online (Sandbox Code Playgroud)

我想,这可能会更好地工作,因为磁盘I/O的find将不与磁盘不同的I/O会干扰grep秒.我想这部分取决于文件的大小,以及它们是否连续存储 - 对于小文件,磁盘无论如何都会寻求很多,所以它并不重要.另请注意,特别是如果你有相当数量的RAM,后续运行这样的命令会更快,因为有些文件将保存在你的内存缓存中.

当然,您可以参数化,8以便更容易地尝试不同数量的并发进程.

如编辑.在评论中提到,这种方法的表现很可能仍然不如单一过程的表现grep -r.我想这取决于磁盘[数组]的相对速度,系统中处理器的数量等.


ste*_*eha 5

我认为threading,您应该将该multiprocessing模块用于Python解决方案,而不是使用该模块.Python线程可以与GIL发生冲突; 如果你只是有多个Python进程,那么GIL不是问题.

我认为,对于您正在做的工作流程池正是您想要的.默认情况下,池将默认为系统处理器中每个核心的一个进程.只需.map()使用要检查的文件名列表和执行检查的函数调用该方法.

http://docs.python.org/library/multiprocessing.html

如果这不比你的threading实现快,那么我不认为GIL是你的问题.

编辑:好的,我正在添加一个有效的Python程序.这使用工作进程池来打开每个文件并在每个文件中搜索模式.当工作人员找到匹配的文件名时,它只是将其打印出来(到标准输出),这样您就可以将此脚本的输出重定向到一个文件中,并获得文件列表.

编辑:我认为这是一个稍微容易阅读的版本,更容易理解.

我计时,搜索我的计算机上的/ usr/include中的文件.它在大约半秒内完成搜索.使用find管道传输xargs尽可能少地运行grep,大约需要0.05秒,大约加速10倍.但我讨厌你必须使用的巴洛克式奇怪的语言find才能正常工作,我喜欢Python版本.也许在非常大的目录上,差异会更小,因为Python的半秒的一部分必须是启动时间.对于大多数用途来说,也许半秒钟足够快!

import multiprocessing as mp
import os
import re
import sys

from stat import S_ISREG


# uncomment these if you really want a hard-coded $HOME/patterns file
#home = os.environ.get('HOME')
#patterns_file = os.path.join(home, 'patterns')

target = sys.argv[1]
size_limit = int(sys.argv[2])
assert size_limit >= 0
patterns_file = sys.argv[3]


# build s_pat as string like:  (?:foo|bar|baz)
# This will match any of the sub-patterns foo, bar, or baz
# but the '?:' means Python won't bother to build a "match group".
with open(patterns_file) as f:
    s_pat = r'(?:{})'.format('|'.join(line.strip() for line in f))

# pre-compile pattern for speed
pat = re.compile(s_pat)


def walk_files(topdir):
    """yield up full pathname for each file in tree under topdir"""
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname

def files_to_search(topdir):
    """yield up full pathname for only files we want to search"""
    for fname in walk_files(topdir):
        try:
            # if it is a regular file and big enough, we want to search it
            sr = os.stat(fname)
            if S_ISREG(sr.st_mode) and sr.st_size >= size_limit:
                yield fname
        except OSError:
            pass

def worker_search_fn(fname):
    with open(fname, 'rt') as f:
        # read one line at a time from file
        for line in f:
            if re.search(pat, line):
                # found a match! print filename to stdout
                print(fname)
                # stop reading file; just return
                return

mp.Pool().map(worker_search_fn, files_to_search(target))
Run Code Online (Sandbox Code Playgroud)