1 个事务中的 Ecto 多个流

Fla*_*nix 1 stream elixir ecto

背景

PS:以下情况描述了一个假设场景,我拥有一家向客户销售产品的公司。

我有一个 Ecto 查询太大,我的机器无法处理它。返回数十亿个结果,世界上可能没有足够的 RAM 来处理它。

这里的解决方案(或者我的研究表明)是使用流。流是为可能无限的结果集而创建的,这适合我的用例。

https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2

问题

因此,让我们想象一下,我想要删除购买给定商品的所有用户。也许那个东西在他们的国家并不真正合法,而现在我,这个 IT 行业的可怜人,必须解决问题,这样世界才不会崩溃。

天真的方式

item_id = "123asdasd123"

purchase_ids =
      Purchases
      |> where([p], p.item_id == ^item_id)
      |> select([p], p.id)
      |> Repo.all()

Users
    |> where([u], u.purchase_id in ^purchase_ids)
    |> Repo.delete_all()
Run Code Online (Sandbox Code Playgroud)

这是天真的方法。我称其为天真,因为有两个问题:

  • 我们购买的东西太多了,机器的内存会溢出(查看purchase_ids查询)
  • purchase_ids可能会有超过 100K 的 id,因此第二个查询(我们删除内容的地方)将会失败,因为它达到了 32K 的 Postgres 参数限制: https: //stackoverflow.com/a/42251312/1337392

我能说的是,我们的产品非常容易上瘾,而且价格非常优惠!我们的客户简直无法满足。不知道为什么。没有。我想不出任何理由。一个都没有。

考虑到这些问题,我无法帮助我的客户并发展我的帝国,我的意思是,小家庭拥有的企业。

我确实找到了这个可能的解决方案:

串流方式

item_id = "123asdasd123"

purchase_ids =
      Purchases
      |> where([p], p.item_id == ^item_id)
      |> select([p], p.id)

stream = Repo.stream(purchase_ids)

Repo.transacion(fn -> 
  ids = Enum.to_list(stream)

  Users
    |> where([u], u.purchase_id in ^ids)
    |> Repo.delete_all()
end)
Run Code Online (Sandbox Code Playgroud)

问题

但是,我不相信这会起作用:

  • 我正在使用Enum.to_list所有内容并将其保存到变量中,然后将所有内容再次放入内存中。所以我没有通过使用获得任何优势Repo.stream
  • 我还有太多的东西ids需要我Repo.delete_all去工作而不爆炸

我想这里的一个优点是,这现在是一笔交易,所以要么一切顺利,要么什么都没有。

因此,出现了以下问题:

  • streams在这种场景下我该如何正确使用呢?
  • 我可以通过流参数 ( ids) 删除项目还是必须手动批处理它们?
  • 我可以将 id 流式传输到 吗Repo.delete_all

Ale*_*kin 5

人们不能直接Repo.delete_all/1用溪流喂食,但Stream.chunk_every/2这里是你的朋友。人们可以像下面这样做(500这是默认值,因此我们\xe2\x80\x99d也:max_rows使用它。)chunk_every/2

\n
Repo.transacion(fn ->\n  max_rows = 500\n\n  purchase_ids\n  |> Repo.stream(max_rows: max_rows)\n  |> Stream.chunk_every(max_rows)\n  |> Stream.each(fn ids ->\n     Users\n     |> where([u], u.purchase_id in ^ids)\n     |> Repo.delete_all()\n  end)\n  |> Stream.run()\nend, timeout: :infinity)\n
Run Code Online (Sandbox Code Playgroud)\n