如何在不缓冲的情况下使用单个枚举检查 IEnumerable 的多个条件?

The*_*ias 3 c# linq ienumerable

我有一个很长的数据序列,其形式为IEnumerable,我想检查它的一些条件。每个条件返回一个 true 或 false 值,我想知道是否所有条件都为 true。我的问题是我无法IEnumerable通过调用来实现ToList,因为它太长了(> 10,000,000,000 个元素)。我也无法多次枚举序列,每个条件一次,因为每次我都会得到不同的序列。我正在寻找一种有效的方法来执行此检查,如果可能的话,使用现有的 LINQ 功能。


澄清:我要求的是一个通用的解决方案,而不是下面给出的特定示例问题的解决方案。


这是我的序列的虚拟版本:

static IEnumerable<int> GetLongSequence()
{
    var random = new Random();
    for (long i = 0; i < 10_000_000_000; i++) yield return random.Next(0, 100_000_000);
}
Run Code Online (Sandbox Code Playgroud)

以下是序列必须满足的条件的示例:

var source = GetLongSequence();
var result = source.Any(n => n % 28_413_803 == 0)
    && source.All(n => n < 99_999_999)
    && source.Average(n => n) > 50_000_001;
Run Code Online (Sandbox Code Playgroud)

不幸的是,这种方法调用了三倍GetLongSequence,因此它不能满足问题的要求。

我尝试编写上面的Linqy扩展方法,希望这能给我一些想法:

public static bool AllConditions<TSource>(this IEnumerable<TSource> source,
    params Func<IEnumerable<TSource>, bool>[] conditions)
{
    foreach (var condition in conditions)
    {
        if (!condition(source)) return false;
    }
    return true;
}
Run Code Online (Sandbox Code Playgroud)

这就是我打算使用它的方式:

var result = source.AllConditions
(
    s => s.Any(n => n % 28_413_803 == 0),
    s => s.All(n => n < 99_999_999),
    s => s.Average(n => n) > 50_000_001,
    // more conditions...
);
Run Code Online (Sandbox Code Playgroud)

不幸的是,这并没有提供任何改进。再次GetLongSequence调用 3 次。

在我的头撞在墙上一个小时后,没有任何进展,我想出了一个可能的解决方案。我可以在单独的线程中运行每个条件,并将它们的访问同步到序列的单个共享枚举器。所以我最终得到了这个怪物:

public static bool AllConditions<TSource>(this IEnumerable<TSource> source,
    params Func<IEnumerable<TSource>, bool>[] conditions)
{
    var locker = new object();
    var enumerator = source.GetEnumerator();
    var barrier = new Barrier(conditions.Length);
    long index = -1;
    bool finished = false;

    IEnumerable<TSource> OneByOne()
    {
        try
        {
            while (true)
            {
                TSource current;
                lock (locker)
                {
                    if (finished) break;
                    if (barrier.CurrentPhaseNumber > index)
                    {
                        index = barrier.CurrentPhaseNumber;
                        finished = !enumerator.MoveNext();
                        if (finished)
                        {
                            enumerator.Dispose(); break;
                        }
                    }
                    current = enumerator.Current;
                }
                yield return current;
                barrier.SignalAndWait();
            }
        }
        finally
        {
            barrier.RemoveParticipant();
        }
    }

    var results = new ConcurrentQueue<bool>();
    var threads = conditions.Select(condition => new Thread(() =>
    {
        var result = condition(OneByOne());
        results.Enqueue(result);
    })
    { IsBackground = true }).ToArray();
    foreach (var thread in threads) thread.Start();
    foreach (var thread in threads) thread.Join();
    return results.All(r => r);
}
Run Code Online (Sandbox Code Playgroud)

对于同步 a 使用了Barrier. 这个解决方案实际上比我想象的要好得多。在我的机器上,它每秒可以处理近 1,000,000 个元素。但它还不够快,因为它需要将近 3 个小时才能处理 10,000,000,000 个元素的完整序列。我等待结果的时间不能超过 5 分钟。关于如何在单个线程中有效运行这些条件有什么想法吗?

Kla*_*ter 5

如果您需要确保序列仅枚举一次,则对整个序列进行操作的条件没有用。我想到的一种可能性是拥有一个为序列中的每个元素调用的接口,并根据您的具体条件以不同的方式实现该接口:

bool Example()
{
    var source = GetLongSequence();

    var conditions = new List<IEvaluate<int>>
    {
        new Any<int>(n => n % 28_413_803 == 0),
        new All<int>(n => n < 99_999_999),
        new Average(d => d > 50_000_001)
    };

    foreach (var item in source)
    {
        foreach (var condition in conditions)
        {
            condition.Evaluate(item);
        }
    }

    return conditions.All(c => c.Result);   
}

static IEnumerable<int> GetLongSequence()
{
    var random = new Random();
    for (long i = 0; i < 10_000_000_000; i++) yield return random.Next(0, 100_000_000);
}

interface IEvaluate<T>
{
    void Evaluate(T item);
    bool Result { get; }
}

class Any<T> : IEvaluate<T>
{
    private bool _result;
    private readonly Func<T, bool> _predicate;

    public Any(Func<T, bool> predicate)
    {
        _predicate = predicate;
        _result = false;
    }

    public void Evaluate(T item)
    {
        if (_predicate(item))
        {
            _result = true;
        }
    }

    public bool Result => _result;
}


class All<T> : IEvaluate<T>
{
    private bool _result;
    private readonly Func<T, bool> _predicate;

    public All(Func<T, bool> predicate)
    {
        _predicate = predicate;
        _result = true;
    }

    public void Evaluate(T item)
    {
        if (!_predicate(item))
        {
            _result = false;
        }
    }

    public bool Result => _result;
}

class Average : IEvaluate<int>
{
    private long _sum;
    private int _count;
    Func<double, bool> _evaluate;
    public Average(Func<double, bool> evaluate)
    {
    }

    public void Evaluate(int item)
    {
        _sum += item;
        _count++;
    }

    public bool Result => _evaluate((double)_sum / _count);
}
Run Code Online (Sandbox Code Playgroud)