标签: parallel.foreach

与ReaderWriterLockSlim并行的Parallel.ForEach死锁

我的应用程序中有一个有趣的死锁问题.有一个内存数据存储,它使用ReaderWriterLockSlim来同步读写.其中一种读取方法使用Parallel.ForEach在给定一组过滤器的情况下搜索商店.其中一个过滤器可能需要对同一商店进行恒定时间读取.这是产生死锁的场景:

更新:下面的示例代码.实际方法调用更新步骤
鉴于单一实例storeConcreteStoreThatExtendsGenericStore

  1. Thread1获取商店的读锁定 - store.Search(someCriteria)
  2. Thread2尝试使用写锁定来更新存储 - store.Update()- 阻塞Thread1
  3. Thread1对商店执行Parallel.ForEach以运行一组过滤器
  4. Thread3(由派生线程1的Parallel.ForEach)尝试恒定时读取的商店.它尝试获取读锁定但在Thread2的写锁定后被阻止.
  5. Thread1无法完成,因为它无法加入Thread3. 线程2无法完成,因为它阻止后面线程1.

理想情况下,如果当前线程的祖先线程已经具有相同的锁,我想要做的就是尝试获取读锁.有没有办法做到这一点?或者还有另一种/更好的方法吗?

public abstract class GenericStore<TKey, TValue>
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private List<IFilter> _filters;  //contains instance of ExampleOffendingFilter

    protected Dictionary<TKey, TValue> Store { get; private set; }

    public void Update()
    {
        _lock.EnterWriterLock();
        //update the store
        _lock.ExitWriteLock();
    }

    public TValue GetByKey(TKey …
Run Code Online (Sandbox Code Playgroud)

c# deadlock readerwriterlockslim parallel.foreach

5
推荐指数
1
解决办法
1457
查看次数

多个Parallel.ForEach调用,MemoryBarrier?

我有一堆数据行,我想使用Parallel.ForEach来计算每一行的某些值,就像这样......

class DataRow
{
    public double A { get; internal set; }
    public double B { get; internal set; }
    public double C { get; internal set; }

    public DataRow()
    {
        A = double.NaN;
        B = double.NaN;
        C = double.NaN;
    }
}

class Program
{
    static void ParallelForEachToyExample()
    {
        var rnd = new Random();
        var df = new List<DataRow>();

        for (int i = 0; i < 10000000; i++)
        {
            var dr = new DataRow {A = rnd.NextDouble()};
            df.Add(dr);
        }

        // …
Run Code Online (Sandbox Code Playgroud)

c# multithreading task-parallel-library parallel.foreach

5
推荐指数
1
解决办法
1118
查看次数

Parallel.ForEach / 多线程的最佳使用

我需要从网站上抓取数据。我有 1,000 多个链接需要访问,以前我将每个线程划分为 10 个链接,然后开始 100 个线程,每个线程拉取 10 个。经过几个测试用例后,100 个线程是最好的计数,以最大限度地减少它检索内容的时间所有的链接。

我意识到 .NET 4.0 为开箱即用的多线程提供了更好的支持,但这是根据您拥有的内核数量来完成的,在我的情况下,这不会产生足够的线程。我想我要问的是:优化 1,000 个链接拉动的最佳方法是什么。我应该使用.ForEach并让Parallel扩展控制产生的线程数量,还是找到一种方法来告诉它启动和划分工作的线程数?

我以前没有工作过,Parallel所以也许我的方法可能是错误的。

.net c# multithreading parallel.foreach

4
推荐指数
2
解决办法
1万
查看次数

为什么 C# 中第一次调用的并行处理速度要慢得多?

我正在尝试使用 C# 应用程序尽可能快地处理数字。我使用 aThread.Sleep()来模拟处理和随机数。我使用 3 种不同的技术。

这是我使用的测试代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    internal class Program
    {
        private static void Main()
        {
            var data = new int[500000];
            var random = new Random();

            for (int i = 0; i < 500000; i++)
            {
                data[i] = random.Next();
            }

            var partialTimes = new Dictionary<int, double>();
            var iterations = 5;

            for (int i = 1; i < iterations + 1; i++) …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing task parallel.foreach partitioner

4
推荐指数
1
解决办法
712
查看次数

将异步方法传递到 Parallel.ForEach

我正在阅读这篇关于Parallel.ForEach“Parallel.ForEach 与传入异步方法不兼容”的文章。

所以,为了检查我写了这段代码:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });         

    Console.ReadLine();
}

static async Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return await Task.FromResult(i * 10);
}
Run Code Online (Sandbox Code Playgroud)

此代码同时填充results字典。

顺便说一下,我创建了一个类型的字典,ConcurrentDictionary<string, int>因为万一我ConcurrentDictionary<int, int>在调试模式下探索它的元素时,我看到元素是按键排序的,我认为因此添加了 elenents。

所以,我想知道我的代码是否有效?如果它“与传入异步方法不兼容”,为什么它运行良好?

c# async-await parallel.foreach

4
推荐指数
1
解决办法
7609
查看次数

Parallel.ForEach 在长迭代中被阻塞

我一直在Parallel.ForEach对项目集合进行一些耗时的处理。该处理实际上是由外部命令行工具处理的,我无法更改它。然而,似乎Parallel.ForEach会“卡在”集合中长期运行的项目上。我已经将问题提炼出来,并且可以表明Parallel.ForEach,事实上,等待这个漫长的过程完成并且不允许任何其他人通过。我编写了一个控制台应用程序来演示该问题:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace testParallel
{
    class Program
    {
        static int inloop = 0;
        static int completed = 0;
        static void Main(string[] args)
        {
            // initialize an array integers to hold the wait duration (in milliseconds)
            var items = Enumerable.Repeat(10, 1000).ToArray();
            
            // set one of the items to 10 seconds
            items[50] = 10000;


            // Initialize our line for reporting status
            Console.Write(0.ToString("000") + " Threads, " + …
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing blocking task-parallel-library parallel.foreach

4
推荐指数
1
解决办法
2917
查看次数

涉及异步操作时,Parallel.ForEach 或 Task.WhenAll ?

我已阅读以下密切相关的主题,但我想问一个更具体的事情。

如果我们需要异步运行任务/方法,并且这些任务本身运行其他任务/等待其他任务,则首选哪种变体 - Parallel.ForEach,或Task.WhenAll?我将用下面的一些代码进行演示:

public async Task SomeWorker(string param1, HttpClient client,
    List<FillMeUp> emptyCollection)
{
    HttpRequestMessage message = new HttpRequestMessage();
    message.Method = HttpMethod.Get;
    message.Headers.Add("someParam", param1);
    message.RequestUri = new Uri("https://www.somesite.me");
    var requestResponse = await client.SendAsync(message).ConfigureAwait(false);
    var content = await requestResponse.Content.ReadAsStringAsync()
        .ConfigureAwait(false);
    emptyCollection.Add(new FillMeUp()
    {
        Param1 = param1
    });
}
Run Code Online (Sandbox Code Playgroud)

与 一起使用WhenAll

using (HttpClient client = new HttpClient())
{
    client.DefaultRequestHeaders.Add("Accept", "application/json");

    List<FullCollection> fullCollection = GetMyFullCollection();
    List<FillMeUp> emptyCollection = new List<FillMeUp>();
    List<Task> workers = new List<Task>();
    for …
Run Code Online (Sandbox Code Playgroud)

c# concurrency asynchronous async-await parallel.foreach

4
推荐指数
1
解决办法
5154
查看次数

Parallel.ForEach 中的多个异步等待链接

我有一个 Parallel.ForEach 循环,它循环遍历一个集合。在内部,我进行了多次网络 I/O 调用的循环。我使用了 Task.ContinueWith 并嵌套了后续的 async-await 调用。处理的顺序无关紧要,但每个异步调用的数据都应该以同步方式处理。含义 - 对于每次迭代,从第一个异步调用中检索到的数据应该传递给第二个异步调用。在第二个异步调用完成后,来自两个异步调用的数据应该一起处理。

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for …
Run Code Online (Sandbox Code Playgroud)

c# asynchronous task-parallel-library async-await parallel.foreach

4
推荐指数
2
解决办法
864
查看次数

.net 和 Parallel.ForEach。异常绕过“try”块

有人可以解释一下这段代码的行为:

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello world");
        try
        {
            Parallel.ForEach(new[] { 1, 2, 3 }, async i =>
            {
                await Task.Delay(TimeSpan.FromSeconds(3));
                throw new TaskCanceledException();
            });
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
        }
        Console.WriteLine("Goodbye cruel world");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

怎么可能,异常在主线程中从“try”块中弹出,应用程序崩溃了。我知道并行异步的最佳方法是“Task.WhenAll”。问题的目的是了解行为。

.net c# asynchronous exception parallel.foreach

4
推荐指数
1
解决办法
502
查看次数

如何使用 R 并行读取多个大块的 CSV?

我有 10 个非常大的 CSV 文件(可能有也可能没有相同的标题),我正在使用“readr”包 read_csv_chunked() 连续读取和处理这些文件。目前,我可以使用 10 个内核并行读取 10 个文件。该过程仍需要一个小时。我有128个核心。我可以将每个 CSV 分成 10 个块,以便对每个文件并行处理,从而利用 100 个核心吗?这是我目前拥有的(创建两个示例文件仅用于测试):

library(doParallel)
library(foreach)

# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)

write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)

files <- c("df_1.csv", "df_2.csv")

# Create a function to read and filter each file in chunks
my_function <-
  function(file) { …
Run Code Online (Sandbox Code Playgroud)

csv parallel-processing r parallel.foreach doparallel

4
推荐指数
1
解决办法
766
查看次数