Skip to content

Instantly share code, notes, and snippets.

@albertored
Created November 4, 2021 08:15
Show Gist options
  • Select an option

  • Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.

Select an option

Save albertored/2fbbf304da765716f88336558698d0c2 to your computer and use it in GitHub Desktop.
Dynamic visibility timeout for Broadway SQS Client
defmodule MyApp.Broadway.Heartbeat do
  @moduledoc """
  Per-message process that keeps extending the SQS visibility timeout of a
  `Broadway.Message`.

  The process is registered in `MyApp.Broadway.HeartbeatRegistry` under the
  message id found in the message metadata. Configuration is read from the
  `:my_app` application environment when the process is started:

    * `:sqs_in_queue` - queue passed to `ExAws.SQS.change_message_visibility/3`
    * `:sqs_init_visibility` - the first extension is scheduled after
      `sqs_init_visibility * 500` milliseconds
    * `:sqs_visibility_step` - every extension sets the visibility timeout to
      `sqs_visibility_step * 2` and schedules the next extension after
      `sqs_visibility_step * 1_000` milliseconds

  Presumably both visibility values are expressed in seconds - confirm against
  the application configuration.
  """

  use GenServer, restart: :transient

  alias Broadway.Message

  @doc """
  Starts a heartbeat for `message`, registered under its message id.

  Raises if any of the required `:my_app` configuration keys is missing
  (`Application.fetch_env!/2`).
  """
  def start_link(%Message{} = message) do
    state = %{
      rh: extract_message_receipt_handle(message),
      id: extract_message_id(message),
      gid: extract_message_group_id(message),
      queue: Application.fetch_env!(:my_app, :sqs_in_queue),
      init_visibility: Application.fetch_env!(:my_app, :sqs_init_visibility),
      visibility_step: Application.fetch_env!(:my_app, :sqs_visibility_step)
    }

    GenServer.start_link(__MODULE__, state,
      name: {:via, Registry, {MyApp.Broadway.HeartbeatRegistry, state.id}}
    )
  end

  @impl true
  def init(%{init_visibility: init_visibility} = init_state) do
    # Schedule the first extension; the timer reference is kept in the state.
    timer = Process.send_after(self(), :update_visibility, init_visibility * 500)
    {:ok, Map.put(init_state, :timer, timer)}
  end

  @impl true
  def handle_info(:update_visibility, %{rh: rh, visibility_step: step, queue: queue} = state) do
    # Schedule the next tick before issuing the request.
    timer = Process.send_after(self(), :update_visibility, step * 1_000)

    # Assertive match: a failed request crashes the process.
    {:ok, _} =
      queue
      |> ExAws.SQS.change_message_visibility(rh, step * 2)
      |> ExAws.request()

    {:noreply, %{state | timer: timer}}
  end

  @impl true
  def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state}

  defp extract_message_receipt_handle(%Message{metadata: metadata}),
    do: Map.get(metadata, :receipt_handle)

  defp extract_message_id(%Message{metadata: metadata}), do: Map.get(metadata, :message_id)

  defp extract_message_group_id(%Message{metadata: metadata}),
    do: metadata |> Map.get(:attributes, %{}) |> Map.get("message_group_id")
end
defmodule MyApp.Broadway.HeartbeatSupervisor do
  @moduledoc """
  `DynamicSupervisor` that owns one `MyApp.Broadway.Heartbeat` process per
  message and offers helpers to start and stop them.

  Heartbeats are looked up through `MyApp.Broadway.HeartbeatRegistry`, keyed
  by message id.
  """

  use DynamicSupervisor

  alias Broadway.Message

  @doc """
  Starts the supervisor, registered locally under this module's name.
  """
  def start_link(args) do
    DynamicSupervisor.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  @doc """
  Starts a heartbeat for `message` and returns `message` unchanged, so the
  function can be used as a step when mapping over received messages.

  The result of starting the child is not inspected.
  """
  def start_heartbeat(message) do
    spec = %{
      id: MyApp.Broadway.Heartbeat,
      start: {MyApp.Broadway.Heartbeat, :start_link, [message]},
      restart: :transient
    }

    # Best effort: the outcome is deliberately discarded and the message is
    # handed back regardless.
    _ = DynamicSupervisor.start_child(__MODULE__, spec)
    message
  end

  @doc """
  Stops the heartbeat associated with a `Broadway.Message` or a message id.

  Returns the heartbeat's reply to the `:stop` call, or `{:error, :not_found}`
  when no heartbeat is registered for that id.
  """
  def stop(msg), do: action(msg, :stop)

  defp action(%Message{metadata: %{message_id: id}}, action), do: action(id, action)

  # Registry keys are message ids, i.e. binaries - `is_binary/1` is the
  # accurate guard (`is_bitstring/1` also admits non byte-aligned bitstrings).
  defp action(id, action) when is_binary(id) do
    case Registry.lookup(MyApp.Broadway.HeartbeatRegistry, id) do
      [{pid, _value} | _tail] -> GenServer.call(pid, action)
      [] -> {:error, :not_found}
    end
  end
end
defmodule MyApp.Broadway.SQSClient do
  @moduledoc """
  SQS client for Broadway that wraps `BroadwaySQS.ExAwsClient` and adds
  dynamic visibility-timeout handling.

    * On receive, a heartbeat is started for every message through
      `MyApp.Broadway.HeartbeatSupervisor` and the message acknowledger is
      replaced with this module.
    * On ack, the heartbeats of all messages are stopped, the visibility
      timeout of failed messages is set to the `:sqs_fail_visibility` value of
      the `:my_app` environment (default `0`), and the acknowledgement is then
      delegated to `BroadwaySQS.ExAwsClient.ack/3`.
  """

  @behaviour BroadwaySQS.SQSClient
  @behaviour Broadway.Acknowledger

  alias Broadway.Message
  alias BroadwaySQS.ExAwsClient
  # Fixed typo: the alias previously pointed at the non-existent
  # `MyAapp.Broadway.HeartbeatSupervisor`.
  alias MyApp.Broadway.HeartbeatSupervisor

  @max_num_messages_allowed_by_aws 10

  @impl true
  defdelegate init(opts), to: ExAwsClient

  @impl true
  def receive_messages(demand, opts) do
    demand
    |> ExAwsClient.receive_messages(opts)
    # start heartbeat process to increase visibility
    |> Enum.map(&HeartbeatSupervisor.start_heartbeat/1)
    |> Enum.map(&replace_acknowledger/1)
  end

  @impl true
  def ack(ack_ref, successful, failed) do
    # stop visibility updates
    Enum.each(successful, &HeartbeatSupervisor.stop/1)
    Enum.each(failed, &HeartbeatSupervisor.stop/1)

    # set visibility to fail_visibility value for failed messages,
    # in batches of at most @max_num_messages_allowed_by_aws entries
    failed
    |> Enum.chunk_every(@max_num_messages_allowed_by_aws)
    |> Enum.each(fn messages -> release_messages(messages, ack_ref) end)

    ExAwsClient.ack(ack_ref, successful, failed)
  end

  # Internal

  # Reads the `:receipt` entry stored in the acknowledger data of the message.
  defp extract_message_receipt(%Message{acknowledger: {_, _, opts}}), do: Map.get(opts, :receipt)

  # Routes acknowledgement through this module while keeping the original
  # ack reference and acknowledger data.
  defp replace_acknowledger(%Message{acknowledger: {_, ack_ref, opts}} = message) do
    %Message{message | acknowledger: {__MODULE__, ack_ref, opts}}
  end

  # Sets the visibility timeout of `messages` to the configured failure value
  # with a single batch request.
  defp release_messages([], _ack_ref), do: :ok

  defp release_messages(messages, ack_ref) do
    fail_visibility = Application.get_env(:my_app, :sqs_fail_visibility, 0)

    receipts =
      Enum.map(messages, fn msg ->
        msg |> extract_message_receipt() |> Map.put(:visibility_timeout, fail_visibility)
      end)

    # The client options (queue name, ExAws config) are stored under the ack reference.
    opts = Broadway.TermStorage.get!(ack_ref)

    opts.queue_name
    |> ExAws.SQS.change_message_visibility_batch(receipts)
    |> ExAws.request(opts.config)
  end
end
@albertored
Copy link
Copy Markdown
Author

This is a follow-up to issue dashbitco/broadway_sqs#18.

The code is taken from a legacy project that uses old versions of broadway (0.3.0) and broadway_sqs (0.2.0), so it may not work as-is with newer versions, and there may be a better way to do this with them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment