我的应用程序中有一个有趣的死锁问题.有一个内存数据存储,它使用ReaderWriterLockSlim来同步读写.其中一种读取方法使用Parallel.ForEach在给定一组过滤器的情况下搜索商店.其中一个过滤器可能需要对同一商店进行恒定时间读取.这是产生死锁的场景:
更新:下面的示例代码.实际方法调用更新步骤
鉴于单一实例store的ConcreteStoreThatExtendsGenericStore
store.Search(someCriteria)store.Update()- 阻塞Thread1理想情况下,如果当前线程的祖先线程已经具有相同的锁,我想要做的就是不尝试获取读锁.有没有办法做到这一点?或者还有另一种/更好的方法吗?
public abstract class GenericStore<TKey, TValue>
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private List<IFilter> _filters; //contains instance of ExampleOffendingFilter
protected Dictionary<TKey, TValue> Store { get; private set; }
public void Update()
{
_lock.EnterWriterLock();
//update the store
_lock.ExitWriteLock();
}
public TValue GetByKey(TKey …Run Code Online (Sandbox Code Playgroud) 我有一堆数据行,我想使用Parallel.ForEach来计算每一行的某些值,就像这样......
class DataRow
{
public double A { get; internal set; }
public double B { get; internal set; }
public double C { get; internal set; }
public DataRow()
{
A = double.NaN;
B = double.NaN;
C = double.NaN;
}
}
class Program
{
static void ParallelForEachToyExample()
{
var rnd = new Random();
var df = new List<DataRow>();
for (int i = 0; i < 10000000; i++)
{
var dr = new DataRow {A = rnd.NextDouble()};
df.Add(dr);
}
// …Run Code Online (Sandbox Code Playgroud) 我需要从网站上抓取数据。我有 1,000 多个链接需要访问,以前我将每个线程划分为 10 个链接,然后开始 100 个线程,每个线程拉取 10 个。经过几个测试用例后,100 个线程是最好的计数,以最大限度地减少它检索内容的时间所有的链接。
我意识到 .NET 4.0 为开箱即用的多线程提供了更好的支持,但这是根据您拥有的内核数量来完成的,在我的情况下,这不会产生足够的线程。我想我要问的是:优化 1,000 个链接拉动的最佳方法是什么。我应该使用.ForEach并让Parallel扩展控制产生的线程数量,还是找到一种方法来告诉它启动和划分工作的线程数?
我以前没有工作过,Parallel所以也许我的方法可能是错误的。
我正在尝试使用 C# 应用程序尽可能快地处理数字。我使用 aThread.Sleep()来模拟处理和随机数。我使用 3 种不同的技术。
这是我使用的测试代码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
internal class Program
{
private static void Main()
{
var data = new int[500000];
var random = new Random();
for (int i = 0; i < 500000; i++)
{
data[i] = random.Next();
}
var partialTimes = new Dictionary<int, double>();
var iterations = 5;
for (int i = 1; i < iterations + 1; i++) …Run Code Online (Sandbox Code Playgroud) 我正在阅读这篇关于Parallel.ForEach“Parallel.ForEach 与传入异步方法不兼容”的文章。
所以,为了检查我写了这段代码:
static async Task Main(string[] args)
{
var results = new ConcurrentDictionary<string, int>();
Parallel.ForEach(Enumerable.Range(0, 100), async index =>
{
var res = await DoAsyncJob(index);
results.TryAdd(index.ToString(), res);
});
Console.ReadLine();
}
static async Task<int> DoAsyncJob(int i)
{
Thread.Sleep(100);
return await Task.FromResult(i * 10);
}
Run Code Online (Sandbox Code Playgroud)
此代码同时填充results字典。
顺便说一下,我创建了一个类型的字典,ConcurrentDictionary<string, int>因为万一我ConcurrentDictionary<int, int>在调试模式下探索它的元素时,我看到元素是按键排序的,我认为因此添加了 elenents。
所以,我想知道我的代码是否有效?如果它“与传入异步方法不兼容”,为什么它运行良好?
我一直在Parallel.ForEach对项目集合进行一些耗时的处理。该处理实际上是由外部命令行工具处理的,我无法更改它。然而,似乎Parallel.ForEach会“卡在”集合中长期运行的项目上。我已经将问题提炼出来,并且可以表明Parallel.ForEach,事实上,等待这个漫长的过程完成并且不允许任何其他人通过。我编写了一个控制台应用程序来演示该问题:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace testParallel
{
class Program
{
static int inloop = 0;
static int completed = 0;
static void Main(string[] args)
{
// initialize an array integers to hold the wait duration (in milliseconds)
var items = Enumerable.Repeat(10, 1000).ToArray();
// set one of the items to 10 seconds
items[50] = 10000;
// Initialize our line for reporting status
Console.Write(0.ToString("000") + " Threads, " + …Run Code Online (Sandbox Code Playgroud) c# parallel-processing blocking task-parallel-library parallel.foreach
我已阅读以下密切相关的主题,但我想问一个更具体的事情。
如果我们需要异步运行任务/方法,并且这些任务本身运行其他任务/等待其他任务,则首选哪种变体 - Parallel.ForEach,或Task.WhenAll?我将用下面的一些代码进行演示:
public async Task SomeWorker(string param1, HttpClient client,
List<FillMeUp> emptyCollection)
{
HttpRequestMessage message = new HttpRequestMessage();
message.Method = HttpMethod.Get;
message.Headers.Add("someParam", param1);
message.RequestUri = new Uri("https://www.somesite.me");
var requestResponse = await client.SendAsync(message).ConfigureAwait(false);
var content = await requestResponse.Content.ReadAsStringAsync()
.ConfigureAwait(false);
emptyCollection.Add(new FillMeUp()
{
Param1 = param1
});
}
Run Code Online (Sandbox Code Playgroud)
与 一起使用WhenAll:
using (HttpClient client = new HttpClient())
{
client.DefaultRequestHeaders.Add("Accept", "application/json");
List<FullCollection> fullCollection = GetMyFullCollection();
List<FillMeUp> emptyCollection = new List<FillMeUp>();
List<Task> workers = new List<Task>();
for …Run Code Online (Sandbox Code Playgroud) 我有一个 Parallel.ForEach 循环,它循环遍历一个集合。在内部,我进行了多次网络 I/O 调用的循环。我使用了 Task.ContinueWith 并嵌套了后续的 async-await 调用。处理的顺序无关紧要,但每个异步调用的数据都应该以同步方式处理。含义 - 对于每次迭代,从第一个异步调用中检索到的数据应该传递给第二个异步调用。在第二个异步调用完成后,来自两个异步调用的数据应该一起处理。
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));
//this is my first async call
await countryTask.ContinueWith((countryData) =>
{
countries.Add(countryData.Result);
Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));
//based on the data I receive in 'stateTask', I make another async call
stateTask.ContinueWith((stateData) =>
{
states.Add(stateData.Result);
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for …Run Code Online (Sandbox Code Playgroud) c# asynchronous task-parallel-library async-await parallel.foreach
有人可以解释一下这段代码的行为:
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello world");
try
{
Parallel.ForEach(new[] { 1, 2, 3 }, async i =>
{
await Task.Delay(TimeSpan.FromSeconds(3));
throw new TaskCanceledException();
});
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.WriteLine("Goodbye cruel world");
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud)
怎么可能,异常在主线程中从“try”块中弹出,应用程序崩溃了。我知道并行异步的最佳方法是“Task.WhenAll”。问题的目的是了解行为。
我有 10 个非常大的 CSV 文件(可能有也可能没有相同的标题),我正在使用“readr”包 read_csv_chunked() 连续读取和处理这些文件。目前,我可以使用 10 个内核并行读取 10 个文件。该过程仍需要一个小时。我有128个核心。我可以将每个 CSV 分成 10 个块,以便对每个文件并行处理,从而利用 100 个核心吗?这是我目前拥有的(创建两个示例文件仅用于测试):
library(doParallel)
library(foreach)
# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)
write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)
files <- c("df_1.csv", "df_2.csv")
# Create a function to read and filter each file in chunks
my_function <-
function(file) { …Run Code Online (Sandbox Code Playgroud) parallel.foreach ×10
c# ×9
async-await ×3
asynchronous ×3
.net ×2
blocking ×1
concurrency ×1
csv ×1
deadlock ×1
doparallel ×1
exception ×1
partitioner ×1
r ×1
task ×1