背景:我有一个功能:
def doWork(symbol: String): Future[Unit]
Run Code Online (Sandbox Code Playgroud)
它会启动一些副作用以获取数据并存储它,并在完成时完成Future.但是,后端基础结构具有使用限制,因此可以并行地生成不超过5个这些请求.我有一个N符号列表,我需要通过:
var symbols = Array("MSFT",...)
Run Code Online (Sandbox Code Playgroud)
但是我想对它们进行排序,使得不超过5个同时执行.鉴于:
val allowableParallelism = 5
Run Code Online (Sandbox Code Playgroud)
我当前的解决方案是(假设我正在使用async/await):
val symbolChunks = symbols.toList.grouped(allowableParallelism).toList
def toThunk(x: List[String]) = () => Future.sequence(x.map(doWork))
val symbolThunks = symbolChunks.map(toThunk)
val done = Promise[Unit]()
def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match {
case Nil => done.success()
case x::xs => x().onComplete(_ => procThunks(xs))
}
procThunks(symbolThunks)
await { done.future }
Run Code Online (Sandbox Code Playgroud)
但是,由于显而易见的原因,我对此并不十分满意.我觉得这应该是可能的折叠,但每次我尝试,我最终热切地创造期货.我还使用了concatMap尝试了一个带有RxScala Observables的版本,但这看起来也有些过分.
有没有更好的方法来实现这一目标?
我在相当高吞吐量的设置中使用Scala Enumeration ValueSet - 创建,测试,联合和交叉约10M /秒/核心.我没想到这会是一个大问题,因为我曾经读过他们得到BitSets支持的地方,但令人惊讶的是,ValueSet.isEmpty在与YourKit的分析会话中出现了热点.
为了验证,我决定尝试使用Java BitSet重新实现我需要的东西,同时尝试保留使用Scala Enumerations的一些类型安全性.(代码审查转移到https://codereview.stackexchange.com/questions/74795/scala-bitset-implemented-with-java-bitset-for-use-in-scala-enumerations-to-repl)好消息是,将我的ValueSets更改为这些BitSet确实减少了25%的运行时间,因此我不知道ValueSet在引擎盖下的确做了什么,但它可以改进......
编辑:查看ValueSet源似乎表明isEmpty肯定是O(N),使用通用SetLike.isEmpty实现. 考虑到ValueSet是用BitSet实现的,这是一个错误吗?
编辑:这是分析器的回溯.这似乎是一种在bitset上实现isEmpty的疯狂方法.

我正在http://www.thalesians.com/archive/public/academic/finance/papers/Zumbach_2000.pdf中对运营商进行新的实施: 编辑:此处的解释更清楚:https : //www.olseninvest.com/客户/pdf/paper/001207-emaOfEma.pdf
简而言之,它是一堆基于指数移动平均值的递归关系的很酷的时间序列运算符,其中ema()运算符的每个应用都采用新值和ema的先前结果。我似乎无法在此堆栈交换上进行乳胶操作,但是无论如何我现在的问题是软件问题。
我在Scala中通过在创建EMA函数的thunk中隐藏了一个变量来实现了这一点。所有这些都可以,但是非常棘手,因为再次调用ema(5)然后再次调用ema(5)自然会导致不同的结果。我想使用State Monads重做所有这些操作,但是我很快就迷失了自己。
例如,我在Haskell中具有以下简化的EMA State monad:
import Control.Monad.State
type EMAState = Double
type Tau = Double
ema :: Tau -> Double -> State EMAState Double
ema tau x = state $ \y ->
let alpha = 1 / tau
mu = exp(-alpha)
mu' = 1 - mu
y' = (mu * y) + (mu' * x)
in (y', y')
Run Code Online (Sandbox Code Playgroud)
我可以在GHCI中轻松测试:
*Main Control.Monad.State> runState (ema 5 10) 0
(1.8126924692201818,1.8126924692201818)
Run Code Online (Sandbox Code Playgroud)
将输入10应用于初始化为0的5周期EMA。这很好,使用forM可以应用多个输入值,等等。现在,下一步是实现“迭代EMA”,即应用的EMA自己N次。
iEMA[n](x) = EMA(iEMA[n-1](x)) …Run Code Online (Sandbox Code Playgroud) 我试图找出istream使用x3 解析的正确方法。较旧的文档指的是multi_pass东西,我仍然可以使用它吗?还是有其他方法来缓冲X3的流,以便它可以回溯?
我有数据存储在磁盘上,数百万条记录的文件,每天一个.我有一个相对有效的反序列化器,可以生成发出记录的Observable,现在足够快(1.5M记录/秒).
我想要的是现在连接这些Observable,以便我可以得到一个多天的跨度不间断的流.当这个简单的工作时我很激动:
val nilObs: Observable[Record] = Observable.empty
val allObs = dates.map(reader.recordsForDate(_)).foldLeft(nilObs)(_ ++ _)
Run Code Online (Sandbox Code Playgroud)
但是,上面的结果是Observable吞吐量很大 - 有200个连续的Observable,我看到50-100k/s,相比我预期的1.5M/s.
我还没有介绍过它,但是看看https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java它正在做一个大量的队列工作 - 我想知道我是否在一个队列中创建了N个队列而不是N个Observable?
是否有更有效的方式以这种方式连接Observables?
情况:
事件的流(RxScala),我们使用tumblingBuffer()进行批处理,然后构建完整的调试历史记录.最终我想要所有值的(Seq [T],Seq [T]),所以我创建了以下函数作为foldLeft的累加器:
def tupleConcat[S](a: (Seq[S], Seq[S]), b: (Seq[S], Seq[S])) = (a._1 ++ b._1, a._2 ++ b._2)
Run Code Online (Sandbox Code Playgroud)
现在,这对我来说衬托一堆警钟的看了Runar&保罗在Scala的函数式编程之后,因为这看起来极像两个半群实例的MAP2,但我还是有点停留在如何正确地概括它.到目前为止,我认为它可能看起来像:
def tupleConcatSG[S](a: (S,S), b: (S,S))(implicit s: Semigroup[S]) = (a._1 |+| b._1, a._2 |+| b._2)
Run Code Online (Sandbox Code Playgroud)
(但是我必须从我能收集到的东西中将我的Seq推广到IndexedSeq).
为了进一步推广到任何Applicative,我想我需要一个元组实例,它可能来自Shapeless?还是我错过了一些明显的东西?
编辑:我还要补充一点,我试图避免压缩和解压缩,基于偏见的性能问题,但也许我不应该担心的是......(每个tumblingBuffer的价值(SEQ,SEQ)将是〜15000长,决赛(Seq,Seq)应该是数百万).
我发现自己经常做以下事情:
val adjustedActions = actions.scanLeft((1.0, null: CorpAction)){
case ((runningSplitAdj, _), action) => action match {
case Dividend(date, amount) =>
(runningSplitAdj, Dividend(date, amount * runningSplitAdj))
case s @ Split(date, sharesForOne) =>
((runningSplitAdj * sharesForOne), s)
}
}
.drop(1).map(_._2)
Run Code Online (Sandbox Code Playgroud)
runningSplitAdj在这种情况下,我需要积累,以便纠正动作列表中的股息.在这里,我scan用来维护我需要的状态以纠正动作,但最后,我只需要动作.因此,我需要在状态中使用null作为初始操作,但最后,删除该项并映射所有状态.
是否有更优雅的方式来构建这些? 在RxScala Observables的上下文中,我实际上创建了一个新的运算符(在RxJava邮件列表的一些帮助之后):
implicit class ScanMappingObs[X](val obs: Observable[X]) extends AnyVal {
def scanMap[S,Y](f: (X,S) => (Y,S), s0: S): Observable[Y] = {
val y0: Y = null.asInstanceOf[Y]
// drop(1) because scan also emits initial state
obs.scan((y0, s0)){case ((y, s), x) …Run Code Online (Sandbox Code Playgroud) 如果问题措辞不当,我会道歉,我会尽我所能.
如果我有一系列值的时间,Observable[(U,T)]其中U是一个值,而T是一个类似时间的类型(或者我认为有任何差异),我怎么能写一个自动重置一触式屏障的运算符,这是沉默的abs(u_n - u_reset) < barrier,但是t_n - t_reset如果触摸屏障会吐出,此时它也会重置u_reset = u_n.
也就是说,此运算符接收的第一个值成为基线,并且它不会发出任何结果.此后,它监视流的值,并且只要其中一个超出基线值(高于或低于),它就会发出经过的时间(通过事件的时间戳测量),并重置基线.然后将处理这些时间以形成波动率的高频估计.
作为参考,我试图写一个在http://www.amazon.com/Volatility-Trading-CD-ROM-Wiley/dp/0470181990中概述的波动率估算器,而不是测量标准偏差(在常规均匀时间的偏差) ),你反复测量一些固定障碍量突破障碍所需的时间.
具体来说,这可以使用现有的运营商编写吗?我有点不知道状态将如何被重置,尽管我可能需要制作两个嵌套的运算符,一个是一次性的,另一个是不断创建一次性的...我知道它可以通过写入来完成一个手工,但后来我需要写自己的出版商等.
谢谢!
是否有某个文档描述了各种spirit :: x3规则定义操作如何影响属性兼容性?
我很惊讶:
x3::lexeme[ x3::alpha > *(x3::alnum | x3::char_('_')) ]
Run Code Online (Sandbox Code Playgroud)
无法移动到融合适应的结构中:
struct Name {
std::string value;
};
Run Code Online (Sandbox Code Playgroud)
目前,我摆脱了第一个强制字母字符,但我仍然想表达一个规则,该规则定义名称字符串必须以字母开头.这是其中一种情况,我需要尝试添加eps,直到它工作,或有一个明确的原因,为什么上述不起作用?
如果在某处写下来,我道歉,我找不到它.
scala ×6
boost ×2
boost-spirit ×2
c++ ×2
rx-java ×2
bitset ×1
enumeration ×1
fold ×1
future ×1
haskell ×1
java ×1
monads ×1
observable ×1
recurrence ×1
rx-scala ×1
scalaz ×1
shapeless ×1
state ×1
state-monad ×1
volatility ×1