Fla*_*nix 1 stream elixir ecto
PS:以下情况描述了一个假设场景,我拥有一家向客户销售产品的公司。
我有一个 Ecto 查询太大,我的机器无法处理它。返回数十亿个结果,世界上可能没有足够的 RAM 来处理它。
这里的解决方案(或者我的研究表明)是使用流。流是为可能无限的结果集而创建的,这适合我的用例。
https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2
因此,让我们想象一下,我想要删除购买给定商品的所有用户。也许那个东西在他们的国家并不真正合法,而现在我,这个 IT 行业的可怜人,必须解决问题,这样世界才不会崩溃。
天真的方式:
item_id = "123asdasd123"
purchase_ids =
Purchases
|> where([p], p.item_id == ^item_id)
|> select([p], p.id)
|> Repo.all()
Users
|> where([u], u.purchase_id in ^purchase_ids)
|> Repo.delete_all()
Run Code Online (Sandbox Code Playgroud)
这是天真的方法。我称其为天真,因为有两个问题:
purchase_ids查询)purchase_ids可能会有超过 100K 的 id,因此第二个查询(我们删除内容的地方)将会失败,因为它达到了 32K 的 Postgres 参数限制: https: //stackoverflow.com/a/42251312/1337392我能说的是,我们的产品非常容易上瘾,而且价格非常优惠!我们的客户简直无法满足。不知道为什么。没有。我想不出任何理由。一个都没有。
考虑到这些问题,我无法帮助我的客户并发展我的帝国,我的意思是,小家庭拥有的企业。
我确实找到了这个可能的解决方案:
串流方式:
item_id = "123asdasd123"
purchase_ids =
Purchases
|> where([p], p.item_id == ^item_id)
|> select([p], p.id)
stream = Repo.stream(purchase_ids)
Repo.transacion(fn ->
ids = Enum.to_list(stream)
Users
|> where([u], u.purchase_id in ^ids)
|> Repo.delete_all()
end)
Run Code Online (Sandbox Code Playgroud)
但是,我不相信这会起作用:
Enum.to_list所有内容并将其保存到变量中,然后将所有内容再次放入内存中。所以我没有通过使用获得任何优势Repo.stream。ids需要我Repo.delete_all去工作而不爆炸我想这里的一个优点是,这现在是一笔交易,所以要么一切顺利,要么什么都没有。
因此,出现了以下问题:
streams在这种场景下我该如何正确使用呢?ids) 删除项目还是必须手动批处理它们?Repo.delete_all?人们不能直接Repo.delete_all/1用溪流喂食,但Stream.chunk_every/2这里是你的朋友。人们可以像下面这样做(500这是默认值,因此我们\xe2\x80\x99d也:max_rows使用它。)chunk_every/2
Repo.transacion(fn ->\n max_rows = 500\n\n purchase_ids\n |> Repo.stream(max_rows: max_rows)\n |> Stream.chunk_every(max_rows)\n |> Stream.each(fn ids ->\n Users\n |> where([u], u.purchase_id in ^ids)\n |> Repo.delete_all()\n end)\n |> Stream.run()\nend, timeout: :infinity)\nRun Code Online (Sandbox Code Playgroud)\n