Created
November 4, 2021 08:15
-
-
Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.
Revisions
-
albertored created this gist
Nov 4, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,45 @@ defmodule MyApp.Broadway.Heartbeat do use GenServer, restart: :transient alias Broadway.Message def start_link(%Message{} = message) do state = %{ rh: extract_message_receipt_handle(message), id: extract_message_id(message), gid: extract_message_group_id(message), queue: Application.fetch_env!(:my_app, :sqs_in_queue), init_visibility: Application.fetch_env!(:my_app, :sqs_init_visibility), visibility_step: Application.fetch_env!(:my_app, :sqs_visibility_step) } GenServer.start_link(__MODULE__, state, name: {:via, Registry, {MyApp.Broadway.HeartbeatRegistry, state.id}} ) end def init(%{init_visibility: init_visibility, id: id, gid: gid} = init_state) do timer = Process.send_after(self(), :update_visibility, init_visibility * 500) {:ok, Map.put(init_state, :timer, timer)} end def handle_info(:update_visibility, %{rh: rh, visibility_step: step, queue: queue} = state) do timer = Process.send_after(self(), :update_visibility, step * 1_000) {:ok, _} = queue |> ExAws.SQS.change_message_visibility(rh, step * 2) |> ExAws.request() {:noreply, %{state | timer: timer}} end def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state} defp extract_message_receipt_handle(%Message{metadata: metadata}), do: Map.get(metadata, :receipt_handle) defp extract_message_id(%Message{metadata: metadata}), do: Map.get(metadata, :message_id) defp extract_message_group_id(%Message{metadata: metadata}), do: metadata |> Map.get(:attributes, %{}) |> Map.get("message_group_id", nil) end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,37 @@ defmodule MyApp.Broadway.HeartbeatSupervisor do use DynamicSupervisor alias Broadway.Message def start_link(args) do DynamicSupervisor.start_link(__MODULE__, args, name: __MODULE__) end @impl true def init(_arg) do DynamicSupervisor.init(strategy: :one_for_one) end def start_heartbeat(message) do spec = %{ id: MyApp.Broadway.Heartbeat, start: {MyApp.Broadway.Heartbeat, :start_link, [message]}, restart: :transient } DynamicSupervisor.start_child(__MODULE__, spec) message end def stop(msg), do: action(msg, :stop) defp action(%Message{metadata: %{message_id: id}}, action), do: action(id, action) defp action(id, action) when is_bitstring(id) do case Registry.lookup(MyApp.Broadway.HeartbeatRegistry, id) do [{pid, _value} | _tail] -> GenServer.call(pid, action) [] -> {:error, :not_found} end end end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,61 @@ defmodule MyApp.Broadway.SQSClient do alias Broadway.Message alias BroadwaySQS.ExAwsClient alias MyAapp.Broadway.HeartbeatSupervisor @behaviour BroadwaySQS.SQSClient @behaviour Broadway.Acknowledger @max_num_messages_allowed_by_aws 10 defdelegate init(opts), to: ExAwsClient @impl true def receive_messages(demand, opts) do demand |> ExAwsClient.receive_messages(opts) # start heartbeat process to increase visibility |> Enum.map(&HeartbeatSupervisor.start_heartbeat/1) |> Enum.map(&replace_acknowledger/1) end @impl true def ack(ack_ref, successful, failed) do # stop visibility updates Enum.each(successful, &HeartbeatSupervisor.stop/1) Enum.each(failed, &HeartbeatSupervisor.stop/1) # set visibility to fail_visibility value for failed messages failed |> Enum.chunk_every(@max_num_messages_allowed_by_aws) |> Enum.each(fn messages -> release_messages(messages, ack_ref) end) ExAwsClient.ack(ack_ref, successful, failed) end # Internal defp extract_message_receipt(%Message{acknowledger: {_, _, opts}}), do: Map.get(opts, :receipt) defp replace_acknowledger(%Message{acknowledger: {_, ack_ref, opts}} = message) do %Message{message | acknowledger: {__MODULE__, ack_ref, opts}} end defp release_messages([], _ack_ref), do: :ok defp release_messages(messages, ack_ref) do fail_visibility = Application.get_env(:my_app, :sqs_fail_visibility, 0) receipts = Enum.map(messages, fn msg -> msg |> extract_message_receipt() |> Map.put(:visibility_timeout, fail_visibility) end) opts = Broadway.TermStorage.get!(ack_ref) opts.queue_name |> ExAws.SQS.change_message_visibility_batch(receipts) |> ExAws.request(opts.config) end end