Skip to content

Instantly share code, notes, and snippets.

@pmihalcin
Created January 3, 2018 16:58
Show Gist options
  • Select an option

  • Save pmihalcin/e1585282ba2f0ec8808c73f35e61b6cf to your computer and use it in GitHub Desktop.

Select an option

Save pmihalcin/e1585282ba2f0ec8808c73f35e61b6cf to your computer and use it in GitHub Desktop.
DB commit JMS rollback @SpringBootTest
/**
 * Integration test for the "DB commit, JMS rollback" scenario.
 *
 * <p>The test publishes one upstream event, starts the application context
 * (and with it the message listeners), waits for the message to be consumed,
 * stops the context again and finally expects exactly one message on the
 * dead letter queue.
 *
 * <p>NOTE(review): the final assertion lives in the {@code @After} method, as
 * in the original gist; it therefore also runs when the test body fails.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class DbCommitJmsRollback implements ApplicationContextAware {

    /** Queue that is checked for the rolled-back message after the test. */
    private static final String DEAD_LETTER_QUEUE = "ActiveMQ.DLQ";

    /** Receive timeout used while draining a queue; a timeout means "queue is empty". */
    private static final long DRAIN_RECEIVE_TIMEOUT_MS = 3000;

    /** How long the listeners get to consume the test message. */
    private static final long CONSUME_WAIT_MS = 10000;

    /** How long the listeners get to finish up after being stopped. */
    private static final long SHUTDOWN_WAIT_MS = 5000;

    private Lifecycle lifecycle;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Jaxb2Marshaller marshaller;

    /**
     * Keeps the application context as a {@link Lifecycle} so the test can
     * start and stop it explicitly.
     *
     * @param applicationContext the context running this test; cast to {@link Lifecycle}
     * @throws BeansException declared by {@link ApplicationContextAware}
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.lifecycle = (Lifecycle) applicationContext;
    }

    /**
     * Empties both queues the test depends on, so that leftovers from earlier
     * runs cannot influence the result. The dead letter queue is drained as
     * well because the test asserts an exact message count on it.
     */
    @Before
    public void clearData() {
        getMessages(UPSTREAM_EVENT_QUEUE); // drain queue
        getMessages(DEAD_LETTER_QUEUE); // drain stale dead letters
    }

    /**
     * Receives messages from the given queue until a receive times out.
     *
     * <p>The template's receive timeout is temporarily set to
     * {@link #DRAIN_RECEIVE_TIMEOUT_MS} and is always restored afterwards,
     * even when receiving or converting a message fails.
     *
     * @param queueName name of the queue to drain
     * @return the received message payloads, in the order they were received
     */
    private List<String> getMessages(String queueName) {
        final long previousTimeout = jmsTemplate.getReceiveTimeout();
        final List<String> msgs = new ArrayList<>();
        jmsTemplate.setReceiveTimeout(DRAIN_RECEIVE_TIMEOUT_MS);
        try {
            String next;
            while ((next = (String) jmsTemplate.receiveAndConvert(queueName)) != null) {
                System.out.println("Received JMS message: " + next);
                msgs.add(next);
            }
        } finally {
            // The JmsTemplate is an injected, shared bean: never leave it with the shortened timeout.
            jmsTemplate.setReceiveTimeout(previousTimeout);
        }
        return msgs;
    }

    /**
     * Marshals a transaction envelope to XML, sends it to the upstream event
     * queue and only then starts the listeners.
     */
    @Test
    public void testBusinessFailure() {
        TransactionEnvelopeDto envelope = createTransactionEnvelope(
                "9698452968",
                "440565",
                "27206206",
                "008 AZS",
                "KZT",
                "KZ",
                "5541");
        StringResult result = new StringResult();
        marshaller.marshal(envelope, result);
        jmsTemplate.convertAndSend(UPSTREAM_EVENT_QUEUE, result.toString());
        // Start the listeners...
        lifecycle.start();
    }

    /**
     * Waits for the message to be consumed, stops the listeners and verifies
     * that exactly one message ended up on the dead letter queue.
     *
     * @throws Exception if the waiting thread is interrupted
     */
    @After
    public void waitForMessages() throws Exception {
        // Give it time to consume...
        System.out.println("Waiting 10s to consume messages...");
        try {
            Thread.sleep(CONSUME_WAIT_MS);
        } finally {
            // Stop the listeners, even if the wait above was interrupted.
            lifecycle.stop();
        }
        // Give it time to finish up...
        System.out.println("Waiting 5s to stop listeners...");
        Thread.sleep(SHUTDOWN_WAIT_MS);
        List<String> list = getMessages(DEAD_LETTER_QUEUE);
        // One message rolled back and was moved to dead letter queue
        assertEquals(1, list.size());
    }
}
/**
 * Test-support aspect that injects a failure after an evaluation handler
 * has returned normally.
 */
@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class FailureSimulator {

    private final JmsTemplate jmsTemplate;

    /**
     * Advice that runs after a successful return of any {@code handle(..)}
     * method matched by the pointcut (types named {@code *EvaluationHandler}
     * and their subtypes) and triggers the simulated messaging failure.
     */
    @AfterReturning("execution(* *..*EvaluationHandler+.handle(..))")
    public void fail() {
        simulateMessageSystemFailure();
    }

    /**
     * Alternative failure mode: raises a {@link DataIntegrityViolationException}.
     * Not wired into {@link #fail()} in this class.
     */
    private void simulateBusinessProcessingFailure() {
        throw new DataIntegrityViolationException("Simulated failure.");
    }

    /**
     * Rolls back the JMS session supplied by the {@link JmsTemplate}, imitating
     * a failure of the messaging middleware that is independent of business
     * processing. As the original author notes, this can cause duplicate
     * messages even if best efforts 1PC is used.
     */
    private void simulateMessageSystemFailure() {
        // Runs before the main transaction commits.
        final SessionCallback<Session> rollbackCallback = jmsSession -> {
            log.debug("Intentional JMS session rollback!");
            jmsSession.rollback();
            return null;
        };
        jmsTemplate.execute(rollbackCallback);
    }
}
/**
 * Defines the flow that consumes upstream events from the JMS destination
 * {@code UPSTREAM_EVENT_QUEUE}.
 *
 * <p>Incoming messages are wire-tapped to the logging channel, unmarshalled
 * from XML, transformed, passed to the handler, marshalled back to XML and
 * sent to the downstream event channel. The adapter is created with
 * {@code autoStartup(false)}, so it does not start consuming on its own.
 *
 * @return the assembled integration flow
 */
@Bean
public IntegrationFlow upstreamEventFlow() {
    return IntegrationFlows.from(
            Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                    .configureListenerContainer(container -> container
                            .destinationResolver(destinationResolver)
                            .concurrentConsumers(2)
                            .errorHandler(this::logListenerError)
                    )
                    .autoStartup(false)
                    .destination(UPSTREAM_EVENT_QUEUE)
                    .get()
            )
            .wireTap(incomingMessageLoggingChannel)
            .transform(xmlToObjectTransformer)
            .transform(upstreamTransformer)
            .handle(handler)
            .transform(objectToXmlTransformer)
            .channel(downstreamEventChannel)
            .get();
}

/**
 * Logs an error reported by the listener container without throwing itself.
 *
 * @param t the error handed to the container's error handler
 */
private void logListenerError(Throwable t) {
    log.warn("Handling error in error handler...");
    if (!(t instanceof MessageHandlingException)) {
        // Previously such errors were reported without any detail at all.
        log.warn("unexpected listener error", t);
        return;
    }
    MessageHandlingException exception = (MessageHandlingException) t;
    Message<?> failedMsg = exception.getFailedMessage();
    if (failedMsg != null) {
        log.warn("original headers: {}", failedMsg.getHeaders());
        log.warn("original payload: {}", failedMsg.getPayload());
    } else {
        // The failed message is optional on the exception; guard against an NPE in the handler.
        log.warn("no failed message attached to the exception");
    }
    log.warn("exception msg: {}", exception.getMessage());
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration with verbose transaction, JMS and integration logging. -->
<configuration>
<!-- Pull in the base configuration from org/springframework/boot/logging/logback/base.xml. -->
<include resource="org/springframework/boot/logging/logback/base.xml"/>
<jmxConfigurator/>
<!-- Transaction infrastructure: trace for the whole package, debug for the individual transaction managers. -->
<logger name="org.springframework.transaction" level="trace"/>
<logger name="org.springframework.jms.connection.JmsTransactionManager" level="debug"/>
<logger name="org.springframework.orm.jpa.JpaTransactionManager" level="debug"/>
<logger name="org.springframework.orm.hibernate5.HibernateTransactionManager" level="debug"/>
<logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="debug"/>
<!-- Messaging side: integration, messaging and JMS packages at debug. -->
<logger name="org.springframework.integration" level="debug"/>
<logger name="org.springframework.messaging" level="debug"/>
<logger name="org.springframework.jms" level="debug"/>
<!-- AOP logging is currently disabled; uncomment the next line to enable it. -->
<!--<logger name="org.springframework.aop" level="debug"/>-->
</configuration>
@pmihalcin
Copy link
Copy Markdown
Author

pmihalcin commented Jan 9, 2018

Some additional information for the implementation:

Artem: In the test case, you can wrap the service method that runs the JDBC TX in an Advice that throws an exception afterwards.

Patrik: Won't this be problematic, since the test and the listener container run in different threads?

Artem: Correct, but you can keep the container stopped before running the test. Modify the service with the AOP Advice, and only after that start the container to poll messages from the JMS destination.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment