Review: cloudamqp/rabbitmq-delayed-message-exchange#2 - Migrate from Mnesia to Leveled

Scope: PR head 3878a85 (24 commits, +1377 / -259). Focus: correctness and design.

TL;DR

The layering is defensible and follows the RabbitMQ precedent for Mnesia-to-Khepri backends (recent-history, JMS, consistent-hash exchanges). Single-bucket timestamp-prefixed keys are a reasonable choice. But the Leveled backend has two critical correctness bugs: storage is never actually freed after delivery, and messages_delayed/1 is wrong for any exchange that has ever delivered a message. There are also several design and robustness issues around the gen_server / migration handoff and a macOS-hostile Makefile. The migration path has a documented "at-most-one extra delivery" tradeoff that is defensible but not exercised by the tests. The PR should not merge as-is; the two critical bugs are blockers. A follow-up topic branch has already landed fixes for several other items in this review; see the section below.

Addressed on a follow-up topic branch

The following items from this review have been addressed in a separate topic branch (from-mnesia-to-leveled-pr-review) with commits on top of 3878a85:

  • Critical bug #3 (delete_index/1 no-op on the leveled backend) is fixed.
  • The duplicate init_per_group(mnesia_to_khepri, _) clause in test/plugin_SUITE.erl is removed; the retained clause is the one with {tcp_ports_base, 21200}.
  • The clear_data_in_khepri/1 dead export and body are removed from rabbit_delayed_message_m2k_converter.
  • The sed -i based patch-leveled-app Makefile target is replaced with a portable erl -noshell invocation.

Every other item in the review, including critical bugs #1 and #2, is still present on the topic branch and remains unaddressed.

Critical correctness bugs

1. Delivered messages are never removed from Leveled (src/rabbit_delayed_message_leveled.erl)

handle_info({timeout, _TimerRef, {deliver, Key}}, State) ->
    case get_many(Key) of
        []          -> delete_index(Key);
        Deliveries  -> _ = route(Deliveries, State),
                       delete(Key),
                       delete_index(Key)
    end,

get_many/1 walks the index via ets:first(?INDEX_TABLE) and deletes each index entry whose DelayTS matches the fired timer. When it returns, the ETS table no longer contains any entry with the fired DelayTS. Then delete(DelayTS) runs:

delete(DelayTS) ->
    ...
    case ets:first(?INDEX_TABLE) of
        '$end_of_table' -> ok;
        {FirstDelay, Key} = IndexEntry when FirstDelay =:= DelayTS ->
            ets:delete(?INDEX_TABLE, IndexEntry),
            leveled_bookie:book_delete(?BOOKIE, ?BUCKET, Key, []);
        _ -> ok
    end.

ets:first/1 now returns the NEXT earliest entry, whose DelayTS' is strictly greater than the argument. The guard FirstDelay =:= DelayTS fails, the _ clause runs, and no book_delete is issued for any of the entries that were just delivered. Consequence:

  • The Leveled bucket grows monotonically; every delivered message stays on disk forever.
  • init_index/0 rebuilds the ETS index from Leveled on every startup, so after a restart the delivered-long-ago messages are re-queued and re-delivered.
  • The leveled_bookie:book_put/5 call inside store_delay/3 discards its return value, so the pause back-pressure signal from Leveled never reaches the caller. Not a bug on its own, but it means the store has no brake as it grows.

The fix is to delete via the entries get_many already collected, not by re-reading ETS. Something like:

get_many(DelayTS) ->
    IndexEntries = pop_index_entries(DelayTS, []),
    Results = [case leveled_bookie:book_get(?BOOKIE, ?BUCKET, K) of
                   {ok, V} ->
                       _ = leveled_bookie:book_delete(?BOOKIE, ?BUCKET, K, []),
                       {ok, V};
                   not_found -> not_found
               end || {_, K} <- IndexEntries],
    [binary_to_term(V) || {ok, V} <- Results].

and then delete/1 and delete_index/1 become no-ops (or are removed entirely from the leveled backend and the dispatcher).

2. messages_delayed/1 only increments (src/rabbit_delayed_message_leveled.erl)

store_delay/3 calls increase_counter/1. Nothing calls a corresponding decrease_counter/1. There is no such function. The counter equals total writes since the last full fold, not pending deliveries. Only init_counters/0 (run once at setup/0) produces a correct value, by folding over every Leveled object.

For the Mnesia backend, messages_delayed/1 uses mnesia:dirty_select at call time and is always accurate. The leveled value is exposed to users via rabbit_exchange_type_delayed_message:info/2, which the broker surfaces via rabbit_exchange:info/1 on the HTTP management API and any tool that fetches exchange info (including rabbitmqctl list_exchanges name messages_delayed). So this is user-visible.

Note this combines adversely with bug #1: if the storage leak is fixed but the counter is not, the counter is now also inflated. And vice versa: fixing the counter but not the storage leak means the counter also stays wrong (since the values are still there).

The symmetric fix is to decrement inside the delete path. Alternatively, drop atomics and compute the per-exchange count by folding ETS at query time; with an in-memory index this is cheap.
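A minimal sketch of that symmetric decrement, assuming the per-exchange atomic lives in persistent_term under a {?MODULE, Exchange} key (the key shape and function name are illustrative, not the PR's actual code):

```erlang
%% Hypothetical counterpart to increase_counter/1. The persistent_term key
%% shape {?MODULE, Exchange} is an assumption about how the PR stores the
%% per-exchange atomic.
decrease_counter(Exchange) ->
    case persistent_term:get({?MODULE, Exchange}, undefined) of
        undefined -> ok;
        Ref       -> atomics:sub(Ref, 1, 1)
    end.
```

Calling this from the same code path that issues book_delete keeps the counter and the store in lockstep.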

3. delete_index/1 is a no-op on the leveled backend

Addressed on topic branch (commit 2ce3df4).

delete_index(DeliveryTS) ->
    case ets:whereis(?INDEX_TABLE) of
        undefined -> ok;
        _ -> ets:delete(?INDEX_TABLE, DeliveryTS)
    end.

ETS keys are tuples {DelayTS, LeveledKey}, not bare integers, so ets:delete(Tab, DeliveryTS) never matches. Normally this is masked because get_many/1 has already removed the entries; it only matters on the empty-deliveries branch in handle_info, i.e. when an entry made it into the index without a corresponding Leveled object. That shouldn't happen if storage and index stay in sync, but it is exactly the divergence this branch exists to recover from. As a result the plugin cannot self-heal an index-vs-store divergence on the leveled backend, even though it can on the Mnesia one.
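I have not checked how the topic-branch commit fixes this, but one straightforward shape is a match-delete over the tuple keys (a sketch, assuming the {DelayTS, LeveledKey} layout described above):

```erlang
delete_index(DeliveryTS) ->
    case ets:whereis(?INDEX_TABLE) of
        undefined -> ok;
        _ ->
            %% Delete every {DelayTS, Key} entry for this timestamp; a bare
            %% integer passed to ets:delete/2 never matches a tuple key.
            ets:match_delete(?INDEX_TABLE, {DeliveryTS, '_'}),
            ok
    end.
```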

Design concerns

Leveled data directory sits inside the Khepri store

Path = filename:join([rabbit_khepri:dir(), "rabbit_delayed_message", "leveled"]),

rabbit_khepri:dir() = filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID)) where ?STORE_ID is rabbitmq_metadata. So the per-node delayed-message Leveled store sits at $DATA_DIR/rabbitmq_metadata/rabbit_delayed_message/leveled/. Any operator-level Khepri reset, or documentation that says "safe to remove the Khepri dir after...", also wipes delayed messages. This path is unrelated to Khepri data and probably should live under rabbit_mnesia:dir() directly (e.g., $DATA_DIR/rabbit_delayed_message/leveled/), independent of which metadata store is in use.
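The suggested relocation is essentially a one-line change (sketch; leveled_data_dir/0 is an illustrative name, not a function in the PR):

```erlang
%% Derive the Leveled data dir from the node data dir rather than from
%% rabbit_khepri:dir(), so a Khepri reset cannot wipe delayed messages.
leveled_data_dir() ->
    filename:join([rabbit_mnesia:dir(), "rabbit_delayed_message", "leveled"]).
```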

Bookie ownership during the Mnesia-to-Leveled migration

leveled_bookie:book_start/1 uses gen_server:start_link/3, so the bookie is linked to whichever process opens it. Two places open bookies at the same root path:

  1. rabbit_delayed_message_m2k_converter:init_copy_to_khepri/3, running in the migration worker process.
  2. rabbit_delayed_message_leveled:setup/0, running in the delayed-message gen_server.

The hand-off relies on setup/0 calling catch leveled_bookie:book_close(?BOOKIE) first. Two fragilities here:

  • If the migration worker crashes during copy_to_khepri/3, the bookie dies with it (link), and the khepri_db feature flag never transitions to enabled. The gen_server is already inside handle_cast(await_khepri_and_setup, ...), blocked in rabbit_feature_flags:is_enabled(khepri_db). When the transition is abandoned, is_enabled/1 returns false, maybe_switch_to_leveled/1 takes the warning-only branch, and the bookie persistent_term entry still points to the dead pid. Journal recovery eventually restores the data on the next clean setup/0, but the timing and idempotency across a supervisor-restarted gen_server are untested. Worth adding a test that SIGKILLs the migration mid-copy.
  • catch leveled_bookie:book_close(?BOOKIE) where ?BOOKIE can be undefined (fresh boot, no persistent_term set) or a dead pid (after migration converter closed it, or after disable_plugin/0 left the persistent_term in place) is a catch-all that relies on catch absorbing whatever gen_server:call/3 raises. It works, but it hides the distinction between "nothing to close" and "stale pid left over". An explicit case ?BOOKIE of undefined -> ok; Pid -> catch leveled_bookie:book_close(Pid) end with a persistent_term:erase at the end of disable_plugin/0 is clearer and gives room to log when the close unexpectedly fails.
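A sketch of the explicit close suggested in the second bullet (the persistent_term key shape and the logging call are assumptions, not the PR's code):

```erlang
%% Close the bookie only when a pid is actually recorded, log an unexpected
%% failure instead of silently absorbing it, and drop the stale entry.
close_bookie() ->
    case persistent_term:get({?MODULE, bookie}, undefined) of
        undefined -> ok;                    %% nothing to close
        Pid ->
            case catch leveled_bookie:book_close(Pid) of
                ok    -> ok;
                Other -> logger:warning("delayed message bookie ~p did not "
                                        "close cleanly: ~p", [Pid, Other])
            end,
            persistent_term:erase({?MODULE, bookie}),
            ok
    end.
```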

maybe_switch_to_leveled/1 blocks the gen_server indefinitely on a stuck feature flag

maybe_switch_to_leveled(State = #state{timer = CurrTimer}) ->
    case rabbit_feature_flags:is_enabled(khepri_db) of

is_enabled/1 defaults to blocking mode, which in turn calls rabbit_ff_registry_factory:acquire_state_change_lock(). If the migration is stuck or the khepri_db FF transition never resolves, the delayed-message gen_server cannot process {delay_message, ...} calls while the handle_cast(await_khepri_and_setup, ...) handler runs, because calls and casts share the mailbox. Since delay_message/3 uses gen_server:call(..., infinity), publishers block indefinitely rather than erroring out. There is no timeout or cancellation path on the gen_server side. Using non_blocking and a short retry loop would preserve liveness.
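A sketch of the non-blocking variant (rabbit_feature_flags:is_enabled/2 accepts non_blocking and can then return state_changing; the retry message and switch_to_leveled/1 helper are illustrative names):

```erlang
maybe_switch_to_leveled(State) ->
    case rabbit_feature_flags:is_enabled(khepri_db, non_blocking) of
        true ->
            switch_to_leveled(State);   %% hypothetical helper running setup/0
        false ->
            State;
        state_changing ->
            %% Re-check shortly; delay_message calls keep draining meanwhile
            %% because the gen_server returns to its receive loop.
            _ = erlang:send_after(100, self(), retry_switch_to_leveled),
            State
    end.
```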

copy_to_khepri/3 handles pause by sleeping in the single migration worker

case leveled_bookie:book_put(Bookie, ?BUCKET, Key, ..., []) of
    ok    -> ok;
    pause -> int_migration_pause()   %% timer:sleep(1000)
end,

Per the mnesia_to_khepri_converter behaviour contract, "all callbacks are run in the same process." A 1-second sleep therefore blocks the entire table-copy pipeline. If Leveled returns pause frequently during a large migration (which is exactly when back-pressure exists), migration throughput collapses to ~1 record/s. A better response is to apply back-pressure to the Mnesia checkpoint reader (e.g., slow the bookkeeping side) or to treat pause as advisory: continue immediately but with jittered short waits.
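Treating pause as advisory could look like this (a sketch; put_with_backoff/3 is an illustrative name and the jitter window is a guess to be tuned):

```erlang
put_with_backoff(Bookie, Key, Obj) ->
    case leveled_bookie:book_put(Bookie, ?BUCKET, Key, Obj, []) of
        ok    -> ok;
        pause ->
            %% 10-50 ms of jitter eases pressure without collapsing the
            %% single-process migration to ~1 record/s.
            timer:sleep(10 + rand:uniform(40)),
            ok
    end.
```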

get_many/1 issues one book_get per entry

For a timer that fires on N due messages at the same timestamp, the code does N individual leveled_bookie:book_get calls, each a full gen_server:call through the bookie. For bursts this stalls the delayed-message gen_server (which publishers are themselves blocked on via the gen_server:call in delay_message/3). An earlier commit used book_objectfold over the key range <<DelayTS:64/big, 0:128>> to <<DelayTS:64/big, (binary:copy(<<16#FF>>, 16))/binary>>, which issues a single gen_server:call to set up a snapshot and then folds via that snapshot in the calling process. Replacing N individual point-lookup round-trips with one snapshot-based range fold is a measurable improvement for any burst.
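The range-fold shape from that earlier commit looks roughly like this (a sketch; I have not re-verified the exact book_objectfold arity and argument order against Leveled's current API, so treat the call shape as an assumption):

```erlang
get_many(DelayTS) ->
    Start = <<DelayTS:64/big, 0:128>>,
    End   = <<DelayTS:64/big, (binary:copy(<<16#FF>>, 16))/binary>>,
    Fold  = fun(_Bucket, _Key, Value, Acc) -> [binary_to_term(Value) | Acc] end,
    %% One gen_server:call sets up the snapshot; the returned Runner then
    %% folds over the key range in the calling process.
    {async, Runner} = leveled_bookie:book_objectfold(
                        ?BOOKIE, o, ?BUCKET, {Start, End}, {Fold, []}, false),
    Runner().
```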

clear_data_in_khepri/1 is dead code

Addressed on topic branch (commit e11e90d).

clear_data_in_khepri(Table) -> ...

The mnesia_to_khepri_converter behaviour does not declare a clear_data_in_khepri/1 callback. The RabbitMQ-level rabbit_db_m2k_converter intermediate (which this module does NOT use) sometimes calls the converter's own private clear_data_in_khepri, but the PR directly implements mnesia_to_khepri_converter. Remove the export and the function.

Unresolved TODO comments in shipping code

Four TODO comments in the PR mark unresolved questions:

  • rabbit_delayed_message.erl line 70: startup behaviour after the removal of go/0.
  • rabbit_delayed_message.erl line 184: an avoidable double get_first_delay/0 call in internal_delay_message/4.
  • rabbit_delayed_message.erl line 266: how #state.stats_state gets refreshed across backends.
  • rabbit_delayed_message_leveled.erl line 186: "oof, having to transform each term aint good" on the per-exchange counter fold.

Either resolve them or open tracking issues before merge.

Minor: counter atomics on repeated setup/0

init_counters/0 folds over all objects in Leveled and, for every exchange with at least one pending delivery, creates a fresh atomic via atomics:new/1 and installs it with persistent_term:put/2, replacing any previously-stored atomic for that exchange. The overwritten atomic becomes unreferenced and its NIF resource can be reclaimed, so this is not a memory leak on its own. But persistent_term:put triggers a global literal-area rebuild; doing it once per exchange on every setup/0 is more disruptive than necessary. Check-then-reuse (keep the existing atomic if present, just overwrite its value with atomics:put/3) would be tidier.
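A check-then-reuse sketch (the persistent_term key shape and function name are assumptions):

```erlang
install_counter(Exchange, Count) ->
    Key = {?MODULE, Exchange},
    case persistent_term:get(Key, undefined) of
        undefined ->
            %% First install for this exchange: pay the persistent_term
            %% literal-area rebuild exactly once.
            Ref = atomics:new(1, []),
            ok  = atomics:put(Ref, 1, Count),
            persistent_term:put(Key, Ref);
        Ref ->
            %% Subsequent setup/0 runs: reuse the atomic, no rebuild.
            atomics:put(Ref, 1, Count)
    end.
```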

Minor: disable_plugin/0 leaves persistent_term state

disable_plugin() ->
    catch ets:delete(?INDEX_TABLE),
    leveled_bookie:book_close(?BOOKIE).

Neither the bookie persistent_term entry nor the per-exchange counter entries are cleared. On re-enable without a VM restart, persistent_term:get({?MODULE, bookie}) still returns the now-dead pid. setup/0's catch leveled_bookie:book_close(?BOOKIE) then raises a noproc exit inside gen_server:call, which catch silently absorbs (no log output). So this is not user-visible noise, but there is a latent footgun: a new process could in principle claim the old pid and receive a stray close message. Clear the persistent_term entries on disable.
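The cleanup could be folded into disable_plugin/0 like so (a sketch; it assumes all of this module's persistent_term keys are {?MODULE, _} tuples, which is an assumption about the PR's key layout):

```erlang
disable_plugin() ->
    catch ets:delete(?INDEX_TABLE),
    catch leveled_bookie:book_close(?BOOKIE),
    %% Drop the bookie pid and every per-exchange counter atomic so a
    %% re-enable without a VM restart starts from a clean slate.
    _ = [persistent_term:erase(Key)
         || {{Mod, _} = Key, _} <- persistent_term:get(), Mod =:= ?MODULE],
    ok.
```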

Migration correctness

The migration design is sound apart from the items above. One thing worth calling out positively:

KeySuffix = crypto:hash(md5, term_to_binary({TS, Exchange, Ref})),
Key       = <<TS:64/big, KeySuffix/binary>>,

A deterministic Leveled key prevents duplicate inserts on a retried migration - good. The comment on delete_from_khepri/3 openly says the converter ignores concurrent Mnesia deletions during the copy window and that this can cause at-most-one redundant delivery. That tradeoff is defensible given the alternatives (two-phase synchronisation with an always-progressing delivery timer is hard), but it deserves a test or at least an item in the plugin documentation.

Test-suite gaps

  • Duplicate init_per_group(mnesia_to_khepri, Config) clause. The second clause is unreachable (Erlang takes the first); compiler will warn. The second clause also differs from the first (it lacks {tcp_ports_base, 21200}), suggesting an incomplete rebase. Remove one, and decide intentionally which settings to keep. (Addressed on topic branch, commit c42c1aa: the second clause is removed; the retained clause is the one with {tcp_ports_base, 21200}.)
  • mnesia_to_khepri_migration/1 is shallow. It publishes 3 messages with a 30-second delay, flips khepri_db, and consumes. It does not:
    • assert messages_delayed drops to zero after consumption (would expose bug #2);
    • publish additional messages DURING the migration (would exercise the delete_from_khepri/3 at-most-once acknowledgement);
    • restart the node between migration and consumption (would exercise the converter-opened bookie then setup/0 takeover);
    • verify that a second, post-migration publish works correctly (would exercise get_first_delay on the new backend).
  • No leveled-equivalent of no_message_for_index. The removed test exercised an index-vs-store divergence recovery path. The leveled backend has a symmetric failure mode (ETS populated by init_index/0, but book_get returns not_found for a key that was corrupted or deleted externally) and the get_many empty-deliveries branch is its recovery path. A test that forcibly book_deletes one object between publish and consume would exercise this and would probably fail today because of bug #3.
  • benchmark_SUITE adds value and deliberately excludes itself from make ct by setting CT_SUITES = plugin. That's the right call. It still calls rabbit_delayed_message_mnesia:table_name/0 and index_table_name/0 to clear the Mnesia tables between load-size runs; keep those exports.

Build system

patch-leveled-app is non-portable

Addressed on topic branch (commit c2c980a): the two sed -i calls are replaced with a single erl -noshell -eval invocation that reads the .app file as an Erlang term, removes lz4/zstd from the applications list via set-difference, and writes the file back only when the list actually changes.

patch-leveled-app:
    $(verbose) if [ -f $(DEPS_DIR)/leveled/ebin/leveled.app ]; then \
        sed -i 's/,zstd//' $(DEPS_DIR)/leveled/ebin/leveled.app; \
        sed -i 's/,lz4//' $(DEPS_DIR)/leveled/ebin/leveled.app; \
    fi

GNU sed -i takes the in-place flag alone; BSD/macOS sed requires -i ''. This target will fail on macOS CI and on dev machines. Use sed -i.bak 's/,zstd//' ... && rm -f $(...).bak, or perl -i -pe ..., or better: edit via erl -noshell -eval that reads/writes the .app term.
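For illustration, the suffix form of -i that both GNU and BSD sed accept, demonstrated against a throwaway file rather than the actual leveled.app:

```shell
# Simulate the .app edit on a scratch file; -i.bak (suffix attached, no
# space) is accepted by both GNU and BSD/macOS sed.
printf '{application, leveled, [{applications, [kernel,stdlib,lz4,zstd]}]}.\n' \
    > leveled_app_scratch
sed -i.bak -e 's/,zstd//' -e 's/,lz4//' leveled_app_scratch
rm -f leveled_app_scratch.bak
cat leveled_app_scratch
# → {application, leveled, [{applications, [kernel,stdlib]}]}.
```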

Rationale is sound but deserves clearer sign-posting

OTP 28's stdlib zstd module clashes with OpenRiak's NIF; RabbitMQ 4.2.0 ships an incompatible lz4. Leveled's only runtime users of those modules (leveled_codec:serialise_object/3 and deserialise_object/4) are never reached when {compression_method, none} is passed to book_start/1, which the PR does. So stripping them from leveled.app prevents OTP from trying to start apps that don't exist, and is safe. The reason is non-obvious. A comment above the DEPS line explains part of this; the Makefile target's comment should also note that the NIF libraries are never loaded in this build because {compression_method, none} short-circuits them in leveled_codec. A builder who changes setup/0 to use lz4 later will find the system starts failing and it won't be obvious why.

Positives worth preserving

  • The dispatch pattern in rabbit_delayed_message.erl (calls through rabbit_khepri:handle_fallback/1 with mnesia and khepri closures) is faithful to the existing recent-history / consistent-hash / JMS converter precedent.
  • Moving node-specific ?TABLE_NAME/?INDEX_TABLE_NAME expansion out of the rabbit_mnesia_tables_to_khepri_db attribute and behind an MFA pointer to rabbit_delayed_message_mnesia:table_names/0 is exactly the documented way to handle dynamic table names. The comment on that function explaining it "must not assume any RabbitMQ subsystem is running" is on-point.
  • <<TS:64/big, Random:16/binary>> as the Leveled key is a good choice: sort order equals delivery order, cross-exchange, and the 16-byte random suffix gives plenty of collision resistance for same-millisecond inserts.
  • The deterministic migration key design is clever.
  • The benchmark + flamegraph scaffolding in benchmark_SUITE.erl and the Makefile targets is well-thought-out and should help future tuning.

What I would ask for before merge

Annotations below indicate which items are addressed on the follow-up topic branch described in "Addressed on a follow-up topic branch" above.

  1. Fix bugs #1, #2, #3. These are not style issues. (Bug #3 addressed on topic branch; #1 and #2 remain.)
  2. Remove the duplicate init_per_group(mnesia_to_khepri, Config) clause and reconcile its config. (Addressed on topic branch.)
  3. Remove clear_data_in_khepri/1 or hook it up to something real. (Addressed on topic branch; the dead export and body are removed.)
  4. Replace the sed -i hack with a portable equivalent. (Addressed on topic branch.)
  5. Remove the TODO comments by either resolving the concerns or filing tracked issues.
  6. Move the Leveled data path out from under rabbit_khepri:dir().
  7. Add at least two migration tests: one that publishes during the migration, and one that restarts the node during consumption.
  8. Add a non-blocking or timed wrapper around rabbit_feature_flags:is_enabled(khepri_db) in maybe_switch_to_leveled/1.

Happy to review a second round after these are addressed.
