Review: cloudamqp/rabbitmq-delayed-message-exchange#2 - Migrate from Mnesia to Leveled
Scope: PR head 3878a85 (24 commits, +1377 / -259). Focus: correctness and design.
The layering is defensible and follows the RabbitMQ precedent for Mnesia-to-Khepri backends (recent-history, JMS, consistent-hash exchanges). Single-bucket timestamp-prefixed keys are a reasonable choice. But the Leveled backend has two critical correctness bugs: storage is never actually freed after delivery, and `messages_delayed/1` is wrong for any exchange that has ever delivered a message. There are also several design and robustness issues around the gen_server / migration handoff and a macOS-hostile Makefile. The migration path has a documented "at-most-one extra delivery" tradeoff that is defensible but not exercised by the tests. The PR should not merge as-is; the two critical bugs are blockers. A follow-up topic branch has already landed fixes for several other items in this review; see the section below.
The following items from this review have been addressed in a separate topic branch (`from-mnesia-to-leveled-pr-review`) with commits on top of 3878a85:

- Critical bug #3 (`delete_index/1` no-op on the leveled backend) is fixed.
- The duplicate `init_per_group(mnesia_to_khepri, _)` clause in `test/plugin_SUITE.erl` is removed; the retained clause is the one with `{tcp_ports_base, 21200}`.
- The `clear_data_in_khepri/1` dead export and body are removed from `rabbit_delayed_message_m2k_converter`.
- The `sed -i`-based `patch-leveled-app` Makefile target is replaced with a portable `erl -noshell` invocation.
Every other item in the review, including critical bugs #1 and #2, is still present on the topic branch and remains unaddressed.
Critical bug #1: delivered messages are never removed from the Leveled store. The delivery path is:

```erlang
handle_info({timeout, _TimerRef, {deliver, Key}}, State) ->
    case get_many(Key) of
        [] -> delete_index(Key);
        Deliveries -> _ = route(Deliveries, State),
                      delete(Key),
                      delete_index(Key)
    end,
```

`get_many/2` iterates `ets:first(?INDEX_TABLE)` and deletes each index entry whose DelayTS matches the fired timer. When it returns, the ETS table no longer contains any entry with the fired DelayTS. Then `delete(DelayTS)` runs:
```erlang
delete(DelayTS) ->
    ...
    case ets:first(?INDEX_TABLE) of
        '$end_of_table' -> ok;
        {FirstDelay, Key} = IndexEntry when FirstDelay =:= DelayTS ->
            ets:delete(?INDEX_TABLE, IndexEntry),
            leveled_bookie:book_delete(?BOOKIE, ?BUCKET, Key, []);
        _ -> ok
    end.
```

`ets:first/1` now returns the NEXT earliest entry, whose DelayTS is strictly greater than the fired one. The guard `FirstDelay =:= DelayTS` fails, the `_` clause runs, and no `book_delete` is issued for any of the entries that were just delivered. Consequences:
- The Leveled bucket grows monotonically; every delivered message stays on disk forever.
- `init_index/0` rebuilds the ETS index from Leveled on every startup, so after a restart the long-ago-delivered messages are re-queued and re-delivered.
- The `leveled_bookie:book_put/5` call inside `store_delay/3` discards its return value, so the `pause` back-pressure signal from Leveled never reaches the caller. Not a bug on its own, but it means the store has no brake as it grows.
The fix is to delete via the entries `get_many` already collected, not by re-reading ETS. Something like:
```erlang
get_many(DelayTS) ->
    IndexEntries = pop_index_entries(DelayTS, []),
    Results = [case leveled_bookie:book_get(?BOOKIE, ?BUCKET, K) of
                   {ok, V} ->
                       _ = leveled_bookie:book_delete(?BOOKIE, ?BUCKET, K, []),
                       {ok, V};
                   not_found -> not_found
               end || {_, K} <- IndexEntries],
    [binary_to_term(V) || {ok, V} <- Results].
```

and then `delete/1` and `delete_index/1` become no-ops (or are removed entirely from the leveled backend and the dispatcher).
Critical bug #2: `messages_delayed/1` over-counts on the leveled backend. `store_delay/3` calls `increase_counter/1`, but nothing calls a corresponding `decrease_counter/1`; no such function exists. The counter therefore equals total writes since the last full fold, not pending deliveries. Only `init_counters/0` (run once at `setup/0`) produces a correct value, by folding over every Leveled object.

For the Mnesia backend, `messages_delayed/1` uses `mnesia:dirty_select` at call time and is always accurate. The leveled value is exposed to users via `rabbit_exchange_type_delayed_message:info/2`, which the broker surfaces via `rabbit_exchange:info/1` on the HTTP management API and any tool that fetches exchange info (including `rabbitmqctl list_exchanges name messages_delayed`). So this is user-visible.

Note that this combines adversely with bug #1: if the storage leak is fixed but the counter is not, the counter still drifts upward because nothing ever decrements it. And vice versa: fixing the counter but not the storage leak leaves the counter wrong again after any restart, because `init_counters/0` re-folds over the leaked objects.

The symmetric fix is to decrement inside the delete path. Alternatively, drop atomics and compute the per-exchange count by folding ETS at query time; with an in-memory index this is cheap.
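A minimal sketch of the decrement side, assuming the per-exchange counter is a one-slot atomic held in `persistent_term` the way `increase_counter/1` installs it; the key shape and helper name are illustrative, not the PR's actual code:

```erlang
%% Illustrative counterpart to increase_counter/1: decrement the pending-delivery
%% count for an exchange, tolerating a missing counter (e.g. before setup/0 ran).
decrease_counter(ExchangeName) ->
    case persistent_term:get({?MODULE, ExchangeName}, undefined) of
        undefined -> ok;
        CounterRef -> atomics:sub(CounterRef, 1, 1)
    end.
```

Called once per delivered message from the same place the Leveled object is deleted, this keeps the counter and the store in step.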
Critical bug #3: `delete_index/1` deletes with the wrong key shape. Addressed on topic branch (commit 2ce3df4).

```erlang
delete_index(DeliveryTS) ->
    case ets:whereis(?INDEX_TABLE) of
        undefined -> ok;
        _ -> ets:delete(?INDEX_TABLE, DeliveryTS)
    end.
```

ETS keys are tuples `{DelayTS, LeveledKey}`, not bare integers, so `ets:delete(Tab, DeliveryTS)` never matches. Normally this is masked because `get_many/2` has already removed the entries; it matters only on the empty-deliveries branch in `handle_info` (if an entry somehow made it into the index without a corresponding Leveled object, which shouldn't happen if storage and index stay in sync, but is exactly the path this branch exists to recover from). As a result, the plugin cannot self-heal an index-vs-store divergence on the leveled backend, even though it can on the Mnesia one.
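A sketch of one possible fix (the topic branch may do it differently), assuming the ordered_set index stores `{{DelayTS, LeveledKey}}` objects so that the composite tuple is the table key; the match pattern must be adjusted if the real object shape differs:

```erlang
%% Remove every index entry for the fired timestamp, whatever its Leveled key.
delete_index(DeliveryTS) ->
    case ets:whereis(?INDEX_TABLE) of
        undefined -> ok;
        _ -> ets:match_delete(?INDEX_TABLE, {{DeliveryTS, '_'}})
    end.
```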
```erlang
Path = filename:join([rabbit_khepri:dir(), "rabbit_delayed_message", "leveled"]),
```

`rabbit_khepri:dir()` is `filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID))` where `?STORE_ID` is `rabbitmq_metadata`. So the per-node delayed-message Leveled store sits at `$DATA_DIR/rabbitmq_metadata/rabbit_delayed_message/leveled/`. Any operator-level Khepri reset, or documentation that says "safe to remove the Khepri dir after...", also wipes delayed messages. This path is unrelated to Khepri data and should probably live directly under `rabbit_mnesia:dir()` (e.g., `$DATA_DIR/rabbit_delayed_message/leveled/`), independent of which metadata store is in use.
`leveled_bookie:book_start/1` uses `gen_server:start_link/3`, so the bookie is linked to whichever process opens it. Two places open bookies at the same root path:

- `rabbit_delayed_message_m2k_converter:init_copy_to_khepri/3`, running in the migration worker process.
- `rabbit_delayed_message_leveled:setup/0`, running in the delayed-message gen_server.

The hand-off relies on `setup/0` calling `catch leveled_bookie:book_close(?BOOKIE)` first. Two fragilities here:

- If the migration worker crashes during `copy_to_khepri/3`, the bookie dies with it (link), and the `khepri_db` feature flag never transitions to enabled. The gen_server is already inside `handle_cast(await_khepri_and_setup, ...)`, blocked in `rabbit_feature_flags:is_enabled(khepri_db)`. When the transition is abandoned, `is_enabled/1` returns `false`, `maybe_switch_to_leveled/1` takes the warning-only branch, and the bookie persistent_term entry still points to the dead pid. Journal recovery eventually restores the data on the next clean `setup/0`, but the timing and idempotency across a supervisor-restarted gen_server are untested. Worth adding a test that SIGKILLs the migration mid-copy.
- `catch leveled_bookie:book_close(?BOOKIE)`, where `?BOOKIE` can be `undefined` (fresh boot, no persistent_term set) or a dead pid (after the migration converter closed it, or after `disable_plugin/0` left the persistent_term in place), is a catch-all that relies on `catch` absorbing whatever `gen_server:call/3` raises. It works, but it hides the distinction between "nothing to close" and "stale pid left over". An explicit `case ?BOOKIE of undefined -> ok; Pid -> catch leveled_bookie:book_close(Pid) end`, with a `persistent_term:erase` at the end of `disable_plugin/0`, is clearer and gives room to log when the close unexpectedly fails.
```erlang
maybe_switch_to_leveled(State = #state{timer = CurrTimer}) ->
    case rabbit_feature_flags:is_enabled(khepri_db) of
```

`is_enabled/1` defaults to blocking mode, which in turn calls `rabbit_ff_registry_factory:acquire_state_change_lock()`. If the migration is stuck or the `khepri_db` FF transition never resolves, the delayed-message gen_server cannot process `{delay_message, ...}` calls while the `handle_cast(await_khepri_and_setup, ...)` handler runs, because calls and casts share the mailbox. Since `delay_message/3` uses `gen_server:call(..., infinity)`, publishers' publishes block indefinitely rather than erroring out. There is no timeout or cancellation path on the gen_server side. Using `non_blocking` and a short retry loop would preserve liveness.
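A minimal sketch, assuming the gen_server re-polls via a message to itself so ordinary calls keep being served in between; the interval, the `check_khepri_db` message, and the `switch_to_leveled/1` helper are illustrative, and `rabbit_feature_flags:is_enabled/2` with `non_blocking` returns `true | false | state_changing`:

```erlang
%% Poll the feature flag without taking the state-change lock; re-check later
%% instead of blocking the mailbox while the transition is in flight.
handle_info(check_khepri_db, State) ->
    case rabbit_feature_flags:is_enabled(khepri_db, non_blocking) of
        true ->
            {noreply, switch_to_leveled(State)};   %% hypothetical helper
        state_changing ->
            _ = erlang:send_after(100, self(), check_khepri_db),
            {noreply, State};
        false ->
            {noreply, State}
    end.
```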
```erlang
case leveled_bookie:book_put(Bookie, ?BUCKET, Key, ..., []) of
    ok -> ok;
    pause -> int_migration_pause() %% timer:sleep(1000)
end,
```

Per the `mnesia_to_khepri_converter` behaviour contract, "all callbacks are run in the same process." A 1-second sleep therefore blocks the entire table-copy pipeline. If Leveled returns `pause` frequently during a large migration (which is exactly when back-pressure exists), migration throughput collapses to roughly one record per second. A better response is to apply back-pressure to the Mnesia checkpoint reader (e.g., slow the bookkeeping side) or to treat `pause` as advisory: continue immediately but with jittered short waits.
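A sketch of the advisory interpretation; the bounds are illustrative and would need tuning against real migration sizes:

```erlang
%% Replace the fixed 1-second stall with a short, jittered wait so a pausing
%% bookie slows the copy loop rather than collapsing it to ~1 record/s.
int_migration_pause() ->
    timer:sleep(10 + rand:uniform(40)).
```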
For a timer that fires on N due messages at the same timestamp, the code does N individual `leveled_bookie:book_get` calls, each a full `gen_server:call` through the bookie. For bursts this stalls the delayed-message gen_server (which is itself serving `gen_server:call`s from `delay_message/3` publishers). An earlier commit used `book_objectfold` with a range of `<<DelayTS:64/big, 0:128>>` to `<<DelayTS:64/big, ...>>` (sixteen 0xFF bytes), which issues a single `gen_server:call` to set up a snapshot and then folds over that snapshot in the calling process. Replacing N individual point-lookup round-trips with one snapshot-based range fold is a measurable improvement for any burst.
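A sketch of what that could look like, assuming the objects were written with leveled's default `?STD_TAG` and that the vendored leveled exposes `leveled_bookie:book_objectfold/6` taking a `{StartKey, EndKey}` limiter, as the earlier commit used; the exact arity and argument order should be checked against the leveled version in the PR:

```erlang
%% Collect all deliveries for one fired timestamp with a single snapshot-based
%% range fold instead of N point lookups.
get_many(DelayTS) ->
    StartKey = <<DelayTS:64/big, 0:128>>,
    EndKey   = <<DelayTS:64/big, -1:128>>,   %% 16 bytes of 0xFF
    FoldFun  = fun(_Bucket, _Key, Value, Acc) -> [binary_to_term(Value) | Acc] end,
    {async, Runner} =
        leveled_bookie:book_objectfold(?BOOKIE, ?STD_TAG, ?BUCKET,
                                       {StartKey, EndKey}, {FoldFun, []}, false),
    lists:reverse(Runner()).
```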
Addressed on topic branch (commit e11e90d).
```erlang
clear_data_in_khepri(Table) -> ...
```

The `mnesia_to_khepri_converter` behaviour does not declare a `clear_data_in_khepri/1` callback. The RabbitMQ-level `rabbit_db_m2k_converter` intermediate (which this module does NOT use) sometimes calls the converter's own private `clear_data_in_khepri`, but the PR implements `mnesia_to_khepri_converter` directly. Remove the export and the function.
Four TODO comments in the PR mark unresolved questions:
- `rabbit_delayed_message.erl` line 70: startup behaviour after the removal of `go/0`.
- `rabbit_delayed_message.erl` line 184: an avoidable double `get_first_delay/0` call in `internal_delay_message/4`.
- `rabbit_delayed_message.erl` line 266: how `#state.stats_state` gets refreshed across backends.
- `rabbit_delayed_message_leveled.erl` line 186: "oof, having to transform each term aint good" on the per-exchange counter fold.
Either resolve them or open tracking issues before merge.
`init_counters/0` folds over all objects in Leveled and, for every exchange with at least one pending delivery, creates a fresh atomics ref via `atomics:new` and installs it with `persistent_term:put/2`, replacing any previously-stored atomic for that exchange. The overwritten atomic becomes unreferenced and its NIF resource can be reclaimed, so this is not a memory leak on its own. But `persistent_term:put` triggers a global literal-area rebuild; doing it once per exchange on every `setup/0` is more disruptive than necessary. Check-then-reuse (keep the existing atomic if present and just overwrite its value with `atomics:put/3`) would be tidier.
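A sketch of check-then-reuse, assuming one single-slot atomic per exchange kept under a `persistent_term` key; the key shape and helper name are illustrative:

```erlang
%% Reuse an existing atomic for the exchange if one is already installed;
%% only a first-time miss pays the persistent_term:put/2 literal-area rebuild.
counter_for(ExchangeName) ->
    Key = {?MODULE, counter, ExchangeName},
    case persistent_term:get(Key, undefined) of
        undefined ->
            Ref = atomics:new(1, []),
            persistent_term:put(Key, Ref),
            Ref;
        Ref ->
            Ref
    end.
```

`init_counters/0` would then call `atomics:put(counter_for(X), 1, Count)` per exchange instead of unconditionally creating and installing a new atomic.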
```erlang
disable_plugin() ->
    catch ets:delete(?INDEX_TABLE),
    leveled_bookie:book_close(?BOOKIE).
```

Neither the bookie persistent_term entry nor the per-exchange counter entries are cleared. On re-enable without a VM restart, `persistent_term:get({?MODULE, bookie})` still returns the now-dead pid. `setup/0`'s `catch leveled_bookie:book_close(?BOOKIE)` then raises a `noproc` exit inside `gen_server:call`, which `catch` silently absorbs (no log output). So this is not user-visible noise, but there is a latent footgun: a new process could in principle claim the old pid and receive a stray close message. Clear the persistent_term entries on disable.
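A sketch, assuming the bookie pid is stored under `{?MODULE, bookie}` as the prose above suggests; how the per-exchange counter keys are shaped depends on the PR's code, so their cleanup is only indicated by a comment:

```erlang
%% Close the bookie only if one is actually registered, then drop the stale
%% persistent_term entry so a later re-enable starts from a clean slate.
disable_plugin() ->
    catch ets:delete(?INDEX_TABLE),
    case persistent_term:get({?MODULE, bookie}, undefined) of
        undefined -> ok;
        Pid -> catch leveled_bookie:book_close(Pid)
    end,
    _ = persistent_term:erase({?MODULE, bookie}),
    %% also erase the per-exchange counter entries here, using whatever key
    %% shape increase_counter/1 installs them under
    ok.
```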
The migration design is sound apart from the items above. One thing worth calling out positively:
```erlang
KeySuffix = crypto:hash(md5, term_to_binary({TS, Exchange, Ref})),
Key = <<TS:64/big, KeySuffix/binary>>,
```

A deterministic Leveled key prevents duplicate inserts on a retried migration - good. The comment on `delete_from_khepri/3` openly says the converter ignores concurrent Mnesia deletions during the copy window and that this can cause at most one redundant delivery. That tradeoff is defensible given the alternatives (two-phase synchronisation with an always-progressing delivery timer is hard), but it deserves a test or at least an item in the plugin documentation.
On the test suite:

- Duplicate `init_per_group(mnesia_to_khepri, Config)` clause. The second clause is unreachable (Erlang takes the first); the compiler will warn. The second clause also differs from the first (it lacks `{tcp_ports_base, 21200}`), suggesting an incomplete rebase. Remove one, and decide intentionally which settings to keep. (Addressed on topic branch, commit c42c1aa: the second clause is removed; the retained clause is the one with `{tcp_ports_base, 21200}`.)
- `mnesia_to_khepri_migration/1` is shallow. It publishes 3 messages with a 30-second delay, flips `khepri_db`, and consumes. It does not:
  - assert `messages_delayed` drops to zero after consumption (would expose bug #2);
  - publish additional messages DURING the migration (would exercise the `delete_from_khepri/3` at-most-once acknowledgement);
  - restart the node between migration and consumption (would exercise the converter-opened bookie then `setup/0` takeover);
  - verify that a second, post-migration publish works correctly (would exercise `get_first_delay` on the new backend).
- No leveled equivalent of `no_message_for_index`. The removed test exercised an index-vs-store divergence recovery path. The leveled backend has a symmetric failure mode (ETS populated by `init_index/0`, but `book_get` returns `not_found` for a key that was corrupted or deleted externally) and the `get_many` empty-deliveries branch is its recovery path. A test that forcibly `book_delete`s one object between publish and consume would exercise this and would probably fail today because of bug #3.
- `benchmark_SUITE` adds value and deliberately excludes itself from `make ct` by setting `CT_SUITES = plugin`. That's the right call. It still calls `rabbit_delayed_message_mnesia:table_name/0` and `index_table_name/0` to clear the Mnesia tables between load-size runs; keep those exports.
Addressed on topic branch (commit c2c980a): the two `sed -i` calls are replaced with a single `erl -noshell -eval` invocation that reads the `.app` file as an Erlang term, removes `lz4`/`zstd` from the `applications` list via set-difference, and writes the file back only when the list actually changes.
```make
patch-leveled-app:
	$(verbose) if [ -f $(DEPS_DIR)/leveled/ebin/leveled.app ]; then \
		sed -i 's/,zstd//' $(DEPS_DIR)/leveled/ebin/leveled.app; \
		sed -i 's/,lz4//' $(DEPS_DIR)/leveled/ebin/leveled.app; \
	fi
```

GNU `sed -i` takes the in-place flag alone; BSD/macOS sed requires `-i ''`. This target will fail on macOS CI and on dev machines. Use `sed -i.bak 's/,zstd//' ... && rm -f $(...).bak`, or `perl -i -pe ...`, or better: edit via `erl -noshell -eval` that reads and writes the `.app` term.
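A sketch of the term-level rewrite, shown as plain Erlang rather than the exact `-eval` quoting; it mirrors what the topic branch describes (read the `.app` file with `file:consult/1`, drop `lz4`/`zstd`, write back only on change):

```erlang
%% Rewrite leveled.app without the lz4/zstd applications, touching the file
%% only if there is actually something to strip.
patch_leveled_app(File) ->
    {ok, [{application, App, Props0}]} = file:consult(File),
    Apps0 = proplists:get_value(applications, Props0, []),
    case Apps0 -- [lz4, zstd] of
        Apps0 ->
            ok;
        Apps ->
            Props = lists:keystore(applications, 1, Props0, {applications, Apps}),
            ok = file:write_file(File, io_lib:format("~p.~n", [{application, App, Props}]))
    end.
```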
OTP 28's stdlib `zstd` module clashes with OpenRiak's NIF; RabbitMQ 4.2.0 ships an incompatible `lz4`. Leveled's only runtime users of those modules (`leveled_codec:serialise_object/3` and `deserialise_object/4`) are never reached when `{compression_method, none}` is passed to `book_start/1`, which the PR does. So stripping them from `leveled.app` prevents OTP from trying to start applications that don't exist, and is safe. The reason is non-obvious, though. A comment above the `DEPS` line explains part of it; the Makefile target's comment should also note that the NIF libraries are never loaded in this build because `{compression_method, none}` short-circuits them in `leveled_codec`. A builder who later changes `setup/0` to use lz4 will see things start failing and it won't be obvious why.
On the positive side:

- The dispatch pattern in `rabbit_delayed_message.erl` (calls through `rabbit_khepri:handle_fallback/1` with mnesia and khepri closures) is faithful to the existing recent-history / consistent-hash / JMS converter precedent.
- Moving node-specific `?TABLE_NAME`/`?INDEX_TABLE_NAME` expansion out of the `rabbit_mnesia_tables_to_khepri_db` attribute and behind an MFA pointer to `rabbit_delayed_message_mnesia:table_names/0` is exactly the documented way to handle dynamic table names. The comment on that function explaining it "must not assume any RabbitMQ subsystem is running" is on point.
- `<<TS:64/big, Random:16/binary>>` as the Leveled key is a good choice: sort order equals delivery order, cross-exchange, and the 16-byte random suffix gives plenty of collision resistance for same-millisecond inserts.
- The deterministic migration key design is clever.
- The benchmark + flamegraph scaffolding in `benchmark_SUITE.erl` and the Makefile targets is well thought out and should help future tuning.
To summarise the requested changes (annotations indicate which items are already addressed on the follow-up topic branch described at the start of this review):
- Fix bugs #1, #2, #3. These are not style issues. (Bug #3 addressed on topic branch; #1 and #2 remain.)
- Remove the duplicate `init_per_group(mnesia_to_khepri, Config)` clause and reconcile its config. (Addressed on topic branch.)
- Remove `clear_data_in_khepri/1` or hook it up to something real. (Addressed on topic branch; the dead export and body are removed.)
- Replace the `sed -i` hack with a portable equivalent. (Addressed on topic branch.)
- Remove the `TODO` comments by either resolving the concerns or filing tracked issues.
- Move the Leveled data path out from under `rabbit_khepri:dir()`.
- Add at least two migration tests: one that publishes during the migration, and one that restarts the node during consumption.
- Add a non-blocking or timed wrapper around `rabbit_feature_flags:is_enabled(khepri_db)` in `maybe_switch_to_leveled/1`.
Happy to review a second round after these are addressed.