从稀疏定义列表中挑选无模式下值的算法

zor*_*ank 6 wolfram-mathematica

我有以下问题.

我正在开发一个随机模拟器,它随机对系统的配置进行采样,并存储在某些时间点访问每个配置的次数的统计数据.大致代码就像这样工作

f[_Integer][{_Integer..}] :=0
...
someplace later in the code, e.g.,
index = get index;
c = get random configuration (i.e. a tuple of integers, say a pair {n1, n2}); 
f[index][c] = f[index][c] + 1;
which tags that configuration c has occurred once more in the simulation at time instance index.
Run Code Online (Sandbox Code Playgroud)

一旦代码完成,就会有一个f的定义列表,看起来像这样(我手工输入它只是为了强调最重要的部分)

?f
f[1][{1, 2}] = 112
f[1][{3, 4}] = 114
f[2][{1, 6}] = 216
f[2][{2, 7}] = 227
...
f[index][someconfiguration] = some value
...
f[_Integer][{_Integer..}] :=0
Run Code Online (Sandbox Code Playgroud)

请注意,首先出现的无模式定义可能相当稀疏.也无法知道将挑选哪些值和配置.

问题是有效地提取所需索引的值,例如问题

result = ExtractConfigurationsAndOccurences[f, 2] 
Run Code Online (Sandbox Code Playgroud)

这应该给出一个结构列表

result = {list1, list2}
Run Code Online (Sandbox Code Playgroud)

哪里

list1 = {{1, 6}, {2, 7}} (* the list of configurations that occurred during the simulation*)
list2 = {216, 227} (* how many times each of them occurred *)
Run Code Online (Sandbox Code Playgroud)

问题是ExtractConfigurationsAndOccurences应该非常快.我能想到的唯一解决方案是使用SubValues [f](给出完整列表)并使用Casesstatement 过滤它.我意识到应该不惜一切代价避免这个过程,因为会有指数级的测试配置(定义),这会大大减慢代码的速度.

Mathematica有一种自然的方法可以快速完成吗?

我希望Mathematica将f [2]视为具有许多下降值的单个头但是使用DownValues [f [2]]什么都没有.同样使用SubValues [f [2]]会导致错误.

Leo*_*rin 7

这完全重写了我以前的答案.原来,在我以前的尝试,我忽略了基于打包数组和稀疏阵列的组合更简单的方法,那就是更快,更多的内存-比所有以前的方法(至少在样品尺寸范围内有效,我测试它),而只是最小化改变原有SubValues的方法.由于问到了最有效的方法问题,我将从答案中删除其他问题(假设它们相当复杂并占用了大量空间.那些希望看到它们的人可以检查过去的修订版本回答).

原始SubValues方法

我们首先介绍一个函数来为我们生成配置的测试样本.这里是:

Clear[generateConfigurations];
generateConfigurations[maxIndex_Integer, maxConfX_Integer, maxConfY_Integer, 
  nconfs_Integer] :=
Transpose[{
  RandomInteger[{1, maxIndex}, nconfs],
  Transpose[{
     RandomInteger[{1, maxConfX}, nconfs],
     RandomInteger[{1, maxConfY}, nconfs]
  }]}]; 
Run Code Online (Sandbox Code Playgroud)

我们可以生成一个小样本来说明:

In[3]:= sample  = generateConfigurations[2,2,2,10]
Out[3]= {{2,{2,1}},{2,{1,1}},{1,{2,1}},{1,{1,2}},{1,{1,2}},
          {1,{2,1}},{2,{1,2}},{2,{2,2}},{1,{2,2}},{1,{2,1}}}
Run Code Online (Sandbox Code Playgroud)

我们这里只有2个索引和配置,其中"x"和"y"数字仅从1到2变化--10个这样的配置.

以下函数将帮助我们模拟配置的频率累积,因为我们SubValues为重复出现的计数器增加基于计数器:

Clear[testAccumulate];
testAccumulate[ff_Symbol, data_] :=
  Module[{},
   ClearAll[ff];
   ff[_][_] = 0;
   Do[
     doSomeStuff;
     ff[#1][#2]++ & @@ elem;
     doSomeMoreStaff;
   , {elem, data}]];
Run Code Online (Sandbox Code Playgroud)

doSomeStuffdoSomeMoreStaff符号在这里代表一些代码,可能妨碍或遵循计数代码.该data参数应该是由生成的表单的列表generateConfigurations.例如:

In[6]:= 
testAccumulate[ff,sample];
SubValues[ff]

Out[7]= {HoldPattern[ff[1][{1,2}]]:>2,HoldPattern[ff[1][{2,1}]]:>3,
   HoldPattern[ff[1][{2,2}]]:>1,HoldPattern[ff[2][{1,1}]]:>1,
   HoldPattern[ff[2][{1,2}]]:>1,HoldPattern[ff[2][{2,1}]]:>1,
   HoldPattern[ff[2][{2,2}]]:>1,HoldPattern[ff[_][_]]:>0}
Run Code Online (Sandbox Code Playgroud)

以下函数将从以下列表中提取结果数据(索引,配置及其频率)SubValues:

Clear[getResultingData];
getResultingData[f_Symbol] :=
   Transpose[{#[[All, 1, 1, 0, 1]], #[[All, 1, 1, 1]], #[[All, 2]]}] &@
        Most@SubValues[f, Sort -> False];
Run Code Online (Sandbox Code Playgroud)

例如:

In[10]:= result = getResultingData[ff]
Out[10]= {{2,{2,1},1},{2,{1,1},1},{1,{2,1},3},{1,{1,2},2},{2,{1,2},1},
{2,{2,2},1},{1,{2,2},1}}
Run Code Online (Sandbox Code Playgroud)

为了完成数据处理周期,这里有一个简单的函数来提取固定索引的数据,基于Select:

Clear[getResultsForFixedIndex];
getResultsForFixedIndex[data_, index_] := 
  If[# === {}, {}, Transpose[#]] &[
    Select[data, First@# == index &][[All, {2, 3}]]];
Run Code Online (Sandbox Code Playgroud)

对于我们的测试示例,

In[13]:= getResultsForFixedIndex[result,1]
Out[13]= {{{2,1},{1,2},{2,2}},{3,2,1}}
Run Code Online (Sandbox Code Playgroud)

这可能与@zorank尝试的代码相似.

基于压缩数组和稀疏数组的更快解决方案

正如@zorank所指出的,对于具有更多索引和配置的更大样本,这变得缓慢.我们现在将生成一个大样本来说明(注意!这需要大约4-5 Gb的RAM,因此如果超出可用RAM,您可能希望减少配置数量):

In[14]:= 
largeSample = generateConfigurations[20,500,500,5000000];
testAccumulate[ff,largeSample];//Timing

Out[15]= {31.89,Null}
Run Code Online (Sandbox Code Playgroud)

我们现在将从以下内容SubValues中提取完整数据ff:

In[16]:= (largeres = getResultingData[ff]); // Timing
Out[16]= {10.844, Null}
Run Code Online (Sandbox Code Playgroud)

这需要一些时间,但只需要做一次.但是当我们开始提取固定索引的数据时,我们发现它很慢:

In[24]:= getResultsForFixedIndex[largeres,10]//Short//Timing
Out[24]= {2.687,{{{196,26},{53,36},{360,43},{104,144},<<157674>>,{31,305},{240,291},
 {256,38},{352,469}},{<<1>>}}}
Run Code Online (Sandbox Code Playgroud)

我们将在这里使用它来加速它的主要思想是打包单个列表largeres,索引,组合和频率.虽然无法打包完整列表,但这些部分可以单独执行:

In[18]:= Timing[
   subIndicesPacked = Developer`ToPackedArray[largeres[[All,1]]];
   subCombsPacked =  Developer`ToPackedArray[largeres[[All,2]]];
   subFreqsPacked =  Developer`ToPackedArray[largeres[[All,3]]];
]
Out[18]= {1.672,Null}
Run Code Online (Sandbox Code Playgroud)

这也需要一些时间,但它又是一次性操作.

然后,将使用以下函数更有效地提取固定索引的结果:

Clear[extractPositionFromSparseArray];
extractPositionFromSparseArray[HoldPattern[SparseArray[u___]]] := {u}[[4, 2, 2]]

Clear[getCombinationsAndFrequenciesForIndex];
getCombinationsAndFrequenciesForIndex[packedIndices_, packedCombs_, 
    packedFreqs_, index_Integer] :=
With[{positions = 
         extractPositionFromSparseArray[
               SparseArray[1 - Unitize[packedIndices - index]]]},
  {Extract[packedCombs, positions],Extract[packedFreqs, positions]}];
Run Code Online (Sandbox Code Playgroud)

现在,我们有:

In[25]:=  
getCombinationsAndFrequenciesForIndex[subIndicesPacked,subCombsPacked,subFreqsPacked,10]
  //Short//Timing

Out[25]= {0.094,{{{196,26},{53,36},{360,43},{104,144},<<157674>>,{31,305},{240,291},
{256,38},{352,469}},{<<1>>}}}
Run Code Online (Sandbox Code Playgroud)

我们通过天真的Select方法获得了30倍的加速.

关于复杂性的一些注释

请注意,第二种解决方案更快,因为它使用优化的数据结构,但其复杂性与Select基于 - 的方法相同,即所有索引的唯一组合总列表的线性.因此,从理论上讲,先前讨论的基于嵌套哈希表等的解决方案可能是渐近更好的.问题是,在实践中,我们可能很久就会遇到内存限制.对于1000万配置样本,上述代码仍然比我之前发布的最快解决方案快2-3倍.

编辑

以下修改:

Clear[getCombinationsAndFrequenciesForIndex];
getCombinationsAndFrequenciesForIndex[packedIndices_, packedCombs_, 
    packedFreqs_, index_Integer] :=
 With[{positions =  
          extractPositionFromSparseArray[
             SparseArray[Unitize[packedIndices - index], Automatic, 1]]},
    {Extract[packedCombs, positions], Extract[packedFreqs, positions]}];
Run Code Online (Sandbox Code Playgroud)

使代码仍然快两倍.此外,对于更稀疏的索引(例如,使用类似参数调用样本生成函数generateConfigurations[2000, 500, 500, 5000000]),相对于Select基于函数的加速大约是100倍.


Sjo*_*ies 5

我可能在这里使用SparseArrays(参见下面的更新),但是如果你坚持使用函数和*值来存储和检索值,那么一种方法是将第一部分(f [2]等)替换为符号你在飞行中创建像:

Table[Symbol["f" <> IntegerString[i, 10, 3]], {i, 11}]
(* ==> {f001, f002, f003, f004, f005, f006, f007, f008, f009, f010, f011} *)

Symbol["f" <> IntegerString[56, 10, 3]]
(* ==> f056 *)

Symbol["f" <> IntegerString[56, 10, 3]][{3, 4}] = 12;
Symbol["f" <> IntegerString[56, 10, 3]][{23, 18}] = 12;

Symbol["f" <> IntegerString[56, 10, 3]] // Evaluate // DownValues
(* ==> {HoldPattern[f056[{3, 4}]] :> 12, HoldPattern[f056[{23, 18}]] :> 12} *)

f056 // DownValues
(* ==> {HoldPattern[f056[{3, 4}]] :> 12, HoldPattern[f056[{23, 18}]] :> 12} *)
Run Code Online (Sandbox Code Playgroud)

我个人更喜欢Leonid的解决方案,因为它更优雅但是YMMV.

更新

在OP的请求中,关于使用SparseArrays:
Large SparseArrays占用标准嵌套列表大小的一小部分.我们可以使f成为稀疏数组的大型(100,000个)稀疏数组:

f = SparseArray[{_} -> 0, 100000];
f // ByteCount
(* ==> 672 *)

(* initialize f with sparse arrays, takes a few seconds with f this large *)
Do[  f[[i]] = SparseArray[{_} -> 0, {100, 110}], {i,100000}] // Timing//First
(* ==> 18.923 *)

(* this takes about 2.5% of the memory that a normal array would take: *)
f // ByteCount
(* ==>  108000040 *)

ConstantArray[0, {100000, 100, 100}] // ByteCount
(* ==> 4000000176 *)

(* counting phase *)
f[[1]][[1, 2]]++;
f[[1]][[1, 2]]++;
f[[1]][[42, 64]]++;
f[[2]][[100, 11]]++;

(* reporting phase *)
f[[1]] // ArrayRules
f[[2]] // ArrayRules
f // ArrayRules

(* 
 ==>{{1, 2} -> 2, {42, 64} -> 1, {_, _} -> 0}
 ==>{{100, 11} -> 1, {_, _} -> 0}
 ==>{{1, 1, 2} -> 2, {1, 42, 64} -> 1, {2, 100, 11} ->  1, {_, _, _} -> 0}
*)
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,ArrayRules制作一个包含贡献和计数的好列表.这可以分别对每个f [i]或整个束(最后一行)进行.