处理其消费者中的生成器异常

geo*_*org 28 python exception-handling generator

这是处理生成器中抛出异常的后续操作,并讨论了一个更普遍的问题.

我有一个以不同格式读取数据的函数.所有格式都是面向行或面向记录的,每种格式都有一个专用的解析功能,作为生成器实现.因此主读取函数获取输入和生成器,它从输入读取其各自的格式并将记录传递回主函数:

def read(stream, parsefunc):
    for record in parsefunc(stream):
        do_stuff(record)
Run Code Online (Sandbox Code Playgroud)

在哪里parsefunc是这样的:

def parsefunc(stream):
    while not eof(stream):
        rec = read_record(stream)
        do some stuff
        yield rec
Run Code Online (Sandbox Code Playgroud)

我面临的问题是,虽然parsefunc可以抛出异常(例如,当从流中读取时),但它不知道如何处理它.负责处理异常的read函数是主要功能.请注意,异常发生在每个记录的基础上,因此即使一个记录失败,生成器也应继续其工作并返回记录,直到整个流耗尽为止.

在上一个问题中,我试图放入next(parsefunc)一个try块,但事实证明,这不会起作用.所以我必须添加try-exceptparsefunc自身,然后以某种方式向消费者提供例外:

def parsefunc(stream):
    while not eof(stream):
        try:
            rec = read_record()
            yield rec
        except Exception as e:
            ?????
Run Code Online (Sandbox Code Playgroud)

我不愿意这样做因为

  • try在一个不打算处理任何异常的函数中使用是没有意义的
  • 我不清楚如何将异常传递给消费函数
  • 会有很多格式和许多格式parsefunc,我不想用太多的帮助代码来混淆它们.

有没有人建议更好的架构?

针对googlers的说明:除了最佳答案外,还要关注发送Jon的帖子 - 非常聪明且富有洞察力的内容.

Sal*_*lil 16

您可以在parsefunc中返回记录和异常的元组,并让使用者函数决定如何处理异常:

import random

def get_record(line):
  num = random.randint(0, 3)
  if num == 3:
    raise Exception("3 means danger")
  return line


def parsefunc(stream):
  for line in stream:
    try:
      rec = get_record(line)
    except Exception as e:
      yield (None, e)
    else:
      yield (rec, None)

if __name__ == '__main__':
  with open('temp.txt') as f:
    for rec, e in parsefunc(f):
      if e:
        print "Got an exception %s" % e
      else:
        print "Got a record %s" % rec
Run Code Online (Sandbox Code Playgroud)


Jon*_*ark 13

深入思考在更复杂的情况下会发生什么,这证明了Python避免将异常冒出生成器的选择.

如果我从流对象得到I/O错误,那么简单地能够恢复并继续读取的可能性就会很低,而不会以某种方式重置发生器的本地结构.我会以某种方式与阅读过程协调一致,以便继续:跳过垃圾,推回部分数据,重置一些不完整的内部跟踪结构等.

只有生成器有足够的上下文才能正确完成.即使您可以保留生成器上下文,使用外部块处理异常也会完全违反Demeter法则.周围块需要重置和继续运行的所有重要信息都在生成器函数的局部变量中!获取或传递这些信息虽然可能,但令人作呕.

清理几乎总是抛出结果异常,在这种情况下,读取器 - 生成器已经有一个内部异常块.努力在脑死亡的简单情况下保持这种清洁,只是为了让它在几乎每一个现实的背景下都被打破,这将是愚蠢的.因此,只要try发电机,你会需要的体except块,无论如何,在任何复杂的情况下.

如果异常条件看起来像异常,那将是很好的,而不是像返回值.所以我会添加一个中间适配器以允许这样做:生成器将产生数据或异常,并且适配器将重新引发异常(如果适用).应该在for循环中首先调用适配器,这样我们就可以选择在循环中捕获它并清理以继续,或者打开循环以捕获它并放弃该过程.我们应该在设置周围放置一些蹩脚的包装来指示技巧正在进行中,并且如果函数正在调整则强制调用适配器.

这样每个层都会出现错误,它具有上下文要处理的功能,但代价是适配器有点干扰(也许也很容易忘记).

所以我们会:

def read(stream, parsefunc):
  try:
    for source in frozen(parsefunc(stream)):
      try:
        record = source.thaw()
        do_stuff(record)
      except Exception, e:
        log_error(e)
        if not is_recoverable(e):
          raise
        recover()
  except Exception, e:
    properly_give_up()
  wrap_up()
Run Code Online (Sandbox Code Playgroud)

(这两个try块是可选的.)

适配器看起来像:

class Frozen(object):
  def __init__(self, item):
    self.value = item
  def thaw(self):
    if isinstance(value, Exception):
      raise value
    return value

def frozen(generator):
    for item in generator:
       yield Frozen(item)
Run Code Online (Sandbox Code Playgroud)

而且parsefunc样子:

def parsefunc(stream):
  while not eof(stream):
    try:
       rec = read_record(stream)
       do_some_stuff()
       yield rec
    except Exception, e:
       properly_skip_record_or_prepare_retry()
       yield e
Run Code Online (Sandbox Code Playgroud)

为了让忘记适配器变得更加困难,我们还可以在parsefunc上将冻结从函数更改为装饰器.

def frozen_results(func):
  def freezer(__func = func, *args, **kw):
    for item in __func(*args, **kw):
       yield Frozen(item)
  return freezer
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我们将声明:

@frozen_results
def parsefunc(stream):
  ...
Run Code Online (Sandbox Code Playgroud)

我们显然不会费心声明frozen,或将其包裹在调用之中parsefunc.


sen*_*rle 7

在不了解系统的情况下,我认为很难说哪种方法最有效.但是,没有人建议的一个选项是使用回调.鉴于只read知道如何处理异常,这样的工作可能吗?

def read(stream, parsefunc):
    some_closure_data = {}

    def error_callback_1(e):
        manipulate(some_closure_data, e)
    def error_callback_2(e):
        transform(some_closure_data, e)

    for record in parsefunc(stream, error_callback_1):
        do_stuff(record)
Run Code Online (Sandbox Code Playgroud)

然后,在parsefunc:

def parsefunc(stream, error_callback):
    while not eof(stream):
        try:
            rec = read_record()
            yield rec
        except Exception as e:
            error_callback(e)
Run Code Online (Sandbox Code Playgroud)

我在这里使用了一个可变的本地闭包; 你也可以定义一个类.另请注意,您可以traceback通过sys.exc_info()回调内部访问信息.

另一个有趣的方法可能是使用send.这会有所不同; 基本上,不是定义一个回调,而是read可以检查结果yield,做很多复杂的逻辑,以及send一个替代值,然后生成器会重新产生(或做其他事情).这有点异国情调,但我想如果它有用的话我会提到它:

>>> def parsefunc(it):
...     default = None
...     for x in it:
...         try:
...             rec = float(x)
...         except ValueError as e:
...             default = yield e
...             yield default
...         else:
...             yield rec
... 
>>> parsed_values = parsefunc(['4', '6', '5', '5h', '22', '7'])
>>> for x in parsed_values:
...     if isinstance(x, ValueError):
...         x = parsed_values.send(0.0)
...     print x
... 
4.0
6.0
5.0
0.0
22.0
7.0
Run Code Online (Sandbox Code Playgroud)

在它自己这有点无用("为什么不直接打印默认值read?"你可能会问),但你可以default在生成器内部执行更复杂的操作,重置值,返回步骤等等.你甚至可以等待在发送回调基础上您会收到错误点.但请注意,sys.exc_info()一旦生成器被清除yield,所以sys.exc_info()如果需要访问回溯,则必须发送所有内容.

以下是如何组合这两个选项的示例:

import string
digits = set(string.digits)

def digits_only(v):
    return ''.join(c for c in v if c in digits)

def parsefunc(it):
    default = None
    for x in it:
        try:
            rec = float(x)
        except ValueError as e:
            callback = yield e
            yield float(callback(x))
        else:
            yield rec

parsed_values = parsefunc(['4', '6', '5', '5h', '22', '7'])
for x in parsed_values:
    if isinstance(x, ValueError):
        x = parsed_values.send(digits_only)
    print x
Run Code Online (Sandbox Code Playgroud)