Created
January 3, 2018 16:58
-
-
Save pmihalcin/e1585282ba2f0ec8808c73f35e61b6cf to your computer and use it in GitHub Desktop.
DB commit JMS rollback @SpringBootTest
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @RunWith(SpringRunner.class) | |
| @SpringBootTest | |
| public class DbCommitJmsRollback implements ApplicationContextAware { | |
| private Lifecycle lifecycle; | |
| @Autowired | |
| private JmsTemplate jmsTemplate; | |
| @Autowired | |
| private Jaxb2Marshaller marshaller; | |
| @Override | |
| public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { | |
| this.lifecycle = (Lifecycle) applicationContext; | |
| } | |
| @Before | |
| public void clearData() { | |
| getMessages(UPSTREAM_EVENT_QUEUE); // drain queue | |
| } | |
| private List<String> getMessages(String queueName) { | |
| long receiveTimeout = jmsTemplate.getReceiveTimeout(); | |
| jmsTemplate.setReceiveTimeout(3000); | |
| String next = ""; | |
| List<String> msgs = new ArrayList<>(); | |
| while (next != null) { | |
| next = (String) jmsTemplate.receiveAndConvert(queueName); | |
| if (next != null) { | |
| System.out.println("Received JMS message: " + next); | |
| msgs.add(next); | |
| } | |
| } | |
| jmsTemplate.setReceiveTimeout(receiveTimeout); | |
| return msgs; | |
| } | |
| @Test | |
| public void testBusinessFailure() { | |
| TransactionEnvelopeDto envelope = createTransactionEnvelope( | |
| "9698452968", | |
| "440565", | |
| "27206206", | |
| "008 AZS", | |
| "KZT", | |
| "KZ", | |
| "5541"); | |
| StringResult result = new StringResult(); | |
| marshaller.marshal(envelope, result); | |
| jmsTemplate.convertAndSend(UPSTREAM_EVENT_QUEUE, result.toString()); | |
| // Start the listeners... | |
| lifecycle.start(); | |
| } | |
| @After | |
| public void waitForMessages() throws Exception { | |
| // Give it time to consume... | |
| System.out.println("Waiting 10s to consume messages..."); | |
| Thread.sleep(10000); | |
| // Stop the listeners... | |
| lifecycle.stop(); | |
| // Give it time to finish up... | |
| System.out.println("Waiting 5s to stop listeners..."); | |
| Thread.sleep(5000); | |
| List<String> list = getMessages("ActiveMQ.DLQ"); | |
| // One message rolled back and was moved to dead letter queue | |
| assertEquals(1, list.size()); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Aspect | |
| @Component | |
| @RequiredArgsConstructor | |
| @Slf4j | |
| public class FailureSimulator { | |
| private final JmsTemplate jmsTemplate; | |
| @AfterReturning("execution(* *..*EvaluationHandler+.handle(..))") | |
| public void fail() { | |
| simulateMessageSystemFailure(); | |
| } | |
| /** | |
| * Causes the JMS session to fail on commit, as if the middleware has | |
| * failed. Can be used to simulate failure of JMS independent of business | |
| * processing, causing duplicate messages even if best efforts 1PC is used. | |
| */ | |
| private void simulateMessageSystemFailure() { | |
| // Simulate a message system failure before the main transaction | |
| // commits... | |
| jmsTemplate.execute((SessionCallback<Session>) session -> { | |
| log.debug("Intentional JMS session rollback!"); | |
| session.rollback(); | |
| return null; | |
| }); | |
| } | |
| /** | |
| * Just throws a {@link DataIntegrityViolationException}. | |
| */ | |
| private void simulateBusinessProcessingFailure() { | |
| throw new DataIntegrityViolationException("Simulated failure."); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Bean | |
| public IntegrationFlow upstreamEventFlow() { | |
| return IntegrationFlows.from( | |
| Jms.messageDrivenChannelAdapter(jmsConnectionFactory) | |
| .configureListenerContainer(container -> container | |
| .destinationResolver(destinationResolver) | |
| .concurrentConsumers(2) | |
| .errorHandler(t -> { | |
| log.warn("Handling error in error handler..."); | |
| if (t instanceof MessageHandlingException) { | |
| MessageHandlingException exception = (MessageHandlingException) t; | |
| Message<?> failedMsg = exception.getFailedMessage(); | |
| log.warn("original headers: {}", failedMsg.getHeaders()); | |
| log.warn("original payload: {}", failedMsg.getPayload()); | |
| log.warn("exception msg: {}", exception.getMessage()); | |
| } | |
| }) | |
| ) | |
| .autoStartup(false) | |
| .destination(UPSTREAM_EVENT_QUEUE) | |
| .get() | |
| ) | |
| .wireTap(incomingMessageLoggingChannel) | |
| .transform(xmlToObjectTransformer) | |
| .transform(upstreamTransformer) | |
| .handle(handler) | |
| .transform(objectToXmlTransformer) | |
| .channel(downstreamEventChannel) | |
| .get(); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Start from Spring Boot's base Logback configuration. -->
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <jmxConfigurator/>

    <!-- Transaction infrastructure: trace overall, debug for each transaction manager. -->
    <logger name="org.springframework.transaction" level="trace"/>
    <logger name="org.springframework.jms.connection.JmsTransactionManager" level="debug"/>
    <logger name="org.springframework.orm.jpa.JpaTransactionManager" level="debug"/>
    <logger name="org.springframework.orm.hibernate5.HibernateTransactionManager" level="debug"/>
    <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="debug"/>

    <!-- Messaging stack: Spring Integration, messaging and JMS. -->
    <logger name="org.springframework.integration" level="debug"/>
    <logger name="org.springframework.messaging" level="debug"/>
    <logger name="org.springframework.jms" level="debug"/>

    <!-- AOP logging is disabled; uncomment to enable it. -->
    <!--<logger name="org.springframework.aop" level="debug"/>-->
</configuration>
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Some additional information for the implementation:
Artem: In the test case, you can wrap the service method that runs the JDBC transaction in an Advice that throws an exception afterwards.
Patrik: Won't this be problematic, since the test and the listener container run in different threads?
Artem: Correct, but you can keep the container stopped before running the test. Modify the service with the AOP Advice, and only after that start the container so it polls messages from the JMS destination.