标签: concurrent-queue

具有并发工作者的 RxJS 并行队列?

假设我想下载 10,000 个文件。我可以轻松地构建一个包含这 10,000 个文件的队列(如果其中任何一个可以做得更好,我很乐意接受建议),

import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
};
Run Code Online (Sandbox Code Playgroud)

现在我有一个 Rx.JS observable 数组,我是从代表我的队列的承诺中创建的。现在对于我想要的行为,我想发出

  • 三并发请求到服务器
  • 完成请求后,我希望触发新的请求。

我可以为这个问题创建一个解决方案,但鉴于我从未使用过的Rxjs queue 之类的东西,我想知道最正确的 Rxjs 方法是什么。

queue parallel-processing rxjs concurrent-queue

3
推荐指数
1
解决办法
1268
查看次数

C# 并发队列使用

有一个简短的问题。

如果一个线程正在排队而另一个线程正在出队,我是否必须使用并发队列?在这种情况下使用常规容器(1 个读取器和 1 个写入器)时是否存在任何竞争条件/其他风险?

c# concurrency thread-safety race-condition concurrent-queue

3
推荐指数
1
解决办法
4844
查看次数

ConcurrentQueue <byte []>中的所有项都相同

我有一个NetworkStream用于从另一个程序获取数据.数据以Byte [64]的形式到达,然后我将其排入ConcurrentQueue,以便其他线程可以在以后出列以进行分析.队列被实例化:

ConcurrentQueue<byte[]> fifo = new ConcurrentQueue<byte[]>();
Run Code Online (Sandbox Code Playgroud)

然后我排队发送的所有数据:

Byte[] bytesIn = new Byte[64];
int i;
while ((i = stream.Read(bytesIn, 0, bytesIn.Length)) != 0)
{
    fifo.Enqueue(bytesIn);
}
Run Code Online (Sandbox Code Playgroud)

如果我然后查看(在调试期间),其中的数据fifo结果表明包含的每个字节[64]与最新的相同bytesIn.我如何确保我添加的数组fifo是值而不是指针(如果这是正确的术语)?

c# concurrent-queue

2
推荐指数
1
解决办法
738
查看次数

将 List&lt;T&gt; 内容添加到 ConcurrentQueue&lt;T&gt; 的最快方法

我正在阅读这个问题并注意到 OP 正在迭代一个列表以将项目排队到 ConcurrentQueue 中。

ConcurrentQueue<TaskClass> cq = new ConcurrentQueue<TaskClass>();
for (int x = 0; x < TaskList.Count; x++)
    cq.Enqueue(TaskList[x]);
Run Code Online (Sandbox Code Playgroud)

这是必要的吗?

有没有办法:

  • 将大量对象添加到 ConcurrentQueue,或
  • 只需将类型列表转换/转换为 ConcurrentQueue

c# concurrent-queue

2
推荐指数
1
解决办法
2967
查看次数

ConcurrentQueue和Parallel.ForEach

我有一个ConcurrentQueue,其中包含需要获取其来源的URL列表。当将Parallel.ForEach与ConcurrentQueue对象用作输入参数时,Pop方法将无效(应返回一个字符串)。

我使用MaxDegreeOfParallelism设置为四个的Parallel。我真的需要阻止并发线程的数量。使用具有并行性的队列是否多余?

提前致谢。

// On the main class
var items = await engine.FetchPageWithNumberItems(result);
// Enqueue List of items
itemQueue.EnqueueList(items);
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); });

// On the Engine class
public void CrawlItems(ItemQueue itemQueue)
{
Parallel.ForEach(
            itemQueue,
            new ParallelOptions {MaxDegreeOfParallelism = 4},
            item =>
            {

                var worker = new Worker();
                // Pop doesn't return anything
                worker.Url = itemQueue.Pop();
                /* Some work */
             });
 }

// Item Queue
class ItemQueue : ConcurrentQueue<string>
    {
        private ConcurrentQueue<string> queue = new ConcurrentQueue<string>(); …
Run Code Online (Sandbox Code Playgroud)

c# parallel.foreach concurrent-queue

2
推荐指数
1
解决办法
4530
查看次数

具有多线程的ConcurrentQueue

我是多线程概念的新手.我需要将一定数量的字符串添加到队列中并使用多个线程处理它们.使用ConcurrentQueue哪个是线程安全的.

这就是我尝试过的.但是不会处理添加到并发队列中的所有项目.只处理前4个项目.

class Program
{
    ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();
    static void Main(string[] args)
    {
        new Program().run();
    }

    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];

        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }

        for (int i = 0; i < 100; i++)
        {
            iQ.Enqueue("Item" + i);
        }

        Task.WaitAll(workers);
        Console.WriteLine("Done.");

        Console.ReadLine();
    }

    void worker(int workerId)
    { …
Run Code Online (Sandbox Code Playgroud)

c# multithreading concurrent-queue

1
推荐指数
1
解决办法
5098
查看次数

如何在 ConcurrentQueue 出队时设置最大并发线程数?

我有一个线程负责入队,一个线程负责出队。然而,数据入队的频率远远超过出队+处理数据所需的时间。当我执行以下操作时,数据处理出现了巨大的延迟:

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.Count > 0)
            {
                Queue.TryDequeue(out item);
                ProcessData(item);
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

接下来,我尝试在单独的任务中处理数据,但这最终会影响项目中的其他任务,因为这种处理最终会占用分配给应用程序的大部分资源并生成大量线程。

public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.Count > 0)
            {
                Queue.TryDequeue(out item);
                Task.Run(() => ProcessData(item));
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

接下来,我尝试了以下操作:

public void HandleData()
{
    List<Task> taskList = new List<Task>();
    while (true)
    {
        try …
Run Code Online (Sandbox Code Playgroud)

c# multithreading throttling task-parallel-library concurrent-queue

1
推荐指数
1
解决办法
607
查看次数