标签: parallel.foreach

如何使用 R 并行读取多个大块的 CSV?

我有 10 个非常大的 CSV 文件(可能有也可能没有相同的标题),我正在使用“readr”包 read_csv_chunked() 连续读取和处理这些文件。目前,我可以使用 10 个内核并行读取 10 个文件。该过程仍需要一个小时。我有128个核心。我可以将每个 CSV 分成 10 个块,以便对每个文件并行处理,从而利用 100 个核心吗?这是我目前拥有的(创建两个示例文件仅用于测试):

library(doParallel)
library(foreach)

# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)

write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)

files <- c("df_1.csv", "df_2.csv")

# Create a function to read and filter each file in chunks
my_function <-
  function(file) { …
Run Code Online (Sandbox Code Playgroud)

csv parallel-processing r parallel.foreach doparallel

4
推荐指数
1
解决办法
766
查看次数

并行循环内部的局部变量线程安全

cnt在嵌套中有一个变量()parallel.foreach.我运行这个程序,显然它运作良好.有谁知道这段代码真的是线程安全的吗?是否可以在内部parallel循环中定义变量?

谢谢.

    object obj = new object();
    int total=0;

    Parallel.For(0, 2, i =>
    {
        Parallel.For(0, 1000000, j =>
        {
            int cnt = 0;
            if ((arr[i, j] % 2) == 0)
            {
                Interlocked.Increment(ref cnt);
            }
            lock (obj)
            {
                total= total+ (cnt / 2);
            }
        });
    });
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing parallel.foreach

3
推荐指数
1
解决办法
6423
查看次数

C#从巨大的网址列表中下载数据

我有一个巨大的网页列表,显示一个状态,我需要检查.一些网址位于同一网站内,另一个网址位于另一个网站上.

现在我正试图通过使用下面的代码以并行的方式做到这一点,但我觉得我造成了太多的开销.

while(ListOfUrls.Count > 0){
  Parallel.ForEach(ListOfUrls, url =>
  {
    WebClient webClient = new WebClient();
    webClient.DownloadString(url);
    ... run my checks here.. 
  });

  ListOfUrls = GetNewUrls.....
}
Run Code Online (Sandbox Code Playgroud)

这可以用更少的开销完成,并且可以更多地控制我使用/重用的Web客户端和连接数量吗?那么,最终工作可以更快完成吗?

c# parallel-processing webclient downloadstring parallel.foreach

3
推荐指数
1
解决办法
5108
查看次数

在Parallel.For中修改方法局部变量.这个线程安全吗?

我有这段代码:

int totalData = result.Data.Count;
int count = 0;
Parallel.ForEach(result.Data, data =>
{
    try
    {
        EventRange importedEntity = ImportEntity(auxResult.EntityName, data);
        count++;
        EntityImported(importedEntity, count, totalData);
    }
    catch (Exception e)
    {
        exceptions.Enqueue(e);
    }
});
Run Code Online (Sandbox Code Playgroud)

EntityImported是一个事件,应该说已经处理了多少实体,以及我应该处理多少实体.我担心的是增加lambda内部计数的线程安全性,以及您建议采用哪些步骤来确保始终使用count变量的正确值触发事件.

c# multithreading thread-safety task-parallel-library parallel.foreach

3
推荐指数
1
解决办法
544
查看次数

Parallel.For不使用我的主线程

在我的应用程序中,我希望我的主线程不被其他任何东西使用.我必须做一些并行处理,我想通过不同的线程来完成.为此,我使用Parallel.For如下

static void SomeMethod()
{
    Console.WriteLine(string.Format("Main Thread ID  before parallel loop ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
    Parallel.For(0, 10, i =>
    {
        Console.WriteLine(string.Format("Output ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
    }); 
    Thread.Sleep(100);
    Console.WriteLine(string.Format("Main Thread ID  after parallel loop ->>>>>>> {0} ", System.Threading.Thread.CurrentThread.ManagedThreadId));
}
Run Code Online (Sandbox Code Playgroud)

从输出主线程可以看出使用ThreadID 1和Parallel.For中的一些线程也使用相同的线程.

Main Thread ID  before parallel loop ->>>>>>> 1
Output ->>>>>>> 1
Output ->>>>>>> 1
Output ->>>>>>> 3
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 4
Output ->>>>>>> 5
Output ->>>>>>> 3
Output ->>>>>>> 1
Main Thread …
Run Code Online (Sandbox Code Playgroud)

c# task-parallel-library parallel.foreach

3
推荐指数
1
解决办法
1777
查看次数

使用Parallel.ForEach和/或async/await

我尝试验证我的图片网址,看看它们是否有效.我有这么多人,完成这项任务需要几个小时.因此,我决定异步进行.我想知道我的代码是否有任何重大差异或优势,如下所示.

我的主要职能是:

Async Function testUrl_async(ByVal myImageurl As String) As Task(Of Boolean)

   myHttpResponse = Await myHttpClient.GetAsync(myImageurl)
    If myHttpResponse.IsSuccessStatusCode Then
        mySuccess = True
    Else
        mySuccess = False
    End If

    Return mySuccess
End Function

 Function testUrl(ByVal myImageurl As String) As  Boolean

   myHttpResponse = myHttpClient.GetAsync(myImageurl)
    If myHttpResponse.IsSuccessStatusCode Then
        mySuccess = True
    Else
        mySuccess = False
    End If

    Return mySuccess
End Function
Run Code Online (Sandbox Code Playgroud)

1)使用异步等待.

For Each myImage In myImages
    Dim result=await testUrl_async(myImageUrl).Result 
    'some code                  
Next
Run Code Online (Sandbox Code Playgroud)

2)使用平行foreach

Parallel.ForEach(myImages, 
    Sub(myImage)
        testUrl(pictureComponent.websiteShop.hqpatronen, myImageUrl) 
        'some code
    End Sub)
Run Code Online (Sandbox Code Playgroud)

3)使用并行foreach和asnyc/await

Parallel.ForEach(myImages, …
Run Code Online (Sandbox Code Playgroud)

vb.net task-parallel-library async-await parallel.foreach

3
推荐指数
1
解决办法
2217
查看次数

Parallel.ForEach行为

我理解,在Parallel.ForEach循环中,每个线程可能在任何给定时间执行循环的不同部分.但是,每个线程是否按顺序执行循环中的代码?我只是在MSDN上阅读" Parallel Loops ",它说:

有时,两个步骤的顺序与循环顺序时相反.唯一的保证是所有循环的迭代都将在循环结束时运行.

说我有以下内容:

IEnumerable<MyObject> myEnumerable = ...
Parallel.ForEach(myEnumerable, obj =>
{
     A();
     B();
     C();
});
Run Code Online (Sandbox Code Playgroud)

我知道线程1可能正在执行A()线程2 C(),但是每个线程将在循环中顺序执行代码.是否线程1做A() B() C()或可能它可能做B(),C(),A()

c# multithreading parallel.foreach

3
推荐指数
1
解决办法
499
查看次数

并行聚合集合

我已经看到了基本类型的并行聚合代码,例如

Parallel.For<int>(0, result.Count, () => 0, (i, loop, subtotal) =>
    {
        subtotal += result[i];
        return subtotal;
    },
    (x) => Interlocked.Add(ref sum, x)
);
Run Code Online (Sandbox Code Playgroud)

我想知道是否有相同的列表/其他集合,例如:

List<Result> AllResults;
Parallel.ForEach(allIDs, (currentID) =>
{

    subList.add(GetResultFor(currentID));
    return subList;
},
(x) =>
{
    lock(AllResults)
        AllResults.AddRange(subList);
};
Run Code Online (Sandbox Code Playgroud)

我猜测没有什么好看和整洁,但我想不出另一种做法,当然不是通过一个标准的parralel.ForEach因为我不能想到你会怎么说"这个核心有这个范围,这个核心这个范围"....

c# parallel-processing aggregate parallel.foreach

3
推荐指数
1
解决办法
1954
查看次数

Parallel.foreach不会处理所有项目

我在这里有问题.我试图使用Parallel.foreach将我的数据表转换为列表对象.像这样 .

public List<ProductList> GetProductList(DataTable table)
{
    List<ProductList> list = new List<ProductList>();
    Parallel.ForEach(table.AsEnumerable(), item =>
    {

            string sku = item["product_sku"].ToString();
            //int skus = Convert.ToInt16(item["product_sku"]);

            string price = item["product_price"].ToString();

            string tweakerID = item["ID"].ToString();
            string finalPrice = item["FinalPrice"].ToString();
            list.Add(new ProductList() { SKU = sku, Price = price, ID = id, FinalPrice = finalPrice });


    });





    list.RemoveAll(item => item == null);

    return list;
} 
Run Code Online (Sandbox Code Playgroud)

我有超过65000个产品行.在此之后.列表中只添加了约63000种产品.但结果不是修正号码.例如,我运行此代码的最后三次我有63202,64025,62920.每次都是一个新号码.

我也不例外.

c# foreach multithreading visual-studio parallel.foreach

3
推荐指数
1
解决办法
891
查看次数

从外面打破平行ForEach

谷歌没有帮助我,所以也没有.

var timer = new System.Timers.Timer(5000);
timer.Elapsed += BreakEvent;
timer.Enabled = true;

Parallel.ForEach<string>(fileNames, (fileName, state) =>
{
    try
    {
        ProcessFile(fileName);
    }
    catch (Exception)
    {

    }
    finally
    {

    }
});
Run Code Online (Sandbox Code Playgroud)

我想ForEach在5秒(in BreakEvent)之后打破这个循环.

当然它可以是一个按钮或任何东西.

我知道破坏(在我的例子中)

state.Stop();
Run Code Online (Sandbox Code Playgroud)

但它仍然在循环中.

它甚至可能吗?

编辑:

对于那些搜索到其他方式的人来说,我就是这样的:

var timer = new System.Timers.Timer(5000);

timer.Elapsed += new System.Timers.ElapsedEventHandler((obj, args) =>
{
    state.Stop();
});

timer.Enabled = true;
Run Code Online (Sandbox Code Playgroud)

c# parallel-processing parallel.foreach

3
推荐指数
1
解决办法
338
查看次数