Created
November 4, 2021 08:15
-
-
Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.
Dynamic visibility timeout for Broadway SQS Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| defmodule MyApp.Broadway.Heartbeat do | |
| use GenServer, restart: :transient | |
| alias Broadway.Message | |
| def start_link(%Message{} = message) do | |
| state = %{ | |
| rh: extract_message_receipt_handle(message), | |
| id: extract_message_id(message), | |
| gid: extract_message_group_id(message), | |
| queue: Application.fetch_env!(:my_app, :sqs_in_queue), | |
| init_visibility: Application.fetch_env!(:my_app, :sqs_init_visibility), | |
| visibility_step: Application.fetch_env!(:my_app, :sqs_visibility_step) | |
| } | |
| GenServer.start_link(__MODULE__, state, | |
| name: {:via, Registry, {MyApp.Broadway.HeartbeatRegistry, state.id}} | |
| ) | |
| end | |
| def init(%{init_visibility: init_visibility, id: id, gid: gid} = init_state) do | |
| timer = Process.send_after(self(), :update_visibility, init_visibility * 500) | |
| {:ok, Map.put(init_state, :timer, timer)} | |
| end | |
| def handle_info(:update_visibility, %{rh: rh, visibility_step: step, queue: queue} = state) do | |
| timer = Process.send_after(self(), :update_visibility, step * 1_000) | |
| {:ok, _} = | |
| queue | |
| |> ExAws.SQS.change_message_visibility(rh, step * 2) | |
| |> ExAws.request() | |
| {:noreply, %{state | timer: timer}} | |
| end | |
| def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state} | |
| defp extract_message_receipt_handle(%Message{metadata: metadata}), do: Map.get(metadata, :receipt_handle) | |
| defp extract_message_id(%Message{metadata: metadata}), do: Map.get(metadata, :message_id) | |
| defp extract_message_group_id(%Message{metadata: metadata}), | |
| do: metadata |> Map.get(:attributes, %{}) |> Map.get("message_group_id", nil) | |
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| defmodule MyApp.Broadway.HeartbeatSupervisor do | |
| use DynamicSupervisor | |
| alias Broadway.Message | |
| def start_link(args) do | |
| DynamicSupervisor.start_link(__MODULE__, args, name: __MODULE__) | |
| end | |
| @impl true | |
| def init(_arg) do | |
| DynamicSupervisor.init(strategy: :one_for_one) | |
| end | |
| def start_heartbeat(message) do | |
| spec = %{ | |
| id: MyApp.Broadway.Heartbeat, | |
| start: {MyApp.Broadway.Heartbeat, :start_link, [message]}, | |
| restart: :transient | |
| } | |
| DynamicSupervisor.start_child(__MODULE__, spec) | |
| message | |
| end | |
| def stop(msg), do: action(msg, :stop) | |
| defp action(%Message{metadata: %{message_id: id}}, action), do: action(id, action) | |
| defp action(id, action) when is_bitstring(id) do | |
| case Registry.lookup(MyApp.Broadway.HeartbeatRegistry, id) do | |
| [{pid, _value} | _tail] -> GenServer.call(pid, action) | |
| [] -> {:error, :not_found} | |
| end | |
| end | |
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| defmodule MyApp.Broadway.SQSClient do | |
| alias Broadway.Message | |
| alias BroadwaySQS.ExAwsClient | |
| alias MyAapp.Broadway.HeartbeatSupervisor | |
| @behaviour BroadwaySQS.SQSClient | |
| @behaviour Broadway.Acknowledger | |
| @max_num_messages_allowed_by_aws 10 | |
| defdelegate init(opts), to: ExAwsClient | |
| @impl true | |
| def receive_messages(demand, opts) do | |
| demand | |
| |> ExAwsClient.receive_messages(opts) | |
| # start heartbeat process to increase visibility | |
| |> Enum.map(&HeartbeatSupervisor.start_heartbeat/1) | |
| |> Enum.map(&replace_acknowledger/1) | |
| end | |
| @impl true | |
| def ack(ack_ref, successful, failed) do | |
| # stop visibility updates | |
| Enum.each(successful, &HeartbeatSupervisor.stop/1) | |
| Enum.each(failed, &HeartbeatSupervisor.stop/1) | |
| # set visibility to fail_visibility value for failed messages | |
| failed | |
| |> Enum.chunk_every(@max_num_messages_allowed_by_aws) | |
| |> Enum.each(fn messages -> release_messages(messages, ack_ref) end) | |
| ExAwsClient.ack(ack_ref, successful, failed) | |
| end | |
| # Internal | |
| defp extract_message_receipt(%Message{acknowledger: {_, _, opts}}), do: Map.get(opts, :receipt) | |
| defp replace_acknowledger(%Message{acknowledger: {_, ack_ref, opts}} = message) do | |
| %Message{message | acknowledger: {__MODULE__, ack_ref, opts}} | |
| end | |
| defp release_messages([], _ack_ref), do: :ok | |
| defp release_messages(messages, ack_ref) do | |
| fail_visibility = Application.get_env(:my_app, :sqs_fail_visibility, 0) | |
| receipts = | |
| Enum.map(messages, fn msg -> | |
| msg |> extract_message_receipt() |> Map.put(:visibility_timeout, fail_visibility) | |
| end) | |
| opts = Broadway.TermStorage.get!(ack_ref) | |
| opts.queue_name | |
| |> ExAws.SQS.change_message_visibility_batch(receipts) | |
| |> ExAws.request(opts.config) | |
| end | |
| end |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a follow up for this issue dashbitco/broadway_sqs#18
The code is taken from a legacy project and uses old versions of broadway (0.3.0) and broadway_sqs (0.2.0) so it may not work or there can be a better way to do it with newer versions.