F#高频实时流数据的不可变数据结构

And*_* P. 11 f# stream data-structures f#-3.0 f#-data

我们正处于f#项目的开端,该项目涉及流数据的实时和历史分析.数据包含在ac#对象中(见下文),并作为标准.net事件的一部分发送.实时地,我们通常接收的事件的数量可以从每个仪器每秒大约800个事件的不到1 /秒到更高的变化很大,因此可以是非常突发的.典型的一天可能会累积500万行/每个元素

C#事件的数据结构的通用版本如下所示:

public enum MyType { type0 = 0, type1 = 1}

public class dataObj
{
    public int myInt= 0;
    public double myDouble;
    public string myString;
    public DateTime myDataTime;
    public MyType type;
    public object myObj = null;

}
Run Code Online (Sandbox Code Playgroud)

我们计划以两种方式在f#中使用这个数据结构:

  1. 使用有监督和无监督机器学习(CRF,聚类模型等)的历史分析
  2. 使用上述模型对数据流进行实时分类

随着我们添加更多事件,数据结构需要能够增长.这排除了array<t>因为它不允许调整大小,尽管它可以用于历史分析.数据结构还需要能够快速访问最近的数据,理想情况下需要能够跳回到数据x点.这排除了Lists<T>因为线性查找时间,并且因为没有随机访问元素,只是"仅向前"遍历.

根据这篇文章,Set<T>可能是个不错的选择......

>"... Vanilla Set <'a>做的工作不够充分.我更喜欢在'List'上设置'Set',所以你总是可以O(lg n)访问最大和最小的项目,允许你通过插入日期/时间订购您的设置,以便有效地访问最新和最旧的项目..."

编辑:尹祝的回应让我更清楚地知道我在问什么.我已经编辑了帖子的其余部分以反映这一点.此外,通过引入历史分析的要求,这个问题的先前版本变得混乱.我省略了它们.

以下是实时流程步骤的细分:

  1. 收到实时事件
  2. 此事件放在数据结构中.这是我们试图确定的数据结构.它应该是一个Set<T>还是其他结构?
  3. 为了产生特征,要么提取元素或以某种方式迭代元素的子集.这将是数据结构的最后n行/元素(即,最后1000个事件或10,000个事件)或最后x秒/分钟中的所有元素(即最后10分钟内的所有事件).理想情况下,我们需要一种允许我们有效地执行此操作的结构.特别是,允许​​随机访问第n个元素而不迭代所有其他元素的数据结构是有价值的.
  4. 生成模型的特征并将其发送到模型以进行评估.
  5. 我们可能会修剪旧数据的数据结构以提高性能.

所以问题是什么是用于存储我们将用于生成功能的实时流事件的最佳数据结构.

Jac*_*Fox 11

你应该考虑 FSharpx.Collections.Vector.Vector <T>将为您提供类似于数组的功能,包括索引O(log32(n))查找和更新,它在O(1)的吐出距离内,以及在序列末尾添加新元素.Vector的另一个实现可以在Solid Vector中从F#中使用.有很好的文档记录,有些函数在大规模上的运行速度提高了4倍(元素数> 10K).两种实现都可以很好地执行,甚至可能超过1M元素.


And*_* P. 10

在他的回答中,Jack Fox建议使用FSharpx.Collections Vector<'T>Vector<'t>Greg Rosenbaum 的Solid (https://github.com/GregRos/Solid).我想我可以通过提供如何启动和运行它们的说明来回馈社区.

使用FSharpx.Collections.Vector <'T>

这个过程非常简单:

  1. 使用Project Manager控制台或Manager Nuget Packages for Solution下载FSharpx.Core nuget软件包.两者都可以在Visual Studio - >工具 - >库管理器中找到.
  2. 如果您在F#脚本文件中使用它添加#r "FSharpx.Core.dll".您可能需要使用完整路径.

用法:

open FSharpx.Collections

let ListOfTuples = [(1,true,3.0);(2,false,1.5)] 
let vector = ListOfTuples |> Vector.ofSeq

printfn "Last %A" vector.Last
printfn "Unconj %A" vector.Unconj
printfn "Item(0) %A" (vector.[0])
printfn "Item(1) %A" (vector.[1])
printfn "TryInitial %A" dataAsVector.TryInitial
printfn "TryUnconj %A" dataAsVector.Last
Run Code Online (Sandbox Code Playgroud)

使用Solid.Vector <'T>

获得使用Solid的设置Vector<'t>有点复杂.但是Solid版本具有更多方便的功能,正如杰克指出的那样,它具有许多性能优势.它还有很多有用的文档.

  1. 您需要从https://github.com/GregRos/Solid下载visual studio解决方案
  2. 下载后,您需要构建它,因为没有准备好使用预先构建的DLL.
  3. 如果你像我一样,你可能会遇到许多缺少依赖关系的问题,这些依赖关系无法构建解决方案.就我而言,它们都与nuit测试框架有关(我使用不同的框架).只需下载/添加每个依赖项,直到构建解决方案.
  4. 一旦完成并构建了解决方案,您将在Solid/Solid/bin文件夹中拥有一个闪亮的新Solid.dll.这是我出错的地方.这是核心DLL,仅适用于C#.如果你只包含对Solid.dll的引用,你将能够在f#中创建一个向量<'T>,但从那时起就会发生时髦的事情.
  5. 要使用F#这个数据结构,您需要同时引用了Solid.dllSolid.FSharp.dll它在发现\Solid\SolidFS\obj\Debug\文件夹中.您只需要一个公开声明 - >open Solid

以下是一些显示F#脚本文件用法的代码:

#r "Solid.dll"
#r "Solid.FSharp.dll" // don't forget this reference

open Solid

let ListOfTuples2 = [(1,true,3.0);(2,false,1.5)] 
let SolidVector = ListOfTuples2 |> Vector.ofSeq

printfn "%A" SolidVector.Last
printfn "%A" SolidVector.First
printfn "%A" (SolidVector.[0])
printfn "%A" (SolidVector.[1])
printfn "Count %A" SolidVector.Count

let test2 = vector { for i in {0 .. 100} -> i }
Run Code Online (Sandbox Code Playgroud)


Yin*_*Zhu 5

假设您dataObj包含一个唯一的ID字段,那么任何设置的数据结构都适合您的工作.不可变数据结构主要用于功能样式代码或持久性.如果你不需要这两个,你可以使用HashSet<T>SortedSet<T>在.NET集合库.

某些流特定优化可能是有用的,例如,Queue<T>为流中的最新数据对象保持固定大小,并将较旧对象存储在更重的权重集中.在转换到这种混合数据结构解决方案之前,我建议进行基准测试.

编辑:

在更仔细地阅读您的要求之后,我发现您想要的是具有用户可访问索引或后向枚举器的队列.在此数据结构下,您的特征提取操作(例如平均值/总和等)花费O(n).如果要在O(log n)中执行某些操作,可以使用更高级的数据结构,例如间隔树或跳过列表.但是,您需要自己实现这些数据结构,因为您需要将元信息存储在集合API后面的树节点中.


Jon*_*rop 5

此事件放在数据结构中.这是我们试图确定的数据结构.它应该是Set,Queue还是其他结构?

没有更多信息很难说.

如果您的数据以递增的顺序进入时间戳(即它们永远不会出现故障),那么您可以使用某种队列或可扩展数组.

如果您的数据无法正常运行并且需要重新排序,那么您需要优先级队列或索引集合.

每秒大约800个事件

这些对插入率的性能要求非常高.

为了产生特征,要么提取元素或以某种方式迭代元素的子集.这将是数据结构的最后n行/元素(即,最后1000个事件或10,000个事件)或最后x秒/分钟中的所有元素(即最后10分钟内的所有事件).理想情况下,我们需要一种允许我们有效地执行此操作的结构.特别是,允许​​随机访问第n个元素而不迭代所有其他元素的数据结构是有价值的.

如果您只想要开头附近的元素,为什么要随机访问?你真的想通过索引进行随机访问吗?或者你真的想通过其他一些关键时间访问吗?

从你所说的我建议使用一个普通的F#Map键入索引来维护一个MailboxProcessor可以附加一个新事件并检索一个允许所有事件被索引Map的对象,即包装一个提供自己的Item属性和实现的对象的IEnumerable<_>.在我的机器上,简单的解决方案需要50行代码,每秒可以处理大约500,000个事件.

  • 你能发贴你的例子吗?我想看到它,因为我不完全确定我会听从你的建议 (2认同)