Last active
April 27, 2026 18:52
-
-
Save lukebakken/ee1d50ae268010a3e8fa17f7ce614f53 to your computer and use it in GitHub Desktop.
Revisions
-
lukebakken revised this gist
Apr 27, 2026 . 1 changed file with 27 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,7 +4,18 @@ ## TL;DR The layering is defensible and follows the RabbitMQ precedent for Mnesia-to-Khepri backends (recent-history, JMS, consistent-hash exchanges). Single-bucket timestamp-prefixed keys are a reasonable choice. But the Leveled backend has **two critical correctness bugs**: storage is never actually freed after delivery, and `messages_delayed/1` is wrong for any exchange that has ever delivered a message. There are also several design and robustness issues around the gen_server / migration handoff and a macOS-hostile Makefile. The migration path has a documented "at-most-one extra delivery" tradeoff that is defensible but not exercised by the tests. The PR should not merge as-is; the two critical bugs are blockers. A follow-up topic branch has already landed fixes for several other items in this review; see the section below. ## Addressed on a follow-up topic branch The following items from this review have been addressed in a separate topic branch (`from-mnesia-to-leveled-pr-review`) with commits on top of 3878a85: - Critical bug #3 (`delete_index/1` no-op on the leveled backend) is fixed. - The duplicate `init_per_group(mnesia_to_khepri, _)` clause in `test/plugin_SUITE.erl` is removed; the retained clause is the one with `{tcp_ports_base, 21200}`. - The `clear_data_in_khepri/1` dead export and body are removed from `rabbit_delayed_message_m2k_converter`. - The `sed -i` based `patch-leveled-app` Makefile target is replaced with a portable `erl -noshell` invocation. Every other item in the review, including critical bugs #1 and #2, is still present on the topic branch and remains unaddressed. ## Critical correctness bugs @@ -60,14 +71,16 @@ and then `delete/1` and `delete_index/1` become no-ops (or are removed entirely `store_delay/3` calls `increase_counter/1`. Nothing calls a corresponding `decrease_counter/1`. There is no such function. The counter equals **total writes since the last full fold**, not pending deliveries. Only `init_counters/0` (run once at `setup/0`) produces a correct value, by folding over every Leveled object. For the Mnesia backend, `messages_delayed/1` uses `mnesia:dirty_select` at call time and is always accurate. The leveled value is exposed to users via `rabbit_exchange_type_delayed_message:info/2`, which the broker surfaces via `rabbit_exchange:info/1` on the HTTP management API and any tool that fetches exchange info (including `rabbitmqctl list_exchanges name messages_delayed`). So this is user-visible. Note this combines adversely with bug #1: if the storage leak is fixed but the counter is not, the counter is now also inflated. And vice versa: fixing the counter but not the storage leak means the counter also stays wrong (since the values are still there). The symmetric fix is to decrement inside the delete path. Alternatively, drop atomics and compute the per-exchange count by folding ETS at query time; with an in-memory index this is cheap. ### 3. `delete_index/1` is a no-op on the leveled backend > Addressed on topic branch (commit 2ce3df4). ```erlang delete_index(DeliveryTS) -> case ets:whereis(?INDEX_TABLE) of @@ -86,7 +99,7 @@ ETS keys are tuples `{DelayTS, LeveledKey}`, not bare integers. `ets:delete(Tab, Path = filename:join([rabbit_khepri:dir(), "rabbit_delayed_message", "leveled"]), ``` `rabbit_khepri:dir() = filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID))` where `?STORE_ID` is `rabbitmq_metadata`. So the per-node delayed-message Leveled store sits at `$DATA_DIR/rabbitmq_metadata/rabbit_delayed_message/leveled/`. Any operator-level Khepri reset, or documentation that says "safe to remove the Khepri dir after...", also wipes delayed messages. This path is unrelated to Khepri data and probably should live under `rabbit_mnesia:dir()` directly (e.g., `$DATA_DIR/rabbit_delayed_message/leveled/`), independent of which metadata store is in use. ### Bookie ownership during the Mnesia-to-Leveled migration @@ -126,6 +139,8 @@ For a timer that fires on N due messages at the same timestamp, the code does N ### `clear_data_in_khepri/1` is dead code > Addressed on topic branch (commit e11e90d). ```erlang clear_data_in_khepri(Table) -> ... ``` @@ -170,7 +185,7 @@ A deterministic Leveled key prevents duplicate inserts on a retried migration - ## Test-suite gaps - **Duplicate `init_per_group(mnesia_to_khepri, Config)` clause.** The second clause is unreachable (Erlang takes the first); compiler will warn. The second clause also differs from the first (it lacks `{tcp_ports_base, 21200}`), suggesting an incomplete rebase. Remove one, and decide intentionally which settings to keep. (Addressed on topic branch, commit c42c1aa: the second clause is removed; the retained clause is the one with `{tcp_ports_base, 21200}`.) - **`mnesia_to_khepri_migration/1` is shallow.** It publishes 3 messages with a 30-second delay, flips `khepri_db`, and consumes. It does not: - assert `messages_delayed` drops to zero after consumption (would expose bug #2); - publish additional messages DURING the migration (would exercise the `delete_from_khepri/3` at-most-once acknowledgement); @@ -183,6 +198,8 @@ A deterministic Leveled key prevents duplicate inserts on a retried migration - ### `patch-leveled-app` is non-portable > Addressed on topic branch (commit c2c980a): the two `sed -i` calls are replaced with a single `erl -noshell -eval` invocation that reads the `.app` file as an Erlang term, removes `lz4`/`zstd` from the `applications` list via set-difference, and writes the file back only when the list actually changes. ```makefile patch-leveled-app: $(verbose) if [ -f $(DEPS_DIR)/leveled/ebin/leveled.app ]; then \ @@ -207,10 +224,12 @@ OTP 28's stdlib `zstd` module clashes with OpenRiak's NIF; RabbitMQ 4.2.0 ships ## What I would ask for before merge Annotations below indicate which items are addressed on the follow-up topic branch described in "Addressed on a follow-up topic branch" above. 1. Fix bugs #1, #2, #3. These are not style issues. (Bug #3 addressed on topic branch; #1 and #2 remain.) 2. Remove the duplicate `init_per_group(mnesia_to_khepri, Config)` clause and reconcile its config. (Addressed on topic branch.) 3. Remove `clear_data_in_khepri/1` or hook it up to something real. (Addressed on topic branch; the dead export and body are removed.) 4. Replace the `sed -i` hack with a portable equivalent. (Addressed on topic branch.) 5. Remove the `TODO` comments by either resolving the concerns or filing tracked issues. 6. Move the Leveled data path out from under `rabbit_khepri:dir()`. 7. Add at least two migration tests: one that publishes during the migration, and one that restarts the node during consumption. -
lukebakken revised this gist
Apr 27, 2026 . 1 changed file with 18 additions and 11 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -38,7 +38,7 @@ delete(DelayTS) -> - The Leveled bucket grows monotonically; every delivered message stays on disk forever. - `init_index/0` rebuilds the ETS index from Leveled on every startup, so after a restart the delivered-long-ago messages are re-queued and re-delivered. - The `leveled_bookie:book_put/5` call inside `store_delay/3` discards its return value, so the `pause` back-pressure signal from Leveled never reaches the caller. Not a bug on its own, but it means the store has no brake as it grows. The fix is to delete via the entries `get_many` already collected, not by re-reading ETS. Something like: @@ -97,8 +97,8 @@ Path = filename:join([rabbit_khepri:dir(), "rabbit_delayed_message", "leveled"]) The hand-off relies on `setup/0` calling `catch leveled_bookie:book_close(?BOOKIE)` first. Two fragilities here: - If the migration worker crashes during `copy_to_khepri/3`, the bookie dies with it (link), and the `khepri_db` feature flag never transitions to enabled. The gen_server is already inside `handle_cast(await_khepri_and_setup, ...)`, blocked in `rabbit_feature_flags:is_enabled(khepri_db)`. When the transition is abandoned, `is_enabled/1` returns `false`, `maybe_switch_to_leveled/1` takes the warning-only branch, and the bookie persistent_term entry still points to the dead pid. Journal recovery eventually restores the data on the next clean `setup/0`, but the timing and idempotency across a supervisor-restarted gen_server are untested. Worth adding a test that SIGKILLs the migration mid-copy. - `catch leveled_bookie:book_close(?BOOKIE)` where `?BOOKIE` can be `undefined` (fresh boot, no persistent_term set) or a dead pid (after migration converter closed it, or after `disable_plugin/0` left the persistent_term in place) is a catch-all that relies on `catch` absorbing whatever `gen_server:call/3` raises. It works, but it hides the distinction between "nothing to close" and "stale pid left over". An explicit `case ?BOOKIE of undefined -> ok; Pid -> catch leveled_bookie:book_close(Pid) end` with a `persistent_term:erase` at the end of `disable_plugin/0` is clearer and gives room to log when the close unexpectedly fails. ### `maybe_switch_to_leveled/1` blocks the gen_server indefinitely on a stuck feature flag @@ -107,7 +107,7 @@ maybe_switch_to_leveled(State = #state{timer = CurrTimer}) -> case rabbit_feature_flags:is_enabled(khepri_db) of ``` `is_enabled/1` defaults to `blocking` mode, which in turn calls `rabbit_ff_registry_factory:acquire_state_change_lock()`. If the migration is stuck or the khepri_db FF transition never resolves, the delayed-message gen_server is unable to process `{delay_message, ...}` calls while the `handle_cast(await_khepri_and_setup, ...)` handler runs, because calls and casts share the mailbox. Since `delay_message/3` uses `gen_server:call(..., infinity)`, publishers' publishes block indefinitely rather than erroring out. There's no timeout or cancellation path on the gen_server side. Using `non_blocking` and a short retry loop would preserve liveness. ### `copy_to_khepri/3` handles `pause` by sleeping in the single migration worker @@ -122,7 +122,7 @@ Per the `mnesia_to_khepri_converter` behaviour contract, "all callbacks are run ### `get_many/1` issues one `book_get` per entry For a timer that fires on N due messages at the same timestamp, the code does N individual `leveled_bookie:book_get` calls, each a full `gen_server:call` through the bookie. For bursts this stalls the delayed-message gen_server (which is itself a `gen_server:call` back from `delay_message/3` publishers). An earlier commit used `book_objectfold` with a range of `<<DelayTS:64/big, 0:128>>` to `<<DelayTS:64/big, 0xFF*16>>`, which issues a single `gen_server:call` to set up the snapshot and then folds via a snapshot owned by the calling process. Replacing N individual point-lookup round-trips with one snapshot-based range fold is a measurable improvement for any burst. ### `clear_data_in_khepri/1` is dead code @@ -134,11 +134,18 @@ The `mnesia_to_khepri_converter` behaviour does not declare a `clear_data_in_khe ### Unresolved `TODO` comments in shipping code Four `TODO` comments in the PR mark unresolved questions: - `rabbit_delayed_message.erl` line 70: startup behaviour after the removal of `go/0`. - `rabbit_delayed_message.erl` line 184: an avoidable double `get_first_delay/0` call in `internal_delay_message/4`. - `rabbit_delayed_message.erl` line 266: how `#state.stats_state` gets refreshed across backends. - `rabbit_delayed_message_leveled.erl` line 186: "oof, having to transform each term aint good" on the per-exchange counter fold. Either resolve them or open tracking issues before merge. ### Minor: counter atomics on repeated `setup/0` `init_counters/0` folds over all objects in Leveled and, for every exchange with at least one pending delivery, creates a fresh atomic via `atomics:new/1` and installs it with `persistent_term:put/2`, replacing any previously-stored atomic for that exchange. The overwritten atomic becomes unreferenced and its NIF resource can be reclaimed, so this is not a memory leak on its own. But `persistent_term:put` triggers a global literal-area rebuild; doing it once per exchange on every `setup/0` is more disruptive than necessary. Check-then-reuse (keep the existing atomic if present, just overwrite its value with `atomics:put/3`) would be tidier. ### Minor: `disable_plugin/0` leaves persistent_term state @@ -148,7 +155,7 @@ disable_plugin() -> leveled_bookie:book_close(?BOOKIE). ``` Neither the bookie persistent_term entry nor the per-exchange counter entries are cleared. On re-enable without a VM restart, `persistent_term:get({?MODULE, bookie})` still returns the now-dead pid. `setup/0`'s `catch leveled_bookie:book_close(?BOOKIE)` then raises a `noproc` exit inside `gen_server:call`, which `catch` silently absorbs (no log output). So this is not user-visible noise, but there is a latent footgun: a new process could in principle claim the old pid and receive a stray `close` message. Clear the persistent_term entries on disable. ## Migration correctness @@ -170,7 +177,7 @@ A deterministic Leveled key prevents duplicate inserts on a retried migration - - restart the node between migration and consumption (would exercise the converter-opened bookie then `setup/0` takeover); - verify that a second, post-migration publish works correctly (would exercise `get_first_delay` on the new backend). - **No leveled-equivalent of `no_message_for_index`.** The removed test exercised an index-vs-store divergence recovery path. The leveled backend has a symmetric failure mode (ETS populated by `init_index/0`, but `book_get` returns `not_found` for a key that was corrupted or deleted externally) and the `get_many` empty-deliveries branch is its recovery path. A test that forcibly `book_delete`s one object between publish and consume would exercise this and would probably fail today because of bug #3. - **benchmark_SUITE adds value** and deliberately excludes itself from `make ct` by setting `CT_SUITES = plugin`. That's the right call. It still calls `rabbit_delayed_message_mnesia:table_name/0` and `index_table_name/0` to clear the Mnesia tables between load-size runs; keep those exports. ## Build system @@ -193,7 +200,7 @@ OTP 28's stdlib `zstd` module clashes with OpenRiak's NIF; RabbitMQ 4.2.0 ships ## Positives worth preserving - The dispatch pattern in `rabbit_delayed_message.erl` (calls through `rabbit_khepri:handle_fallback/1` with mnesia and khepri closures) is faithful to the existing recent-history / consistent-hash / JMS converter precedent. - Moving node-specific `?TABLE_NAME`/`?INDEX_TABLE_NAME` expansion out of the `rabbit_mnesia_tables_to_khepri_db` attribute and behind an MFA pointer to `rabbit_delayed_message_mnesia:table_names/0` is exactly the documented way to handle dynamic table names. The comment on that function explaining it "must not assume any RabbitMQ subsystem is running" is on-point. - `<<TS:64/big, Random:16/binary>>` as the Leveled key is a good choice: sort order equals delivery order, cross-exchange, and the 16-byte random suffix gives plenty of collision resistance for same-millisecond inserts. - The deterministic migration key design is clever. - The benchmark + flamegraph scaffolding in `benchmark_SUITE.erl` and the Makefile targets is well-thought-out and should help future tuning. -
lukebakken revised this gist
Apr 27, 2026 . 1 changed file with 212 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1 +1,212 @@ ## Review: cloudamqp/rabbitmq-delayed-message-exchange#2 - Migrate from Mnesia to Leveled **Scope:** PR head 3878a85 (24 commits, +1377 / -259). Focus: correctness and design. ## TL;DR The layering is defensible and follows the RabbitMQ precedent for Mnesia-to-Khepri backends (recent-history, JMS, consistent-hash exchanges). Single-bucket timestamp-prefixed keys are a reasonable choice. But the Leveled backend has **two critical correctness bugs**: storage is never actually freed after delivery, and `messages_delayed/1` is wrong for any exchange that has ever delivered a message. There are also several design and robustness issues around the gen_server / migration handoff and a macOS-hostile Makefile. The migration path has a documented "at-most-one extra delivery" tradeoff that is defensible but not exercised by the tests. The PR should not merge as-is; the two critical bugs and the duplicate `init_per_group` clause are blockers. ## Critical correctness bugs ### 1. Delivered messages are never removed from Leveled (`src/rabbit_delayed_message_leveled.erl`) ```erlang handle_info({timeout, _TimerRef, {deliver, Key}}, State) -> case get_many(Key) of [] -> delete_index(Key); Deliveries -> _ = route(Deliveries, State), delete(Key), delete_index(Key) end, ``` `get_many/2` iterates `ets:first(?INDEX_TABLE)` and **deletes** each index entry whose `DelayTS` matches the fired timer. When it returns, the ETS table no longer contains any entry with the fired `DelayTS`. Then `delete(DelayTS)` runs: ```erlang delete(DelayTS) -> ... case ets:first(?INDEX_TABLE) of '$end_of_table' -> ok; {FirstDelay, Key} = IndexEntry when FirstDelay =:= DelayTS -> ets:delete(?INDEX_TABLE, IndexEntry), leveled_bookie:book_delete(?BOOKIE, ?BUCKET, Key, []); _ -> ok end. ``` `ets:first/1` now returns the NEXT earliest entry, whose `DelayTS'` is strictly greater than the argument. The guard `FirstDelay =:= DelayTS` fails, the `_` clause runs, and **no `book_delete` is issued for any of the entries that were just delivered**. Consequence: - The Leveled bucket grows monotonically; every delivered message stays on disk forever. - `init_index/0` rebuilds the ETS index from Leveled on every startup, so after a restart the delivered-long-ago messages are re-queued and re-delivered. - The `catch`-less `leveled_bookie:book_put/5` inside `store_delay/3` ignores the `pause` return value, so Leveled back-pressure has no effect on the caller. Not a bug on its own, but it means the store has no brake as it grows. The fix is to delete via the entries `get_many` already collected, not by re-reading ETS. Something like: ```erlang get_many(DelayTS) -> IndexEntries = pop_index_entries(DelayTS, []), Results = [case leveled_bookie:book_get(?BOOKIE, ?BUCKET, K) of {ok, V} -> _ = leveled_bookie:book_delete(?BOOKIE, ?BUCKET, K, []), {ok, V}; not_found -> not_found end || {_, K} <- IndexEntries], [Entry || {ok, V} <- Results, Entry <- [binary_to_term(V)]]. ``` and then `delete/1` and `delete_index/1` become no-ops (or are removed entirely from the leveled backend and the dispatcher). ### 2. `messages_delayed/1` only increments (`src/rabbit_delayed_message_leveled.erl`) `store_delay/3` calls `increase_counter/1`. Nothing calls a corresponding `decrease_counter/1`. There is no such function. The counter equals **total writes since the last full fold**, not pending deliveries. Only `init_counters/0` (run once at `setup/0`) produces a correct value, by folding over every Leveled object. For the Mnesia backend, `messages_delayed/1` uses `mnesia:dirty_select` at call time and is always accurate. The leveled value is exposed to users via `rabbit_exchange_type_delayed_message:info/2` and shown in management UIs, so this is user-visible. Note this combines adversely with bug #1: if the storage leak is fixed but the counter is not, the counter is now also inflated. And vice versa: fixing the counter but not the storage leak means the counter also stays wrong (since the values are still there). The symmetric fix is to decrement inside the delete path. Alternatively, drop atomics and compute the per-exchange count by folding ETS at query time; with an in-memory index this is cheap. ### 3. `delete_index/1` is a no-op on the leveled backend ```erlang delete_index(DeliveryTS) -> case ets:whereis(?INDEX_TABLE) of undefined -> ok; _ -> ets:delete(?INDEX_TABLE, DeliveryTS) end. ``` ETS keys are tuples `{DelayTS, LeveledKey}`, not bare integers. `ets:delete(Tab, DeliveryTS)` never matches. Normally this is masked because `get_many/2` has already removed the entries; it matters only on the empty-deliveries branch in `handle_info` (if an entry somehow made it into the index without a corresponding Leveled object, which shouldn't happen if storage and index stay in sync, but is exactly the path this branch exists to recover from). As a result the plugin cannot self-heal an index-vs-store divergence on the leveled backend, even though it can on the Mnesia one. ## Design concerns ### Leveled data directory sits inside the Khepri store ```erlang Path = filename:join([rabbit_khepri:dir(), "rabbit_delayed_message", "leveled"]), ``` `rabbit_khepri:dir() = filename:join(rabbit_mnesia:dir(), atom_to_list(?STORE_ID))`. So the per-node delayed-message Leveled store sits at `$DATA_DIR/rabbitmq_khepri_metadata_store/rabbit_delayed_message/leveled/`. Any operator-level Khepri reset, or documentation that says "safe to remove the Khepri dir after...", also wipes delayed messages. This path is unrelated to Khepri data and probably should live under `rabbit_mnesia:dir()` directly (e.g., `$DATA_DIR/rabbit_delayed_message/leveled/`), independent of which metadata store is in use. ### Bookie ownership during the Mnesia-to-Leveled migration `leveled_bookie:book_start/1` uses `gen_server:start_link/3`, so the bookie is linked to whichever process opens it. Two places open bookies at the same root path: 1. `rabbit_delayed_message_m2k_converter:init_copy_to_khepri/3`, running in the migration worker process. 2. `rabbit_delayed_message_leveled:setup/0`, running in the delayed-message gen_server. The hand-off relies on `setup/0` calling `catch leveled_bookie:book_close(?BOOKIE)` first. Two fragilities here: - If the migration worker crashes before the gen_server runs its `await_khepri_and_setup` cast, the bookie dies with it. Journal recovery will make data eventually correct, but the timing and idempotency across a supervisor-restarted gen_server are untested. Worth adding a test that SIGKILLs the migration mid-copy. - `catch book_close(?BOOKIE)` when `?BOOKIE` is `undefined` (fresh boot, no persistent_term set) is a noisy way to spell "skip close when none is open". Better: `case ?BOOKIE of undefined -> ok; Pid -> catch leveled_bookie:book_close(Pid) end`. ### `maybe_switch_to_leveled/1` blocks the gen_server indefinitely on a stuck feature flag ```erlang maybe_switch_to_leveled(State = #state{timer = CurrTimer}) -> case rabbit_feature_flags:is_enabled(khepri_db) of ``` `is_enabled/1` defaults to `blocking` mode, which in turn calls `rabbit_ff_registry_factory:acquire_state_change_lock()`. If the migration is stuck or the khepri_db FF transition never resolves, the delayed-message gen_server is unable to process `{delay_message, ...}` calls while the `handle_cast(await_khepri_and_setup, ...)` handler runs, because calls and casts share the mailbox. This is visible as all publishers seeing `gen_server:call` timeouts. There's no timeout or cancellation path. Using `non_blocking` and a short retry loop would preserve liveness. ### `copy_to_khepri/3` handles `pause` by sleeping in the single migration worker ```erlang case leveled_bookie:book_put(Bookie, ?BUCKET, Key, ..., []) of ok -> ok; pause -> int_migration_pause() %% timer:sleep(1000) end, ``` Per the `mnesia_to_khepri_converter` behaviour contract, "all callbacks are run in the same process." A 1-second sleep therefore blocks the entire table-copy pipeline. If Leveled returns `pause` frequently during a large migration (which is exactly when back-pressure exists), migration throughput collapses to ~1 record/s. A better response is to apply back-pressure to the Mnesia checkpoint reader (e.g., slow the bookkeeping side) or to treat `pause` as advisory: continue immediately but with jittered short waits. ### `get_many/1` issues one `book_get` per entry For a timer that fires on N due messages at the same timestamp, the code does N individual `leveled_bookie:book_get` calls, each a full `gen_server:call` through the bookie. For bursts this stalls the delayed-message gen_server (which is itself a `gen_server:call` back from `delay_message/3` publishers). An earlier commit used `book_objectfold` with a range of `<<DelayTS:64/big, 0:128>>` to `<<DelayTS:64/big, 0xFF*16>>` which is O(log N) bookie roundtrips. ### `clear_data_in_khepri/1` is dead code ```erlang clear_data_in_khepri(Table) -> ... ``` The `mnesia_to_khepri_converter` behaviour does not declare a `clear_data_in_khepri/1` callback. The RabbitMQ-level `rabbit_db_m2k_converter` intermediate (which this module does NOT use) sometimes calls the converter's own private `clear_data_in_khepri`, but the PR directly implements `mnesia_to_khepri_converter`. Remove the export and the function. ### Unresolved `TODO` comments in shipping code Three `TODO` comments in `rabbit_delayed_message.erl` mark unresolved questions (startup behaviour after the removal of `go/0`, `stats_state` refresh path, and an avoidable double `get_first_delay/0` call in `internal_delay_message/4`). Either resolve them or open tracking issues before merge. ### Minor: counter atomics leak on repeated `setup/0` `init_counters/0` unconditionally creates new atomics and does `persistent_term:put/2`, making any previously-stored atomic unreachable. Harmless in steady state, but every supervisor restart or plugin re-enable doubles the count of orphan atomics until the VM is restarted. Check-then-reuse would be cleaner. ### Minor: `disable_plugin/0` leaves persistent_term state ```erlang disable_plugin() -> catch ets:delete(?INDEX_TABLE), leveled_bookie:book_close(?BOOKIE). ``` Neither the bookie persistent_term entry nor the per-exchange counter entries are cleared. On re-enable without a VM restart, `counter_key(ExName)` still points to the old atomic (which, combined with a fresh `init_counters/0` overwrite, is benign), but the bookie term holds a now-dead pid. `setup/0`'s `catch book_close/1` will log an error for each re-enable cycle. Clear the persistent_term entries on disable. ## Migration correctness The migration design is sound apart from the items above. One thing worth calling out positively: ```erlang KeySuffix = crypto:hash(md5, term_to_binary({TS, Exchange, Ref})), Key = <<TS:64/big, KeySuffix/binary>>, ``` A deterministic Leveled key prevents duplicate inserts on a retried migration - good. The comment on `delete_from_khepri/3` openly says the converter ignores concurrent Mnesia deletions during the copy window and that this can cause at-most-one redundant delivery. That tradeoff is defensible given the alternatives (two-phase synchronisation with an always-progressing delivery timer is hard), but it deserves a test or at least an item in the plugin documentation. ## Test-suite gaps - **Duplicate `init_per_group(mnesia_to_khepri, Config)` clause.** The second clause is unreachable (Erlang takes the first); compiler will warn. The second clause also differs from the first (it lacks `{tcp_ports_base, 21200}`), suggesting an incomplete rebase. Remove one, and decide intentionally which settings to keep. - **`mnesia_to_khepri_migration/1` is shallow.** It publishes 3 messages with a 30-second delay, flips `khepri_db`, and consumes. It does not: - assert `messages_delayed` drops to zero after consumption (would expose bug #2); - publish additional messages DURING the migration (would exercise the `delete_from_khepri/3` at-most-once acknowledgement); - restart the node between migration and consumption (would exercise the converter-opened bookie then `setup/0` takeover); - verify that a second, post-migration publish works correctly (would exercise `get_first_delay` on the new backend). - **No leveled-equivalent of `no_message_for_index`.** The removed test exercised an index-vs-store divergence recovery path. The leveled backend has a symmetric failure mode (ETS populated by `init_index/0`, but `book_get` returns `not_found` for a key that was corrupted or deleted externally) and the `get_many` empty-deliveries branch is its recovery path. A test that forcibly `book_delete`s one object between publish and consume would exercise this and would probably fail today because of bug #3. - **benchmark_SUITE adds value** and deliberately excludes itself from `make ct` by setting `CT_SUITES = plugin`. That's the right call. It still imports `rabbit_delayed_message_mnesia:table_name/0` and `index_table_name/0`; keep those exports. ## Build system ### `patch-leveled-app` is non-portable ```makefile patch-leveled-app: $(verbose) if [ -f $(DEPS_DIR)/leveled/ebin/leveled.app ]; then \ sed -i 's/,zstd//' $(DEPS_DIR)/leveled/ebin/leveled.app; \ sed -i 's/,lz4//' $(DEPS_DIR)/leveled/ebin/leveled.app; \ fi ``` GNU `sed -i` takes the in-place flag alone; BSD/macOS `sed` requires `-i ''`. This target will fail on macOS CI and on dev machines. Use `sed -i.bak 's/,zstd//' ... && rm -f $(...).bak`, or `perl -i -pe ...`, or better: edit via `erl -noshell -eval` that reads/writes the `.app` term. ### Rationale is sound but deserves clearer sign-posting OTP 28's stdlib `zstd` module clashes with OpenRiak's NIF; RabbitMQ 4.2.0 ships an incompatible `lz4`. Leveled's only runtime users of those modules (`leveled_codec:serialise_object/3` and `deserialise_object/4`) are never reached when `{compression_method, none}` is passed to `book_start/1`, which the PR does. So stripping them from `leveled.app` prevents OTP from trying to start apps that don't exist, and is safe. The reason is non-obvious. A comment above the `DEPS` line explains part of this; the Makefile target's comment should also note that **the NIF libraries are never loaded in this build** because `{compression_method, none}` short-circuits them in `leveled_codec`. A builder who changes `setup/0` to use `lz4` later will find the system starts failing and it won't be obvious why. ## Positives worth preserving - The dispatch pattern in `rabbit_delayed_message.erl` (calls through `rabbit_khepri:handle_fallback/1` with mnesia and khepri closures) is faithful to the existing recent-history / consistent-hash / JMS converter precedent. - Moving node-specific `?TABLE_NAME`/`?INDEX_TABLE_NAME` expansion out of the `rabbit_mnesia_tables_to_khepri_db` attribute and behind an MFA pointer to `rabbit_delayed_message_mnesia:table_names/0` is exactly the documented way to handle dynamic table names. The comment on that function explaining "must not assume any Rabbit sub-system is running" is on-point. - `<<TS:64/big, Random:16/binary>>` as the Leveled key is a good choice: sort order equals delivery order, cross-exchange, and the 16-byte random suffix gives plenty of collision resistance for same-millisecond inserts. - The deterministic migration key design is clever. - The benchmark + flamegraph scaffolding in `benchmark_SUITE.erl` and the Makefile targets is well-thought-out and should help future tuning. ## What I would ask for before merge 1. Fix bugs #1, #2, #3. These are not style issues. 2. Remove the duplicate `init_per_group(mnesia_to_khepri, Config)` clause and reconcile its config. 3. Remove `clear_data_in_khepri/1` or hook it up to something real. 4. Replace the `sed -i` hack with a portable equivalent. 5. Remove the `TODO` comments by either resolving the concerns or filing tracked issues. 6. Move the Leveled data path out from under `rabbit_khepri:dir()`. 7. Add at least two migration tests: one that publishes during the migration, and one that restarts the node during consumption. 8. Add a non-blocking or timed wrapper around `rabbit_feature_flags:is_enabled(khepri_db)` in `maybe_switch_to_leveled/1`. Happy to review a second round after these are addressed. -
lukebakken created this gist
Apr 27, 2026 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1 @@ TODO