我有一个csv_file,其中a.)首先,每行需要转换为xml和b.)第二,转换后的xml将被发送到rails端进行一些数据库写操作.
以下是我的Flow代码.
flow = csv_rows
|> Flow.from_enumerable()
|> Flow.partition
|> Flow.map(&(CSV.generate_xml(&1)))
|> Flow.map(&(CSV.save_to_rails_databse(&1)))
|> Flow.run
Run Code Online (Sandbox Code Playgroud)
Everyting对于小型csv文件工作正常,但是当csv_file非常大(假设为20,000)记录时,则执行第二次操作(即在rails侧写入数据库)试图同时插入两个记录,因为elixir同时向rails侧发送了太多请求,因此数据库达到了峰值限制.
这将是很好处理的一批50事件,并会在min_demand和max_demand将在这种情况下非常有用.
Jos*_*lim 10
您可以使用Flow.map_state/2接收特定状态的整个状态(在您的情况下,因为您正在映射,状态将是该批次中的事件).
你将需要在这里使用三个参数,全部给予from_enumerable:
其他一些注意事项:
当我们研究Flow时,我们能够获得一些Flow课程并将其应用回Elixir.其中一个课程产生了Task.async_stream/3,当您想要在没有reduce阶段的集合上进行映射时,这非常有用,正是您所拥有的:
batch_size = 100
# 8 tasks running at the same time and we don't care about the results order
async_options = [max_concurrency: 8, ordered: false]
csv_rows
|> Stream.chunk(batch_size)
|> Task.async_stream(fn batch ->
batch
|> Enum.map(&CSV.generate_xml/1)
|> CSV.save_to_batch_rails_database()
end, async_options)
|> Stream.run()
Run Code Online (Sandbox Code Playgroud)
我没有测试过代码,但它应该提供足够的指导.它应该与Flow一样快但没有额外的依赖性.