函数本地名称绑定外部作用域

Luc*_*oca 13 python scope decorator

我需要一种方法从外部代码块"注入"函数到函数中,因此它们可以在本地访问,并且它们不需要由函数的代码(定义为函数参数,从*args等等加载)专门处理.

简化的场景:提供一框架,使用户能够定义(用尽可能少的语法越好)自定义函数来操纵框架的其他对象(其是必需global).

理想情况下,用户定义

def user_func():
    Mouse.eat(Cheese)
    if Cat.find(Mouse):
        Cat.happy += 1
Run Code Online (Sandbox Code Playgroud)

在这里Cat,Mouse并且Cheese是框架对象,出于好的理由,它们不能限制在全局命名空间中.

我想为这个函数编写一个包装器,表现如下:

def framework_wrap(user_func):
    # this is a framework internal and has name bindings to Cat, Mouse and Cheese
    def f():
        inject(user_func, {'Cat': Cat, 'Mouse': Mouse, 'Cheese': Cheese})
        user_func()
    return f
Run Code Online (Sandbox Code Playgroud)

然后,这个包装器可以应用于所有用户定义的函数(作为装饰器,由用户自己或自动,虽然我打算使用元类).

@framework_wrap
def user_func():
Run Code Online (Sandbox Code Playgroud)

我知道Python 3的nonlocal关键字,但我仍然认为丑陋(从框架的用户角度来看)添加额外的一行:

nonlocal Cat, Mouse, Cheese
Run Code Online (Sandbox Code Playgroud)

并担心将他需要的每个对象添加到此行.

任何建议都非常感谢.

aar*_*ing 11

堆栈越乱,我越希望没有.不要破解全局变量来做你想做的事.而是破解字节码.我有两种方法可以做到这一点.

1)添加包含所需引用的单元格f.func_closure.您必须重新组合要使用的函数的字节码,LOAD_DEREF而不是LOAD_GLOBAL为每个值生成一个单元格.然后,您将单元格和新代码对象的元组传递给types.FunctionType并获取具有相应绑定的函数.函数的不同副本可以具有不同的本地绑定,因此它应该像您想要的那样是线程安全的.

2)在函数参数列表的末尾为新的本地语句添加参数.更换适当的发生LOAD_GLOBALLOAD_FAST.然后通过使用types.FunctionType并传入新的代码对象和您想要的绑定元组作为默认选项来构造一个新函数.这在某种意义上是有限的,因为python将函数参数限制为255,并且它不能用于使用变量参数的函数.然而,它让我感到震惊,因为两者中更具挑战性的是我实施的那个(还有其他可以用这个完成的东西).同样,您可以使用不同的绑定制作不同的函数副本,也可以使用每个调用位置所需的绑定调用该函数.所以它也可以像你想要的那样是线程安全的.

import types
import opcode

# Opcode constants used for comparison and replacecment
LOAD_FAST = opcode.opmap['LOAD_FAST']
LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
STORE_FAST = opcode.opmap['STORE_FAST']

DEBUGGING = True

def append_arguments(code_obj, new_locals):
    co_varnames = code_obj.co_varnames   # Old locals
    co_names = code_obj.co_names      # Old globals
    co_argcount = code_obj.co_argcount     # Argument count
    co_code = code_obj.co_code         # The actual bytecode as a string

    # Make one pass over the bytecode to identify names that should be
    # left in code_obj.co_names.
    not_removed = set(opcode.hasname) - set([LOAD_GLOBAL])
    saved_names = set()
    for inst in instructions(co_code):
        if inst[0] in not_removed:
            saved_names.add(co_names[inst[1]])

    # Build co_names for the new code object. This should consist of 
    # globals that were only accessed via LOAD_GLOBAL
    names = tuple(name for name in co_names
                  if name not in set(new_locals) - saved_names)

    # Build a dictionary that maps the indices of the entries in co_names
    # to their entry in the new co_names
    name_translations = dict((co_names.index(name), i)
                             for i, name in enumerate(names))

    # Build co_varnames for the new code object. This should consist of
    # the entirety of co_varnames with new_locals spliced in after the
    # arguments
    new_locals_len = len(new_locals)
    varnames = (co_varnames[:co_argcount] + new_locals +
                co_varnames[co_argcount:])

    # Build the dictionary that maps indices of entries in the old co_varnames
    # to their indices in the new co_varnames
    range1, range2 = xrange(co_argcount), xrange(co_argcount, len(co_varnames))
    varname_translations = dict((i, i) for i in range1)
    varname_translations.update((i, i + new_locals_len) for i in range2)

    # Build the dictionary that maps indices of deleted entries of co_names
    # to their indices in the new co_varnames
    names_to_varnames = dict((co_names.index(name), varnames.index(name))
                             for name in new_locals)

    if DEBUGGING:
        print "injecting: {0}".format(new_locals)
        print "names: {0} -> {1}".format(co_names, names)
        print "varnames: {0} -> {1}".format(co_varnames, varnames)
        print "names_to_varnames: {0}".format(names_to_varnames)
        print "varname_translations: {0}".format(varname_translations)
        print "name_translations: {0}".format(name_translations)


    # Now we modify the actual bytecode
    modified = []
    for inst in instructions(code_obj.co_code):
        # If the instruction is a LOAD_GLOBAL, we have to check to see if
        # it's one of the globals that we are replacing. Either way,
        # update its arg using the appropriate dict.
        if inst[0] == LOAD_GLOBAL:
            print "LOAD_GLOBAL: {0}".format(inst[1])
            if inst[1] in names_to_varnames:
                print "replacing with {0}: ".format(names_to_varnames[inst[1]])
                inst[0] = LOAD_FAST
                inst[1] = names_to_varnames[inst[1]]
            elif inst[1] in name_translations:    
                inst[1] = name_translations[inst[1]]
            else:
                raise ValueError("a name was lost in translation")
        # If it accesses co_varnames or co_names then update its argument.
        elif inst[0] in opcode.haslocal:
            inst[1] = varname_translations[inst[1]]
        elif inst[0] in opcode.hasname:
            inst[1] = name_translations[inst[1]]
        modified.extend(write_instruction(inst))

    code = ''.join(modified)
    # Done modifying codestring - make the code object

    return types.CodeType(co_argcount + new_locals_len,
                          code_obj.co_nlocals + new_locals_len,
                          code_obj.co_stacksize,
                          code_obj.co_flags,
                          code,
                          code_obj.co_consts,
                          names,
                          varnames,
                          code_obj.co_filename,
                          code_obj.co_name,
                          code_obj.co_firstlineno,
                          code_obj.co_lnotab)


def instructions(code):
    code = map(ord, code)
    i, L = 0, len(code)
    extended_arg = 0
    while i < L:
        op = code[i]
        i+= 1
        if op < opcode.HAVE_ARGUMENT:
            yield [op, None]
            continue
        oparg = code[i] + (code[i+1] << 8) + extended_arg
        extended_arg = 0
        i += 2
        if op == opcode.EXTENDED_ARG:
            extended_arg = oparg << 16
            continue
        yield [op, oparg]

def write_instruction(inst):
    op, oparg = inst
    if oparg is None:
        return [chr(op)]
    elif oparg <= 65536L:
        return [chr(op), chr(oparg & 255), chr((oparg >> 8) & 255)]
    elif oparg <= 4294967296L:
        return [chr(opcode.EXTENDED_ARG),
                chr((oparg >> 16) & 255),
                chr((oparg >> 24) & 255),
                chr(op),
                chr(oparg & 255),
                chr((oparg >> 8) & 255)]
    else:
        raise ValueError("Invalid oparg: {0} is too large".format(oparg))



if __name__=='__main__':
    import dis

    class Foo(object):
        y = 1

    z = 1
    def test(x):
        foo = Foo()
        foo.y = 1
        foo = x + y + z + foo.y
        print foo

    code_obj = append_arguments(test.func_code, ('y',))
    f = types.FunctionType(code_obj, test.func_globals, argdefs=(1,))
    if DEBUGGING:
        dis.dis(test)
        print '-'*20
        dis.dis(f)
    f(1)
Run Code Online (Sandbox Code Playgroud)

请注意,此代码的整个分支(与之相关EXTENDED_ARG)未经测试,但对于常见情况,它似乎非常可靠.我将攻击它,目前正在编写一些代码来验证输出.然后(当我绕过它时)我将对整个标准库运行它并修复任何错误.

我也可能正在实施第一个选项.