小编ami*_*ayh的帖子

CQRS汇总

我是CQRS/ES世界的新手,我有一个问题.我正在开发一个使用事件采购和CQRS的发票Web应用程序.

我的问题是 - 根据我的理解,进入系统的新命令(比方说ChangeLineItemPrice)应该通过域模型,以便可以将其验证为合法命令(例如,检查此行项目是否实际存在,价格不违反任何商业规则等).如果一切顺利(命令未被拒绝) - 则会创建并存储相应的事件(例如LineItemPriceChanged)

我没有得到的是在尝试应用命令之前如何将此聚合保留在内存中.如果我在系统中有一百万张发票,每次我想申请一个命令时,我应该播放整个历史记录吗?我是否总是在没有任何验证的情况下保存事件,并在构建视图模型/投影时进行验证?

如果我误解了过程的任何部分,我将非常感谢您的反馈.

谢谢你的帮助!

domain-driven-design system-design cqrs event-sourcing

10
推荐指数
1
解决办法
1093
查看次数

猫效应 - 独立效果的平行组合

我想组合多个IO应该并行独立运行的值.

val io1: IO[Int] = ???
val io2: IO[Int] = ???
Run Code Online (Sandbox Code Playgroud)

在我看来,我必须选择:

  1. 使用带有叉形连接图案的cats-effect纤维
    val parallelSum1: IO[Int] = for {
      fiber1 <- io1.start
      fiber2 <- io2.start
      i1 <- fiber1.join
      i2 <- fiber2.join
    } yield i1 + i2
    
    Run Code Online (Sandbox Code Playgroud)
  2. 使用Parallel实例IOparMapN(或它的兄弟姐妹等中的一种parTraverse,parSequence,parTupled等)
    val parallelSum2: IO[Int] = (io1, io2).parMapN(_ + _)
    
    Run Code Online (Sandbox Code Playgroud)

不确定每种方法的优缺点,何时我应该选择其中一种方法.当抽象出效果类型IO(无标签 - 最终样式)时,这变得更加棘手:

def io1[F[_]]: F[Int] = ???
def io2[F[_]]: F[Int] = ???

def parallelSum1[F[_]: Concurrent]: F[Int] = for …
Run Code Online (Sandbox Code Playgroud)

functional-programming scala scala-cats cats-effect

9
推荐指数
1
解决办法
484
查看次数

与无标记最终的类型依赖

看完John De Goes的"FP to the Max"(https://www.youtube.com/watch?v=sxudIMiOo68)后,我想知道在无标签最终模式中编写FP程序的方法.

假设我有一些类型类用于建模副作用的东西(以他Console为例):

trait Console[F[_]] {
  def putStrLn(str: String): F[Unit]
  def getStrLn: F[String]
}
Run Code Online (Sandbox Code Playgroud)

你会如何依赖Console

就像他的视频中所示:

def inputLength[F[_]: Functor: Console]: F[Int] =
  Console[F].getStrLn.map(_.length)
Run Code Online (Sandbox Code Playgroud)

优点:函数签名是干净的,您可以从类型类自动派生中受益

明确地

通过直接将实例传递给函数:

def inputLength[F[_]: Functor](console: Console[F]): F[Int] =
  console.getStrLn.map(_.length)
Run Code Online (Sandbox Code Playgroud)

优点:这允许您根据需要明确地连接您的依赖项,并感觉不那么"神奇"

不确定编写此功能的最佳/最惯用的方式是什么,非常感谢您的意见.

谢谢!

functional-programming scala tagless-final

9
推荐指数
1
解决办法
124
查看次数

MySQL游标基于多列的分页

我有一些表,我想使用基于游标的分页查询,但它需要申请多个列.

让我们看一个使用2列的简化示例 - 我像这样获取第一页:

SELECT column_1, column_2
FROM table_name
ORDER BY column_1, column_2
LIMIT 10
Run Code Online (Sandbox Code Playgroud)

在我得到结果后,我可以根据最后一行获取下一页.让我们说最后一行是column_1 = 5, column_2 = 8.我想做这样的事情:

SELECT column_1, column_2
FROM table_name
WHERE column_1 > 5 AND column_2 > 8
ORDER BY column_1, column_2
LIMIT 10
Run Code Online (Sandbox Code Playgroud)

但这显然是错误的.它会过滤掉一行column_1 = 5, column_2 = 9(由于过滤器打开column_1)或一行column_1 = 6, column_2 = 6(因为过滤器打开column_2)

我可以做这样的事情来避免这个问题:

SELECT column_1, column_2
FROM table_name
WHERE column_1 > 5
OR (column_1 = 5 AND column_2 > 8)
ORDER …
Run Code Online (Sandbox Code Playgroud)

mysql pagination

7
推荐指数
1
解决办法
3411
查看次数

无标签最终效果传播

无标签最终模式让我们可以编写纯函数式程序,这些程序明确说明了它们所需的效果。

然而,扩展这种模式可能会变得具有挑战性。我将尝试用一个例子来证明这一点。想象一个简单的程序,它从数据库读取记录并将其打印到控制台。除了 cats/scalaz 之外,我们还需要一些自定义类型类Database和来组合它们:ConsoleMonad

def main[F[_]: Monad: Console: Database]: F[Unit] =
  read[F].flatMap(Console[F].print)

def read[F[_]: Functor: Database]: F[List[String]] =
  Database[F].read.map(_.map(recordToString))
Run Code Online (Sandbox Code Playgroud)

当我想在内层的函数中添加新的效果时,问题就开始了。例如,我希望我的read函数在未找到记录时记录一条消息

def read[F[_]: Monad: Database: Logger]: F[List[String]] =
  Database[F].read.flatMap {
    case Nil => Logger[F].log("no records found") *> Nil.pure
    case records => records.map(recordToString).pure
  }
Run Code Online (Sandbox Code Playgroud)

但现在,我必须向链上游Logger的所有调用者添加约束。read在这个人为的示例中,它只是main,但想象一下这是一个复杂的现实应用程序的几个层。

我们可以从两个方面来看待这个问题:

  1. 我们可以说,明确我们的效果是一件好事,并且我们确切地知道每一层需要哪些效果
  2. 我们还可以说,这泄漏了实现细节 -main不关心日志记录,它只需要read. 此外,在实际应用中,您会在顶层看到很长的效果链。感觉像是代码味道,但我无法确定我还可以采取什么其他方法。

很想了解您对此的见解。

谢谢。

functional-programming scala software-design tagless-final

5
推荐指数
1
解决办法
244
查看次数

采用Kafka流的事件采购

我正在尝试在Kafka流之上实现一个简单的CQRS /事件源概念证明(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/中所述)

我有4个基本部分:

  1. commands topic,使用聚合ID作为按顺序处理每个聚合命令的键
  2. events主题,发布聚合状态的每个更改(同样,密钥是聚合ID).本主题的保留策略为"永不删除"
  3. 一个KTable,用于减少聚合状态并将其保存到状态存储

    events topic stream ->
    group to a Ktable by aggregate ID ->
    reduce aggregate events to current state ->
    materialize as a state store
    
  4. 命令处理器 - 命令流,左连接聚合状态KTable.对于结果流中的每个条目,使用函数(command, state) => events生成结果事件并将其发布到events主题

问题是 - 有没有办法确保我在州商店中拥有最新版本的聚合?

如果违反业务规则,我想拒绝命令(例如,如果实体被标记为已删除,则修改实体的命令无效).但是如果a DeleteCommand发布后跟ModifyCommand右后一个,则delete命令将生成DeletedEvent,但是当ModifyCommand处理时,来自状态存储的加载状态可能尚未反映,并且将发布冲突事件.

我不介意牺牲命令处理吞吐量,我宁愿得到一致性保证(因为所有内容都按相同的密钥分组,最终应该在同一个分区中)

希望很清楚:)有什么建议吗?

event-sourcing apache-kafka apache-kafka-streams

4
推荐指数
1
解决办法
1100
查看次数

没有实现复制分配的结构在哪里,如何从函数返回?

我读到 Rust 中的内存默认分配在堆栈上,除非通过使用 aBox或其他方法明确告诉编译器使用堆。

我知道所有权在函数调用之间移动,但实际分配的结构内存在哪里?如果它在堆栈上,当函数退出时会发生什么?

#[derive(Debug)]
struct Foo(i32);

#[derive(Debug)]
struct Bar(Foo);

fn foo() -> Foo {
    Foo(42)
}

fn bar() -> Bar {
    let f = foo();
    Bar(f)
}

fn main() {
    let bar = bar();
    println!("{:?}", bar);
}
Run Code Online (Sandbox Code Playgroud)

例如,在第 12 行,Foobar()函数的堆栈帧中分配了一个结构体。当bar()退出时,堆栈被退绕和所述存储器被回收。既然struct没有实现Copy,内存就没有被复制,那它去哪儿了呢?

我认为这里有一个我不明白的基本思想。

memory-management rust

0
推荐指数
1
解决办法
149
查看次数