如何同时入队和出队?

Stu*_*ana 1 .net c# queue nonblocking thread-safety

我有一个带有两个线程的简单场景,其中第一个线程永久读取一些数据并将该数据排入队列.第二个线程首先从该队列中查看单个对象并进行一些条件检查.如果这些是好的,单个对象将被出列并传递给某些处理.

我试图使用ConcurrentQueue这是一个简单队列的线程安全实现,但这个问题是所有调用都是阻塞.这意味着如果第一个线程将对象排入队列,则第二个线程无法查看或出列对象.

在我的情况下,我需要在最后排队并同时从队列的开头出队.

C#的锁定语句也会.

所以我的问题是,是否可以并行执行这两个操作而不会以线程安全的方式相互阻塞.

这些是我的第一次尝试,这是我的问题的类似示例.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Scenario {
    public class Program {
        public static void Main(string[] args) {
            Scenario scenario = new Scenario();
            scenario.Start();
            Console.ReadKey();
        }

        public class Scenario {
            public Scenario() {
                someData = new Queue<int>();
            }

            public void Start() {
                Task.Factory.StartNew(firstThread);
                Task.Factory.StartNew(secondThread);
            }

            private void firstThread() {
                Random random = new Random();
                while (true) {
                    int newData = random.Next(1, 100);
                    someData.Enqueue(newData);
                    Console.WriteLine("Enqueued " + newData);
                }
            }

            private void secondThread() {
                Random random = new Random();
                while (true) {
                    if (someData.Count == 0) {
                        continue;
                    }

                    int singleData = someData.Peek();

                    int someValue = random.Next(1, 100);
                    if (singleData > someValue || singleData == 1 || singleData == 99) {
                        singleData = someData.Dequeue();
                        Console.WriteLine("Dequeued " + singleData);
                        // ... processing ...
                    }
                }
            }

            private readonly Queue<int> someData;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

第二个例子:

public class Scenario {
    public Scenario() {
        someData = new ConcurrentQueue<int>();
    }

    public void Start() {
        Task.Factory.StartNew(firstThread);
        Task.Factory.StartNew(secondThread);
    }

    private void firstThread() {
        Random random = new Random();
        while (true) {
            int newData = random.Next(1, 100);
            someData.Enqueue(newData);
            lock (syncRoot) { Console.WriteLine($"Enqued {enqued++} Dequed {dequed}"); }
        }
    }

    private void secondThread() {
        Random random = new Random();
        while (true) {
            if (!someData.TryPeek(out int singleData)) {
                continue;
            }

            int someValue = random.Next(1, 100);
            if (singleData > someValue || singleData == 1 || singleData == 99) {
                if (!someData.TryDequeue(out singleData)) {
                    continue;
                }

                lock (syncRoot) { Console.WriteLine($"Enqued {enqued} Dequed {dequed++}"); }

                // ... processing ...
            }
        }
    }

    private int enqued = 0;
    private int dequed = 0;

    private readonly ConcurrentQueue<int> someData;

    private static readonly object syncRoot = new object();
}
Run Code Online (Sandbox Code Playgroud)

Eri*_*ert 7

首先:我强烈建议您重新考虑您的多线程技术和共享内存数据结构是否是正确的方法.具有多个控制共享访问数据结构线程的代码很难正确,故障可能是微妙的,灾难性的,并且难以调试.

第二:如果您倾向于使用多个线程和共享内存数据结构,我强烈建议您使用专家设计的数据类型(如并发队列),而不是自己编写.

现在我已经把这些警告排除在外了:这是解决问题的一种方法.它非常复杂,您应该获得C#内存模型专家的服务,以验证解决方案的正确性.我不认为自己有能力实施我即将描述的方案,而不是没有实际上是内存模型专家的人的帮助.

目标是拥有一个支持同时入队和出列操作以及低锁争用的队列.

你想要的是两个不可变的堆栈变量,称为enqueue堆栈和dequeue堆栈,每个变量都有自己的锁.

入队行动是:

  • 带上入队锁
  • 将项目推入排队堆栈; 这会在O(1)时间内产生一个新的堆栈.
  • 将新生成的堆栈分配给enqueue堆栈变量.
  • 释放入队锁

出列操作是:

  • 取出队列
  • 如果出列堆栈是空的那么
    • 采取入队锁
    • 枚举enqueue堆栈并使用它来构建dequeue堆栈; 这反转了enqueue堆栈,它保留了我们想要的属性:第一个是第一个输出.
    • 将一个空的不可变堆栈分配给enqueue堆栈变量
    • 释放入队锁
    • 将新堆栈分配给出列堆栈
  • 如果出列堆栈是空的,抛出,或放弃并稍后重试,或者睡眠直到入队操作发出信号,或者这里做的正确事情是什么.
  • 出列堆栈不为空.
  • 从dequeue堆栈中弹出一个项目,在O(1)中生成一个新堆栈.
  • 将新堆栈分配给出列堆栈变量.
  • 释放出队锁.
  • 处理该项目.

注意,当然如果只有一个线程出队,那么我们根本不需要出队锁,但是使用这个方案可以有很多线程出队.

假设在enqueue堆栈上有1000个项目,在dequeue堆栈上有零.当我们出队的第一次,我们做了一个昂贵的O(n)的倒车入列组的操作一次,但现在我们有离队堆栈1000个项目.一旦出列堆栈很大,出列线程就可以花费大部分时间进行处理,而入队线程则将大部分时间用于排队.对入队锁的争用很少,但发生时却很昂贵.

为什么要使用不可变数据结构?我在这里描述的所有内容也适用于可变堆栈,但是(1)更容易推理不可变堆栈,(2)如果你想真正生活危险,你可以忽略一些锁并进行互锁交换操作; 如果您这样做,请确保您了解在低锁定条件下可能重新排序操作的所有内容.

更新:

真正的问题是我不能出列并处理很多要点,因为我永远在阅读和征服新点.入队呼叫阻止了处理步骤.

好吧,如果这是你真正的问题,那么问题中提及它而不是将其隐藏在评论中将是一个好主意.帮助我们帮助您.

你可以在这里做很多事情.例如,您可以将排队线程的优先级设置为低于出队线程的优先级.或者你可以拥有多个出列线程,就像机器中有CPU一样多.或者,如果出列没有跟上,你可以动态地选择放弃一些入队操作.如果不了解更多关于实际问题的信息,很难就如何解决问题提出建议.

  • @Fede:这是一个很好的想法,尽管如果它是一个*持久*不可变队列,那么*它可能被实现为两个不可变堆栈*,所以您所做的就是解决问题。我描述这样的机制的目的是为了对以下事实提供一些见解:*几乎总是在某个地方存在昂贵的操作*;例如,可变队列使用双满算法,该算法偶尔也会产生 O(n) 步骤,并通过 O(n) 廉价步骤进行分摊。 (2认同)