如何使用elixir流批量处理事件

Cha*_*har 4 elixir

我有一个csv_file,其中a.)首先,每行需要转换为xml和b.)第二,转换后的xml将被发送到rails端进行一些数据库写操作.

以下是我的Flow代码.

flow = csv_rows
 |> Flow.from_enumerable()
 |> Flow.partition
 |> Flow.map(&(CSV.generate_xml(&1)))
 |> Flow.map(&(CSV.save_to_rails_databse(&1)))
 |> Flow.run
Run Code Online (Sandbox Code Playgroud)

Everyting对于小型csv文件工作正常,但是当csv_file非常大(假设为20,000)记录时,则执行第二次操作(即在rails侧写入数据库)试图同时插入两个记录,因为elixir同时向rails侧发送了太多请求,因此数据库达到了峰值限制.

这将是很好处理的一批50事件,并会在min_demandmax_demand将在这种情况下非常有用.

Jos*_*lim 10

您可以使用Flow.map_state/2接收特定状态的整个状态(在您的情况下,因为您正在映射,状态将是该批次中的事件).

你将需要在这里使用三个参数,全部给予from_enumerable:

  • min_demand:这将是批量大小
  • max_demand:阶段之间流动的最大行数
  • 阶段:处理数据的并发阶段数.在您的情况下,同时处理多少批次

其他一些注意事项:

  • 您不需要分区,因为您没有进行任何分组
  • 考虑使用允许CSV作为流使用的NimbleCSV - 如果CSV太大,这有助于内存使用
  • 在这个例子中你可能根本不需要Flow,Task.asycn_stream/3就足够了

当我们研究Flow时,我们能够获得一些Flow课程并将其应用回Elixir.其中一个课程产生了Task.async_stream/3,当您想要在没有reduce阶段的集合上进行映射时,这非常有用,正是您所拥有的:

batch_size = 100

# 8 tasks running at the same time and we don't care about the results order
async_options = [max_concurrency: 8, ordered: false]

csv_rows
|> Stream.chunk(batch_size)
|> Task.async_stream(fn batch -> 
  batch
  |> Enum.map(&CSV.generate_xml/1)
  |> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()
Run Code Online (Sandbox Code Playgroud)

我没有测试过代码,但它应该提供足够的指导.它应该与Flow一样快但没有额外的依赖性.