Last active
June 5, 2018 20:27
-
-
Save pmihalcin/a93faa5a4c49434607015f909ab5f0e8 to your computer and use it in GitHub Desktop.
Requeue dead letter messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // first example | |
| @Bean | |
| public IntegrationFlow flow(ConnectionFactory cf) { | |
| return IntegrationFlows.from(Amqp.inboundAdapter(cf, "q").autoStartup(false)) | |
| .publishSubscribeChannel(s -> s | |
| .subscribe(f -> f.transform("headers['x-death'] ?: '{}'") | |
| .handle("deathCheckBean", "deathCheckMethod")) | |
| .subscribe(f -> f.handle(System.out::println))) // main flow | |
| .get(); | |
| } | |
| // more type-safe way | |
| @Bean | |
| public IntegrationFlow flow(ConnectionFactory cf) { | |
| ObjectMapper mapper = new ObjectMapper(); | |
| JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, XDeath.class); | |
| mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | |
| return IntegrationFlows.from(Amqp.inboundAdapter(cf, "q").autoStartup(false)) | |
| .publishSubscribeChannel(s -> s | |
| .subscribe(f -> f.transform("headers['x-death'] ?: '[]'") | |
| .transform(Transformers.toJson()) | |
| .<String, List<XDeath>>transform(p -> { | |
| try { | |
| return mapper.readValue(p, type); | |
| } | |
| catch (IOException e) { | |
| e.printStackTrace(); | |
| throw new RuntimeException(e); | |
| } | |
| }) | |
| .handle(System.out::println)) | |
| .subscribe(f -> f.handle(System.out::println))) // main flow | |
| .get(); | |
| } | |
| @Getter | |
| @ToString | |
| @EqualsAndHashCode | |
| static class XDeath { | |
| private final String reason; | |
| private final int count; | |
| @JsonCreator | |
| public XDeath(String reason, int count) { | |
| this.reason = reason; | |
| this.count = count; | |
| } | |
| } | |
| // when we want to publish message to parking lot queue | |
| @Component | |
| class DeathCheck { | |
| private final ObjectMapper mapper; | |
| private final CollectionType type; | |
| public DeathCheck(ObjectMapper mapper) { | |
| this.mapper = new ObjectMapper(); | |
| this.type = this.mapper.getTypeFactory().constructCollectionType(List.class, XDeath.class); | |
| this.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | |
| } | |
| @ServiceActivator | |
| public void handle(String payload, @Header("x-death") List<?> deathHeader) throws Exception { | |
| List<XDeath> xDeath = mapper.readValue(this.mapper.writeValueAsBytes(deathHeader), this.type); | |
| System.out.println(payload + ": " + xDeath); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Bean | |
| public Queue q() { | |
| Map<String, Object> args = new HashMap<>(); | |
| args.put("x-dead-letter-exchange", ""); | |
| args.put("x-dead-letter-routing-key", "qq"); | |
| return new Queue("q", false, false, true, args); | |
| } | |
| @Bean | |
| public Queue qq() { | |
| Map<String, Object> args = new HashMap<>(); | |
| args.put("x-message-ttl", 5000); | |
| args.put("x-dead-letter-exchange", ""); | |
| args.put("x-dead-letter-routing-key", "q"); | |
| return new Queue("qq", false, false, true, args); | |
| } | |
| @RabbitListener(queues = "q") | |
| public void listen(Message in) { | |
| System.out.println(in); | |
| throw new AmqpRejectAndDontRequeueException("foo"); | |
| } | |
| // This example uses the default exchange "" as the DLX. | |
| // https://stackoverflow.com/questions/31248441/requeueing-dead-letter-messages | |
| // https://stackoverflow.com/questions/27640358/spring-amqp-with-rabbitmq-message-is-not-circled-back-to-live-queue-after-falli/27641433#27641433 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment