Last active
October 28, 2023 10:16
-
-
Save mhewedy/68a26c85764200f4f43a4939a9f95b5a to your computer and use it in GitHub Desktop.
Revisions
-
mhewedy revised this gist
Oct 28, 2023 . 1 changed file with 219 additions and 54 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,81 +1,246 @@ import brave.spring.rabbit.SpringRabbitTracing; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.MultiRabbitListenerAnnotationBeanPostProcessor; import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.*; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * Configurer to configure multi rabbit vhosts. it disabled auto-configurations and do configurations manually * based on {@link MultiRabbitProps}. * <p> * this configuration has nothing to do with <a href="https://github.com/freenowtech/spring-multirabbit">freenowtech/spring-multirabbit</a> * in fact, we migrated from this library as it is no longer supported. * <p> * Usage: * <pre> * # in application.yaml * spring: * multirabbitmq: * enabled: true * connections: * first: * host: localhost * second: * host: localhost * virtualHost: myvhost * third: * host: anotherHost * </pre> * * @see <a href="https://docs.spring.io/spring-amqp/reference/html/#multi-rabbit">spring rabbit docs</a> */ @Slf4j @Configuration @RequiredArgsConstructor @ConditionalOnClass(RabbitTemplate.class) @EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class) @EnableConfigurationProperties(MultiRabbitConfig.MultiRabbitProps.class) @ConditionalOnProperty(name = "spring.multirabbitmq.enabled", havingValue = "true") public class MultiRabbitConfig { private final MultiRabbitProps props; private final ResourceLoader resourceLoader; private final SpringRabbitTracing springRabbitTracing; private final ConfigurableBeanFactory configurableBeanFactory; private final ObjectProvider<MessageConverter> messageConverter; @Data @ConfigurationProperties(prefix = "spring.multirabbitmq") public static class MultiRabbitProps { public boolean enabled; public Map<String, RabbitProperties> connections = new HashMap<>(); } @Bean public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() { return new RabbitListenerEndpointRegistry(); } @Bean public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) { MultiRabbitListenerAnnotationBeanPostProcessor postProcessor = new MultiRabbitListenerAnnotationBeanPostProcessor(); postProcessor.setEndpointRegistry(registry); postProcessor.setContainerFactoryBeanName("defaultContainerFactory"); return postProcessor; } @PostConstruct public void registerBeans() { final List<ContainerHolder> containerHolders = new ArrayList<>(); for (Map.Entry<String, RabbitProperties> entry : this.props.connections.entrySet()) { String key = entry.getKey(); RabbitProperties rabbitProperties = entry.getValue(); log.debug("configuring rabbitmq => key: {} with host: {}, port: {}, vhost: {} username: {}", key, rabbitProperties.determineHost(), rabbitProperties.determinePort(), rabbitProperties.determineVirtualHost(), rabbitProperties.determineUsername()); var connectionFactory = buildConnectionFactory(rabbitProperties); this.configurableBeanFactory.registerSingleton(key + "-connectionFactory", connectionFactory); RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); this.configurableBeanFactory.registerSingleton(key + "-admin", rabbitAdmin); var containerFactory = buildSimpleListenerContainerFactory(connectionFactory); this.configurableBeanFactory.registerSingleton(key, containerFactory); containerHolders.add(new ContainerHolder(key, rabbitProperties, connectionFactory, containerFactory)); } var rcf = buildRoutingConnectionFactory(containerHolders); this.configurableBeanFactory.registerSingleton("rcf", rcf); var rabbitTemplate = new RabbitTemplate(rcf); springRabbitTracing.decorateRabbitTemplate(rabbitTemplate); this.messageConverter.ifUnique(rabbitTemplate::setMessageConverter); this.configurableBeanFactory.registerSingleton("rabbitTemplate", rabbitTemplate); configureContainerFactory(rabbitTemplate, containerHolders); } private CachingConnectionFactory buildConnectionFactory(RabbitProperties rabbitProperties) { try { // see RabbitAutoConfiguration.java var rabbitConnectionFactoryBeanConfigurer = new RabbitConnectionFactoryBeanConfigurer(resourceLoader, rabbitProperties); var connectionFactoryBean = new RabbitConnectionFactoryBean(); rabbitConnectionFactoryBeanConfigurer.configure(connectionFactoryBean); connectionFactoryBean.afterPropertiesSet(); com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject(); if (connectionFactory == null) { throw new RuntimeException("connectionFactory is null"); } var factory = new CachingConnectionFactory(connectionFactory); CachingConnectionFactoryConfigurer configurer = new CachingConnectionFactoryConfigurer(rabbitProperties); configurer.configure(factory); return factory; } catch (Exception e) { throw new RuntimeException(e); } } private SimpleRoutingConnectionFactory buildRoutingConnectionFactory( List<ContainerHolder> cachingConnectionFactories) { Map<Object, ConnectionFactory> map = cachingConnectionFactories.stream() .collect(Collectors.toMap(it -> it.key, it -> it.connectionFactory)); SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory(); rcf.setDefaultTargetConnectionFactory(cachingConnectionFactories.get(0).connectionFactory); rcf.setTargetConnectionFactories(map); return rcf; } private SimpleRabbitListenerContainerFactory buildSimpleListenerContainerFactory( CachingConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(connectionFactory); return containerFactory; } private void configureContainerFactory( RabbitTemplate rabbitTemplate, List<ContainerHolder> containerHolders) { for (ContainerHolder h : containerHolders) { var configurer = new MultiRabbitConfigurer(h.rabbitProperties, h.connectionFactory); messageConverter.ifUnique(configurer::setMessageConverter); configurer.setMessageRecoverer( new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", h.key)); configurer.configure(h.containerFactory, h.connectionFactory); springRabbitTracing.decorateSimpleRabbitListenerContainerFactory(h.containerFactory); } } private record ContainerHolder( String key, RabbitProperties rabbitProperties, ConnectionFactory connectionFactory, SimpleRabbitListenerContainerFactory containerFactory) { } @RequiredArgsConstructor private static class MultiRabbitConfigurer extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> { private final RabbitProperties properties; private final ConnectionFactory connectionFactory; @Override public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) { RabbitProperties.SimpleContainer config = properties.getListener().getSimple(); PropertyMapper map = PropertyMapper.get(); configure(factory, connectionFactory, config); map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled); } @Override public void setMessageConverter(MessageConverter messageConverter) { super.setMessageConverter(messageConverter); } @Override public void setMessageRecoverer(MessageRecoverer messageRecoverer) { super.setMessageRecoverer(messageRecoverer); } } private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer { private final String key; public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) { super(rabbitTemplate, errorExchange); this.key = key; } @Override protected void doSend(String exchange, String routingKey, Message message) { SimpleResourceHolder.bind(((RabbitTemplate) errorTemplate).getConnectionFactory(), key); // routing key is: "error.<original queue name>" super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message); SimpleResourceHolder.unbind(((RabbitTemplate) errorTemplate).getConnectionFactory()); } } } -
mhewedy created this gist
Nov 7, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,81 @@ import com.rabbitmq.client.Connection; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.camel.CamelContext; import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; import org.apache.camel.component.rabbitmq.springboot.RabbitMQComponentConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.EventListener; import org.springframework.core.io.ClassPathResource; import org.springframework.http.*; import org.springframework.util.FileCopyUtils; import org.springframework.web.client.RestTemplate; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeoutException; @Slf4j @Configuration @RequiredArgsConstructor @ConditionalOnProperty(name = "app.startup.rabbitmq.load-definitions.enabled") public class RabbitMQConfig { private static final String RABBITMQ_DEFINITIONS = "rabbitmq.definitions.json"; private final CamelContext camelContext; private final RestTemplate restTemplate = new RestTemplate(); private final RabbitMQComponentConfiguration rabbitMQComponentConfiguration; @EventListener public void loadDefinitions(ContextRefreshedEvent event) throws Exception { log.debug("start loading rabbitmq definition"); String rabbitMQHost = getRabbitMQHost(); loadDefinitions(rabbitMQHost); } /** * We needed to connect to rabbitmq to get the resolved host name, * because before the connection is established, the host name is calculated * based on the camel properties provided per environment. */ private String getRabbitMQHost() throws IOException, TimeoutException { var rabbitMqEndPointKey = camelContext.getEndpointRegistry().keySet().stream() .filter(it -> it.get().startsWith("rabbitmq:")) .findFirst().orElseThrow(); RabbitMQEndpoint endpoint = (RabbitMQEndpoint) camelContext.getEndpointRegistry().get(rabbitMqEndPointKey); try (Connection tmpConnection = endpoint.connect(ForkJoinPool.commonPool())) { return tmpConnection.getAddress().getHostAddress(); } } private void loadDefinitions(String rabbitMQHost) throws IOException { var is = new ClassPathResource(RABBITMQ_DEFINITIONS).getInputStream(); String rabbitDefinitions = FileCopyUtils.copyToString(new InputStreamReader(is)); HttpEntity<String> request = new HttpEntity<>(rabbitDefinitions, getHeaders()); ResponseEntity<String> response = restTemplate.exchange( "http://" + rabbitMQHost + ":15672/api/definitions", HttpMethod.POST, request, String.class); if (response.getStatusCode().is2xxSuccessful()) { log.info(RABBITMQ_DEFINITIONS + " loaded successfully"); } else { throw new RuntimeException("cannot load rabbitmq definitions, " + response); } } private HttpHeaders getHeaders() { var headers = new HttpHeaders(); headers.setBasicAuth(rabbitMQComponentConfiguration.getUsername(), rabbitMQComponentConfiguration.getPassword()); headers.setContentType(MediaType.APPLICATION_JSON); return headers; } }