我有 10 个非常大的 CSV 文件(可能有也可能没有相同的标题),我正在使用“readr”包 read_csv_chunked() 连续读取和处理这些文件。目前,我可以使用 10 个内核并行读取 10 个文件。该过程仍需要一个小时。我有128个核心。我可以将每个 CSV 分成 10 个块,以便对每个文件并行处理,从而利用 100 个核心吗?这是我目前拥有的(创建两个示例文件仅用于测试):
library(doParallel)
library(foreach)
# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)
write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)
files <- c("df_1.csv", "df_2.csv")
# Create a function to read and filter each file in chunks
my_function <-
function(file) { …Run Code Online (Sandbox Code Playgroud) 我cnt在嵌套中有一个变量()parallel.foreach.我运行这个程序,显然它运作良好.有谁知道这段代码真的是线程安全的吗?是否可以在内部parallel循环中定义变量?
谢谢.
object obj = new object();
int total=0;
Parallel.For(0, 2, i =>
{
Parallel.For(0, 1000000, j =>
{
int cnt = 0;
if ((arr[i, j] % 2) == 0)
{
Interlocked.Increment(ref cnt);
}
lock (obj)
{
total= total+ (cnt / 2);
}
});
});
Run Code Online (Sandbox Code Playgroud) 我有一个巨大的网页列表,显示一个状态,我需要检查.一些网址位于同一网站内,另一个网址位于另一个网站上.
现在我正试图通过使用下面的代码以并行的方式做到这一点,但我觉得我造成了太多的开销.
while(ListOfUrls.Count > 0){
Parallel.ForEach(ListOfUrls, url =>
{
WebClient webClient = new WebClient();
webClient.DownloadString(url);
... run my checks here..
});
ListOfUrls = GetNewUrls.....
}
Run Code Online (Sandbox Code Playgroud)
这可以用更少的开销完成,并且可以更多地控制我使用/重用的Web客户端和连接数量吗?那么,最终工作可以更快完成吗?
c# parallel-processing webclient downloadstring parallel.foreach
我有这段代码:
int totalData = result.Data.Count;
int count = 0;
Parallel.ForEach(result.Data, data =>
{
try
{
EventRange importedEntity = ImportEntity(auxResult.EntityName, data);
count++;
EntityImported(importedEntity, count, totalData);
}
catch (Exception e)
{
exceptions.Enqueue(e);
}
});
Run Code Online (Sandbox Code Playgroud)
EntityImported是一个事件,应该说已经处理了多少实体,以及我应该处理多少实体.我担心的是增加lambda内部计数的线程安全性,以及您建议采用哪些步骤来确保始终使用count变量的正确值触发事件.
c# multithreading thread-safety task-parallel-library parallel.foreach
在我的应用程序中,我希望我的主线程不被其他任何东西使用.我必须做一些并行处理,我想通过不同的线程来完成.为此,我使用Parallel.For如下
static void SomeMethod()
{
Console.WriteLine(string.Format("Main Thread ID before parallel loop ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
Parallel.For(0, 10, i =>
{
Console.WriteLine(string.Format("Output ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
});
Thread.Sleep(100);
Console.WriteLine(string.Format("Main Thread ID after parallel loop ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
}
Run Code Online (Sandbox Code Playgroud)
从输出主线程可以看出使用ThreadID 1和Parallel.For中的一些线程也使用相同的线程.
Main Thread ID before parallel loop ->>>>>>> 1
Output ->>>>>>> 1
Output ->>>>>>> 1
Output ->>>>>>> 3
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 5
Output ->>>>>>> 3
Output ->>>>>>> 1
Main Thread …Run Code Online (Sandbox Code Playgroud) 我尝试验证我的图片网址,看看它们是否有效.我有这么多人,完成这项任务需要几个小时.因此,我决定异步进行.我想知道我的代码是否有任何重大差异或优势,如下所示.
我的主要职能是:
Async Function testUrl_async(ByVal myImageurl As String) As Task(Of Boolean)
myHttpResponse = Await myHttpClient.GetAsync(myImageurl)
If myHttpResponse.IsSuccessStatusCode Then
mySuccess = True
Else
mySuccess = False
End If
Return mySuccess
End Function
Function testUrl(ByVal myImageurl As String) As Boolean
myHttpResponse = myHttpClient.GetAsync(myImageurl)
If myHttpResponse.IsSuccessStatusCode Then
mySuccess = True
Else
mySuccess = False
End If
Return mySuccess
End Function
Run Code Online (Sandbox Code Playgroud)
1)使用异步等待.
For Each myImage In myImages
Dim result=await testUrl_async(myImageUrl).Result
'some code
Next
Run Code Online (Sandbox Code Playgroud)
2)使用平行foreach
Parallel.ForEach(myImages,
Sub(myImage)
testUrl(pictureComponent.websiteShop.hqpatronen, myImageUrl)
'some code
End Sub)
Run Code Online (Sandbox Code Playgroud)
3)使用并行foreach和asnyc/await
Parallel.ForEach(myImages, …Run Code Online (Sandbox Code Playgroud) 我理解,在Parallel.ForEach循环中,每个线程可能在任何给定时间执行循环的不同部分.但是,每个线程是否按顺序执行循环中的代码?我只是在MSDN上阅读" Parallel Loops ",它说:
有时,两个步骤的顺序与循环顺序时相反.唯一的保证是所有循环的迭代都将在循环结束时运行.
说我有以下内容:
IEnumerable<MyObject> myEnumerable = ...
Parallel.ForEach(myEnumerable, obj =>
{
A();
B();
C();
});
Run Code Online (Sandbox Code Playgroud)
我知道线程1可能正在执行A()线程2 C(),但是每个线程将在循环中顺序执行代码.是否线程1做A() B() C()或可能它可能做B(),C(),A()?
我已经看到了基本类型的并行聚合代码,例如
Parallel.For<int>(0, result.Count, () => 0, (i, loop, subtotal) =>
{
subtotal += result[i];
return subtotal;
},
(x) => Interlocked.Add(ref sum, x)
);
Run Code Online (Sandbox Code Playgroud)
我想知道是否有相同的列表/其他集合,例如:
List<Result> AllResults;
Parallel.ForEach(allIDs, (currentID) =>
{
subList.add(GetResultFor(currentID));
return subList;
},
(x) =>
{
lock(AllResults)
AllResults.AddRange(subList);
};
Run Code Online (Sandbox Code Playgroud)
我猜测没有什么好看和整洁,但我想不出另一种做法,当然不是通过一个标准的parralel.ForEach因为我不能想到你会怎么说"这个核心有这个范围,这个核心这个范围"....
我在这里有问题.我试图使用Parallel.foreach将我的数据表转换为列表对象.像这样 .
public List<ProductList> GetProductList(DataTable table)
{
List<ProductList> list = new List<ProductList>();
Parallel.ForEach(table.AsEnumerable(), item =>
{
string sku = item["product_sku"].ToString();
//int skus = Convert.ToInt16(item["product_sku"]);
string price = item["product_price"].ToString();
string tweakerID = item["ID"].ToString();
string finalPrice = item["FinalPrice"].ToString();
list.Add(new ProductList() { SKU = sku, Price = price, ID = id, FinalPrice = finalPrice });
});
list.RemoveAll(item => item == null);
return list;
}
Run Code Online (Sandbox Code Playgroud)
我有超过65000个产品行.在此之后.列表中只添加了约63000种产品.但结果不是修正号码.例如,我运行此代码的最后三次我有63202,64025,62920.每次都是一个新号码.
我也不例外.
谷歌没有帮助我,所以也没有.
var timer = new System.Timers.Timer(5000);
timer.Elapsed += BreakEvent;
timer.Enabled = true;
Parallel.ForEach<string>(fileNames, (fileName, state) =>
{
try
{
ProcessFile(fileName);
}
catch (Exception)
{
}
finally
{
}
});
Run Code Online (Sandbox Code Playgroud)
我想ForEach在5秒(in BreakEvent)之后打破这个循环.
当然它可以是一个按钮或任何东西.
我知道破坏(在我的例子中)
state.Stop();
Run Code Online (Sandbox Code Playgroud)
但它仍然在循环中.
它甚至可能吗?
编辑:
对于那些搜索到其他方式的人来说,我就是这样的:
var timer = new System.Timers.Timer(5000);
timer.Elapsed += new System.Timers.ElapsedEventHandler((obj, args) =>
{
state.Stop();
});
timer.Enabled = true;
Run Code Online (Sandbox Code Playgroud) parallel.foreach ×10
c# ×8
aggregate ×1
async-await ×1
csv ×1
doparallel ×1
foreach ×1
r ×1
vb.net ×1
webclient ×1