Skip to content

Instantly share code, notes, and snippets.

@albertored
Created November 4, 2021 08:15
Show Gist options
  • Select an option

  • Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.

Select an option

Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.

Revisions

  1. albertored created this gist Nov 4, 2021.
    45 changes: 45 additions & 0 deletions broadway_heartbeat.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    defmodule MyApp.Broadway.Heartbeat do
    use GenServer, restart: :transient

    alias Broadway.Message

    def start_link(%Message{} = message) do
    state = %{
    rh: extract_message_receipt_handle(message),
    id: extract_message_id(message),
    gid: extract_message_group_id(message),
    queue: Application.fetch_env!(:my_app, :sqs_in_queue),
    init_visibility: Application.fetch_env!(:my_app, :sqs_init_visibility),
    visibility_step: Application.fetch_env!(:my_app, :sqs_visibility_step)
    }

    GenServer.start_link(__MODULE__, state,
    name: {:via, Registry, {MyApp.Broadway.HeartbeatRegistry, state.id}}
    )
    end

    def init(%{init_visibility: init_visibility, id: id, gid: gid} = init_state) do
    timer = Process.send_after(self(), :update_visibility, init_visibility * 500)
    {:ok, Map.put(init_state, :timer, timer)}
    end

    def handle_info(:update_visibility, %{rh: rh, visibility_step: step, queue: queue} = state) do
    timer = Process.send_after(self(), :update_visibility, step * 1_000)

    {:ok, _} =
    queue
    |> ExAws.SQS.change_message_visibility(rh, step * 2)
    |> ExAws.request()

    {:noreply, %{state | timer: timer}}
    end

    def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state}

    defp extract_message_receipt_handle(%Message{metadata: metadata}), do: Map.get(metadata, :receipt_handle)

    defp extract_message_id(%Message{metadata: metadata}), do: Map.get(metadata, :message_id)

    defp extract_message_group_id(%Message{metadata: metadata}),
    do: metadata |> Map.get(:attributes, %{}) |> Map.get("message_group_id", nil)
    end
    37 changes: 37 additions & 0 deletions broadway_heartbeat_supervisor.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,37 @@
    defmodule MyApp.Broadway.HeartbeatSupervisor do
    use DynamicSupervisor

    alias Broadway.Message

    def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args, name: __MODULE__)
    end

    @impl true
    def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
    end

    def start_heartbeat(message) do
    spec = %{
    id: MyApp.Broadway.Heartbeat,
    start: {MyApp.Broadway.Heartbeat, :start_link, [message]},
    restart: :transient
    }

    DynamicSupervisor.start_child(__MODULE__, spec)

    message
    end

    def stop(msg), do: action(msg, :stop)

    defp action(%Message{metadata: %{message_id: id}}, action), do: action(id, action)

    defp action(id, action) when is_bitstring(id) do
    case Registry.lookup(MyApp.Broadway.HeartbeatRegistry, id) do
    [{pid, _value} | _tail] -> GenServer.call(pid, action)
    [] -> {:error, :not_found}
    end
    end
    end
    61 changes: 61 additions & 0 deletions broadway_sqs_client.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    defmodule MyApp.Broadway.SQSClient do
    alias Broadway.Message
    alias BroadwaySQS.ExAwsClient

    alias MyAapp.Broadway.HeartbeatSupervisor

    @behaviour BroadwaySQS.SQSClient
    @behaviour Broadway.Acknowledger

    @max_num_messages_allowed_by_aws 10

    defdelegate init(opts), to: ExAwsClient

    @impl true
    def receive_messages(demand, opts) do
    demand
    |> ExAwsClient.receive_messages(opts)
    # start heartbeat process to increase visibility
    |> Enum.map(&HeartbeatSupervisor.start_heartbeat/1)
    |> Enum.map(&replace_acknowledger/1)
    end

    @impl true
    def ack(ack_ref, successful, failed) do
    # stop visibility updates
    Enum.each(successful, &HeartbeatSupervisor.stop/1)
    Enum.each(failed, &HeartbeatSupervisor.stop/1)

    # set visibility to fail_visibility value for failed messages
    failed
    |> Enum.chunk_every(@max_num_messages_allowed_by_aws)
    |> Enum.each(fn messages -> release_messages(messages, ack_ref) end)

    ExAwsClient.ack(ack_ref, successful, failed)
    end

    # Internal

    defp extract_message_receipt(%Message{acknowledger: {_, _, opts}}), do: Map.get(opts, :receipt)

    defp replace_acknowledger(%Message{acknowledger: {_, ack_ref, opts}} = message) do
    %Message{message | acknowledger: {__MODULE__, ack_ref, opts}}
    end

    defp release_messages([], _ack_ref), do: :ok

    defp release_messages(messages, ack_ref) do
    fail_visibility = Application.get_env(:my_app, :sqs_fail_visibility, 0)

    receipts =
    Enum.map(messages, fn msg ->
    msg |> extract_message_receipt() |> Map.put(:visibility_timeout, fail_visibility)
    end)

    opts = Broadway.TermStorage.get!(ack_ref)

    opts.queue_name
    |> ExAws.SQS.change_message_visibility_batch(receipts)
    |> ExAws.request(opts.config)
    end
    end