标签: parallel-extensions

大规模并行螺纹读取的Azure表存储性能

简短版本:我们能否以多线程方式从数十个或数百个表分区中读取数据,以提高数量级的性能?

长版本:我们正在开发一个在Azure表存储中存储数百万行的系统.我们将数据分区为小分区,每个分区包含大约500条记录,代表单位的一天数据.

由于Azure没有"总和"功能,为了提取一年的数据,我们要么必须使用一些预缓存,要么自己在Azure Web或辅助角色中对数据求和.

假设如下: - 读取分区不会影响另一个分区的性能 - 读取分区有一个基于网络速度和服务器检索的瓶颈

然后我们可以猜测,如果我们想要快速总结大量数据(1年365个分区),我们可以使用大规模并行算法,它几乎可以完美地扩展到线程数.例如,我们可以使用具有50个以上线程的.NET并行扩展,并获得巨大的性能提升.

我们正在努力设置一些实验,但我想看看之前是否已经完成.由于.NET端基本上处于等待高延迟操作的空闲状态,因此这对于多线程来说似乎是完美的.

.net azure parallel-extensions

8
推荐指数
1
解决办法
2206
查看次数

Parallel.For步长

有没有人知道是否有任何重载允许我在Parallel.For循环中指定步长?c#或VB.Net中的样本会很棒.

谢谢,贡萨洛

.net c# vb.net parallel-extensions

8
推荐指数
1
解决办法
4112
查看次数

使用Parallel.ForEach的Azure TableQuery线程安全性

我有一些基本的Azure表,我一直在串行查询:

var query = new TableQuery<DynamicTableEntity>()
  .Where(TableQuery.GenerateFilterCondition("PartitionKey",
    QueryComparisons.Equal, myPartitionKey));

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
  // Process entity here.
}
Run Code Online (Sandbox Code Playgroud)

为了加快速度,我将其并行化:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
  // Process entity here in a thread-safe manner.

  // Edited to add: Details of the loop body below:

  // This is the essence of the fixed loop body:
  lock (myLock) {
    DataRow myRow = myDataTable.NewRow();
    // [Add entity data to myRow.]
    myDataTable.Rows.Add(myRow);
  }

  // Old code (apparently not thread-safe, though NewRow() is supposed to …
Run Code Online (Sandbox Code Playgroud)

c# azure parallel-extensions azure-table-storage

8
推荐指数
1
解决办法
706
查看次数

在Parallel.ForEach中使用哈希表?

我有一个Parallel.ForEach循环在主体内部运行密集操作.

该操作可以使用Hashtable来存储值,并且可以重用于其他连续的循环项.我在密集操作完成后添加到Hashtable,下一个循环项可以在Hashtable中查找并​​重用该对象,而不是再次运行密集操作.

但是,因为我使用的是Parallel.ForEach,所以存在一个不安全的问题,导致Hashtable.Add和ContainsKey(key)调用不同步,因为它们可能并行运行.引入锁可能会导致性能问题.

这是示例代码:

Hashtable myTable = new Hashtable;
Parallel.ForEach(items, (item, loopState) =>
{
    // If exists in myTable use it, else add to hashtable
    if(myTable.ContainsKey(item.Key))
    {
       myObj = myTable[item.Key];
    }
    else
    {
       myObj = SomeIntensiveOperation();
       myTable.Add(item.Key, myObj); // Issue is here : breaks with exc during runtime
    }
    // Do something with myObj
    // some code here
}
Run Code Online (Sandbox Code Playgroud)

TPL库中必须有一些API,属性设置,可以处理这种情况.在那儿?

.net c# parallel-extensions task-parallel-library

7
推荐指数
1
解决办法
7703
查看次数

Parallel.For中断

假设你有一个1000个随机整数的数组,你需要循环它以找到数字68,例如.

在四核CPU上使用新的Parallel.For可以大大提高速度,使每个核心只能工作250个数组项.

问题是:当满足以下条件时,是否可以中断Parallel.For循环?

if (integerArray[i] == 68)

   break;
Run Code Online (Sandbox Code Playgroud)

谢谢.

.net c# parallel-processing parallel-extensions

7
推荐指数
1
解决办法
778
查看次数

使用ThreadStatic属性的Parallel Extensions.它会泄漏内存吗?

我正在大量使用Parallel Extensions,我刚刚遇到一种情况,即使用线程本地存储可能是合理的,允许工作线程重用对象.因此,我查看了ThreadStatic属性,该属性将静态字段/变量标记为每个线程具有唯一值.

在我看来,使用PE与ThreadStatic属性是不明智的,而不保证PE重用线程.也就是说,如果在某种程度上创建和销毁线程,那么变量(以及它们指向的对象)是否会在线程本地存储中保留一段不确定的时间,从而导致内存泄漏?或者线程存储可能与线程相关联并在线程处理时被丢弃?但是,您仍然可能在池中存在长期存在的线程,并且会从线程所使用的各种代码中累积线程本地存储.

是否有更好的方法来获取PE的线程本地存储?

谢谢.

.net thread-static parallel-extensions

6
推荐指数
1
解决办法
1124
查看次数

使用Parallel.ForEach在最小值中选择最小值

我是C#Parallel.ForEach,和.NET的新手.我想并行化涉及数千个位置的搜索.对于每个位置,我计算大圆距离.这是我想要传播到不同核心的计算.我的问题是,如果我只有一个线程局部变量,我该怎么做呢,就像在这个MSDN TPL示例中一样?对于结果,我看了看Interlocked,看到它的选项Add,CompareExchange,Decrement,Exchange,IncrementRead,但我不只是增加,递增,递减,或测试是否相等.我希望通过并行运行的多个线程返回对象,这个线程总体上最短距离.我的直觉说这应该很容易,我应该能够创建一个包裹Location一个距离的小物体,但是如何从每个线程中捕获最佳答案然后选择它们之间的最短距离?这是非并行版本:

Location findClosestLocation(Location myLocation, List<Location> allLocations)
{
  double closest = double.MaxValue;
  Location closestLoc = null;
  foreach (Location aLoc in allLocations)
  {
    if (aLoc != myLocation)
    {
      double d = greatCircle(myLocation, aLoc);
      if (d < closest)
      {
        closest = d;
        closestLoc = aLoc;
      }
    }
  }
  return closestLoc;
}
Run Code Online (Sandbox Code Playgroud)

我确实看到一个似乎提供了很好建议的DDJ博客文章,但我想知道这是否是最好的建议.我看到作者循环遍历数组,并想知道是否没有更多功能的方法来做到这一点.在我将使用的功能世界中map …

.net c# parallel-processing parallel-extensions c#-4.0

6
推荐指数
1
解决办法
1690
查看次数

数据的线程安全缓冲区,用于批量插入受控大小

我有一个生成必须保存到数据库的数据的模拟。

ParallelLoopResult res = Parallel.For(0, 1000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    SaveDataToDatabase(cds);

});
Run Code Online (Sandbox Code Playgroud)

模拟会生成大量数据,所以先生成然后保存到数据库(最多1GB的数据)是不切实际的,并且将其一一保存到数据库也是没有意义的(交易太小不实用)。我想将它们作为受控大小的批量插入插入到数据库中(比如一次提交 100)。

但是,我认为我对并行计算的了解并没有那么理论化。我想出了这个(正如你所看到的,这是非常有缺陷的):

DataBuffer buffer = new DataBuffer(...);

ParallelLoopResult res = Parallel.For(0, 10000000, options, (r, state) =>
{
    ComplexDataSet cds = GenerateData(r);

    buffer.SaveDataToBuffer(cds, i == r - 1);

});

public class DataBuffer
{
    int count = 0;
    int limit = 100

    object _locker = new object();

    ConcurrentQueue<ConcurrentBag<ComplexDataSet>> ComplexDataBagQueue{ get; set; }

    public void SaveDataToBuffer(ComplexDataSet data, bool isfinalcycle)
    {
            lock (_locker)
            {
                if(count >= …
Run Code Online (Sandbox Code Playgroud)

.net c# parallel-processing parallel-extensions task-parallel-library

6
推荐指数
1
解决办法
2821
查看次数

TransactionScope不使用Parallel Extensions?

如果我做以下事情:

 Using scope = New TransactionScope()
        entries.Content.ReadAs(Of IList(Of WebMaint)).AsParallel.ForAll(Sub(entry)
                                                                            _repos.Update(entry)
                                                                        End Sub)
        scope.Complete()
    End Using
Run Code Online (Sandbox Code Playgroud)

TransactionScope不起作用.如果我在scope.complete上放置了一个断点,则没有事务处于活动状态,并且更新已经完成.

如果我改为:

Using scope = New TransactionScope()
            entries.Content.ReadAs(Of IList(Of WebMaint)).ToList().ForEach(Sub(entry)
                                                                               _repos.Update(entry)
                                                                           End Sub)
            scope.Complete()
End Using
Run Code Online (Sandbox Code Playgroud)

一切都按预期工作.任何人都知道为什么并行版本无法正常工作?

.net transactionscope parallel-extensions

6
推荐指数
1
解决办法
1774
查看次数

如何在C#中获取线程ID

public bool HasItemsFromPropertySet(InfoItemPropertySet propertySet, CompositeInfoItem itemRemoved)
    {
        var itemAndSubItems = new InfoItemCollection();
        if (itemRemoved != null)
        {
            itemAndSubItems.Add(itemRemoved);
            //foreach (InfoItem item in itemRemoved.AllDescendants)
            itemAndSubItems.AddRange(itemRemoved.AllDescendants);
        }
        return AllItems.AsParallel().Any(item => item.PropertySet == propertySet && !itemAndSubItems.Contains(item));
    }
Run Code Online (Sandbox Code Playgroud)


在我的代码上面我使用AsParallel().Any()如何获得由AsParellel.Any()生成的线程的线程ID ...

c# parallel-extensions task-parallel-library

6
推荐指数
1
解决办法
1万
查看次数