Skip to content

Instantly share code, notes, and snippets.

@pmihalcin
Last active June 5, 2018 20:27
Show Gist options
  • Select an option

  • Save pmihalcin/a93faa5a4c49434607015f909ab5f0e8 to your computer and use it in GitHub Desktop.

Select an option

Save pmihalcin/a93faa5a4c49434607015f909ab5f0e8 to your computer and use it in GitHub Desktop.
Requeue dead letter messages
// first example
/**
 * Builds an integration flow that reads from the AMQP queue {@code "q"} and fans every
 * message out to two subscribers of a publish-subscribe channel.
 *
 * <p>The first subscriber replaces the payload with the {@code x-death} header (or the
 * string {@code '{}'} when the header is absent) and hands it to
 * {@code deathCheckBean.deathCheckMethod}. The second subscriber is the main flow and
 * simply prints the message.
 *
 * <p>The inbound adapter is configured with {@code autoStartup(false)}.
 *
 * @param cf the connection factory used by the inbound adapter
 * @return the assembled flow
 */
@Bean
public IntegrationFlow flow(ConnectionFactory cf) {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(cf, "q").autoStartup(false))
            .publishSubscribeChannel(pubSub -> pubSub
                    // Death-check branch: inspect the x-death header.
                    .subscribe(deathBranch -> deathBranch
                            .transform("headers['x-death'] ?: '{}'")
                            .handle("deathCheckBean", "deathCheckMethod"))
                    // Main branch: regular processing of the message.
                    .subscribe(mainBranch -> mainBranch.handle(System.out::println))) // main flow
            .get();
}
// more type-safe way
/**
 * Builds an integration flow that reads from the AMQP queue {@code "q"} and fans every
 * message out to two subscribers of a publish-subscribe channel.
 *
 * <p>The first subscriber extracts the {@code x-death} header, serialises it to JSON and
 * deserialises it into a typed {@code List<XDeath>} before printing it. The second
 * subscriber is the main flow and simply prints the message.
 *
 * <p>The inbound adapter is configured with {@code autoStartup(false)}.
 *
 * @param cf the connection factory used by the inbound adapter
 * @return the assembled flow
 */
@Bean
public IntegrationFlow flow(ConnectionFactory cf) {
    ObjectMapper mapper = new ObjectMapper();
    // x-death entries carry more fields than XDeath models; ignore the extras.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, XDeath.class);
    return IntegrationFlows.from(Amqp.inboundAdapter(cf, "q").autoStartup(false))
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f
                            // `{}` is SpEL's empty inline list. A quoted '[]' would be a String
                            // payload, which toJson() would encode as the JSON string "[]" and
                            // the list deserialisation below would then reject.
                            .transform("headers['x-death'] ?: {}")
                            .transform(Transformers.toJson())
                            .<String, List<XDeath>>transform(p -> {
                                try {
                                    return mapper.readValue(p, type);
                                }
                                catch (IOException e) {
                                    // Propagate once, with the cause attached; no separate stack dump.
                                    throw new java.io.UncheckedIOException(
                                            "Failed to deserialize x-death header: " + p, e);
                                }
                            })
                            .handle(System.out::println))
                    .subscribe(f -> f.handle(System.out::println))) // main flow
            .get();
}
/**
 * Immutable view of a single {@code x-death} header entry, keeping only the
 * {@code reason} and {@code count} fields.
 *
 * <p>Getters, {@code toString}, {@code equals} and {@code hashCode} are generated by Lombok.
 */
@Getter
@ToString
@EqualsAndHashCode
static class XDeath {
    private final String reason;
    private final int count;

    /**
     * Creates an entry from the deserialised header values.
     *
     * <p>{@code @ConstructorProperties} supplies the JSON property names for the creator
     * parameters. Without explicit names, a plain {@code new ObjectMapper()} cannot bind
     * a multi-argument {@code @JsonCreator} constructor.
     *
     * @param reason value of the {@code reason} property
     * @param count value of the {@code count} property
     */
    @JsonCreator
    @java.beans.ConstructorProperties({"reason", "count"})
    public XDeath(String reason, int count) {
        this.reason = reason;
        this.count = count;
    }
}
// When we want to publish the message to a parking-lot queue
/**
 * Service activator that converts the raw {@code x-death} header of a message into a
 * typed {@code List<XDeath>} and prints it together with the payload.
 */
@Component
class DeathCheck {
    private final ObjectMapper mapper;
    private final CollectionType type;

    /**
     * @param mapper the application's mapper; a private copy is configured here so the
     *               shared instance is not mutated
     */
    public DeathCheck(ObjectMapper mapper) {
        // Copy rather than reconfigure the injected (shared) mapper, and rather than
        // discarding it: the copy keeps its registered modules and settings.
        this.mapper = mapper.copy()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        this.type = this.mapper.getTypeFactory().constructCollectionType(List.class, XDeath.class);
    }

    /**
     * Prints the payload followed by the typed {@code x-death} entries.
     *
     * @param payload the message payload
     * @param deathHeader the raw {@code x-death} header; {@code null} when the message
     *                    carries no such header, which is treated as an empty list
     * @throws Exception if the header cannot be converted to {@code List<XDeath>}
     */
    @ServiceActivator
    public void handle(String payload,
            @Header(value = "x-death", required = false) List<?> deathHeader) throws Exception {
        List<XDeath> xDeath = deathHeader == null
                ? java.util.Collections.<XDeath>emptyList()
                // convertValue does the serialise/deserialise round trip in one call.
                : this.mapper.<List<XDeath>>convertValue(deathHeader, this.type);
        System.out.println(payload + ": " + xDeath);
    }
}
/**
 * Declares the queue {@code "q"}: non-durable, non-exclusive and auto-delete.
 *
 * <p>Its dead-letter exchange is {@code ""} and its dead-letter routing key is
 * {@code "qq"}.
 *
 * @return the queue definition
 */
@Bean
public Queue q() {
    final boolean durable = false;
    final boolean exclusive = false;
    final boolean autoDelete = true;

    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "");
    arguments.put("x-dead-letter-routing-key", "qq");

    return new Queue("q", durable, exclusive, autoDelete, arguments);
}
/**
 * Declares the queue {@code "qq"}: non-durable, non-exclusive and auto-delete.
 *
 * <p>It has an {@code x-message-ttl} of 5000, a dead-letter exchange of {@code ""} and a
 * dead-letter routing key of {@code "q"}.
 *
 * @return the queue definition
 */
@Bean
public Queue qq() {
    final boolean durable = false;
    final boolean exclusive = false;
    final boolean autoDelete = true;

    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 5000);
    arguments.put("x-dead-letter-exchange", "");
    arguments.put("x-dead-letter-routing-key", "q");

    return new Queue("qq", durable, exclusive, autoDelete, arguments);
}
/**
 * Listener on queue {@code "q"} that prints each message and then always rejects it by
 * throwing {@link AmqpRejectAndDontRequeueException}.
 *
 * @param in the received message
 * @throws AmqpRejectAndDontRequeueException always, after printing the message
 */
@RabbitListener(queues = "q")
public void listen(Message in) {
    // Print first so the message is visible before the rejection propagates.
    System.out.println(String.valueOf(in));
    final String rejectionMessage = "foo";
    throw new AmqpRejectAndDontRequeueException(rejectionMessage);
}
// This example uses the default exchange "" as the DLX.
// https://stackoverflow.com/questions/31248441/requeueing-dead-letter-messages
// https://stackoverflow.com/questions/27640358/spring-amqp-with-rabbitmq-message-is-not-circled-back-to-live-queue-after-falli/27641433#27641433
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment