使用Elixir进行去抖动

ark*_*diy 2 elixir gen-server

我从MQ进入Elixir消费者的事件流.

在消费者中我需要:

  1. 按ID和ID聚合事件
  2. 如果3分钟内没有该ID的新数据,则向下游发送ID的聚合数据.

在我的情况下,数据集并不大.每天可能只有几百个ID和几千个更新.

有什么办法可以使用GenServer魔术来解决这个问题吗?

谢谢!

Dog*_*ert 8

我这样做:

每当有新事件发生时:

  • 如果它是具有该id的第一个事件,则创建一个使用Process.send_after/3超时为3分钟的计时器ref ,并将事件和计时器存储在该状态中.

  • 如果它不是具有该id的第一个事件,则取消存储的计时器ref Process.cancel_timer/1,创建上一步骤中提到的新计时器,并将新计时器与连接旧事件的新事件一起存储.

handle_info由计时器触发的情况下,将该id的事件推送到下游,并从该状态中删除该条目.

这是上面的简单实现:

defmodule DebouncedEcho do
  @timeout 1000

  use GenServer

  def start_link do
    GenServer.start_link __MODULE__, []
  end

  def init(_) do
    {:ok, %{}}
  end

  def handle_cast({:store, id, event}, state) do
    case state[id] do
      nil ->
        timer = Process.send_after(self, {:timer, id}, @timeout)
        state = Map.put(state, id, %{events: [event], timer: timer})
        {:noreply, state}
      %{events: events, timer: timer} ->
        Process.cancel_timer(timer)
        timer = Process.send_after(self, {:timer, id}, @timeout)
        state = Map.put(state, id, %{events: [event | events], timer: timer})
        {:noreply, state}
    end
  end

  def handle_info({:timer, id}, state) do
    %{events: events} = state[id]
    IO.inspect {:flush, id, events}
    state = Map.delete(state, id)
    {:noreply, state}
  end
end
Run Code Online (Sandbox Code Playgroud)

测试:

{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)
Run Code Online (Sandbox Code Playgroud)

输出:

{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}
Run Code Online (Sandbox Code Playgroud)