Server setup
====

- three-node cluster, version 4.1.0
- load balancer
- I am using [this Compose setup](https://github.com/rabbitmq/rabbitmq-stream-go-client/tree/main/compose)

Clients setup
===

- Three clients: Java, Go, .NET
- 1 producer and 1 consumer per client
- [.NET client](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient)
- [Go Client](https://github.com/rabbitmq/rabbitmq-stream-go-client/tree/main/examples/reliable)
- Java Client, with the following code:

```java
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;

import java.text.DecimalFormat;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

// The class name and the customFormat helper were not shown in the original
// snippet; they are filled in here so that the example compiles.
public class ReliableTest {

    static String customFormat(String pattern, long value) {
        return new DecimalFormat(pattern).format(value);
    }

    public static void main(String[] args) {
        // Every connection goes through the load balancer.
        Address entryPoint = new Address("localhost", 5553);
        Environment environment = Environment.builder()
                .host("localhost")
                .port(5553)
                .addressResolver(address -> entryPoint)
                .build();

        AtomicLong consumed = new AtomicLong();
        AtomicLong published = new AtomicLong();
        AtomicLong publishedNotConfirmed = new AtomicLong();

        // Despite its name, this thread only prints the counters every 2 seconds.
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(2000);
                    System.out.println(Instant.now() + " [Java] Received: " + customFormat("###,###.###", consumed.get()) + " messages");
                    System.out.println(Instant.now() + " [Java] Published: " + customFormat("###,###.###", published.get()) + " messages");
                    System.out.println(Instant.now() + " [Java] Published Not Confirmed: " + customFormat("###,###.###", publishedNotConfirmed.get()) + " messages");
                    System.out.println("Total Published: " + customFormat("###,###.###", published.get() + publishedNotConfirmed.get()) + " messages");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        consumerThread.start();

        // Consume the stream from the beginning and count every delivery.
        environment.consumerBuilder()
                .stream("reliable-Test-0")
                .offset(OffsetSpecification.first())
                .messageHandler((context, message) -> consumed.incrementAndGet())
                .build();

        Producer producer = environment.producerBuilder().stream("reliable-Test-0").build();
        System.out.println("Starting publishing...");
        int messageCount = 50_000_000;
        for (int i = 0; i < messageCount; i++) {
            try {
                producer.send(producer.messageBuilder().addData(String.valueOf(i).getBytes()).build(), confirmationStatus -> {
                    if (confirmationStatus.isConfirmed()) {
                        published.incrementAndGet();
                    } else {
                        publishedNotConfirmed.incrementAndGet();
                    }
                });
            } catch (Exception e) {
                // A send that throws is counted as not confirmed.
                publishedNotConfirmed.incrementAndGet();
            }
        }

        // Give outstanding confirmations time to arrive before closing.
        try {
            Thread.sleep(15_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.close();

        // Keep the consumer running until a key is pressed.
        try {
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

- The producers have to send:
  - Java: 50_000_000
  - .NET: 50_000_000
  - Go: 100_000_000

### Chaos during the tests

This script restarts all the running Docker containers one after another:

```bash
for i in {1..500}
do
  for container in $(docker ps -q); do
    echo "Restarting ..$container"
    docker restart "$container"
    echo "Waiting 20s ..$container"
    sleep 20
    echo "Waiting 30s ..$container"
    sleep 30
  done
done
```

### Results

Messages in RabbitMQ:

[Screenshot 2025-04-18 at 10 10 41]

### Golang:

- Failed + Confirmed = 100,000,000 ✅
- Consumed = 175,563,479 ✅

[Screenshot 2025-04-18 at 10 23 51]

### DotNet:

- Failed + Confirmed = 50,000,000 ✅
- Consumed = 175,563,479 ✅

[Screenshot 2025-04-18 at 10 28 39]

### Java:

- Failed + Confirmed = 50,000,000 ✅
- Consumed = 175,563,498 ✅ (by default the Java client re-consumes the last message, so some duplicates are possible; this is acceptable). For more complex subscriptions, see https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#consumer-subscription-listener
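
If the duplicates in the Java count ever matter, one possible approach is to skip deliveries whose offset has already been handled. The following is only a minimal sketch under assumptions: `OffsetDeduplicator` is a hypothetical helper (not part of the RabbitMQ stream Java client), and it assumes a single stream whose offsets only increase.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the RabbitMQ stream Java client:
// remembers the highest offset handled so far and rejects anything at or below it.
class OffsetDeduplicator {
    // -1 means "nothing processed yet"; stream offsets start at 0.
    private final AtomicLong lastOffset = new AtomicLong(-1);

    // Returns true if the offset is new and should be processed,
    // false if it was already seen (a redelivery).
    boolean accept(long offset) {
        while (true) {
            long current = lastOffset.get();
            if (offset <= current) {
                return false;
            }
            if (lastOffset.compareAndSet(current, offset)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        OffsetDeduplicator dedup = new OffsetDeduplicator();
        long[] delivered = {0, 1, 2, 2, 3}; // offset 2 is delivered twice
        long counted = 0;
        for (long offset : delivered) {
            if (dedup.accept(offset)) {
                counted++;
            }
        }
        System.out.println("Counted " + counted + " of " + delivered.length + " deliveries");
    }
}
```

In the test client, the message handler could then count a message only when `dedup.accept(context.offset())` returns true, with `dedup` being an instance created before the consumer is built.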