Skip to content

Instantly share code, notes, and snippets.

@mhewedy
Last active October 28, 2023 10:16
Show Gist options
  • Select an option

  • Save mhewedy/68a26c85764200f4f43a4939a9f95b5a to your computer and use it in GitHub Desktop.

Select an option

Save mhewedy/68a26c85764200f4f43a4939a9f95b5a to your computer and use it in GitHub Desktop.

Revisions

  1. mhewedy revised this gist Oct 28, 2023. 1 changed file with 219 additions and 54 deletions.
    273 changes: 219 additions & 54 deletions RabbitMQConfig.java
    Original file line number Diff line number Diff line change
    @@ -1,81 +1,246 @@
    import com.rabbitmq.client.Connection;

    import brave.spring.rabbit.SpringRabbitTracing;
    import lombok.Data;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.camel.CamelContext;
    import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
    import org.apache.camel.component.rabbitmq.springboot.RabbitMQComponentConfiguration;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.MultiRabbitListenerAnnotationBeanPostProcessor;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.amqp.*;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.boot.context.properties.PropertyMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.context.event.EventListener;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.http.*;
    import org.springframework.util.FileCopyUtils;
    import org.springframework.web.client.RestTemplate;

    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeoutException;
    import org.springframework.core.io.ResourceLoader;

    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    /**
    * Configurer to configure multi rabbit vhosts. it disabled auto-configurations and do configurations manually
    * based on {@link MultiRabbitProps}.
    * <p>
    * this configuration has nothing to do with <a href="https://github.com/freenowtech/spring-multirabbit">freenowtech/spring-multirabbit</a>
    * in fact, we migrated from this library as it is no longer supported.
    * <p>
    * Usage:
    * <pre>
    * # in application.yaml
    * spring:
    * multirabbitmq:
    * enabled: true
    * connections:
    * first:
    * host: localhost
    * second:
    * host: localhost
    * virtualHost: myvhost
    * third:
    * host: anotherHost
    * </pre>
    *
    * @see <a href="https://docs.spring.io/spring-amqp/reference/html/#multi-rabbit">spring rabbit docs</a>
    */
    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    @ConditionalOnProperty(name = "app.startup.rabbitmq.load-definitions.enabled")
    public class RabbitMQConfig {
    @ConditionalOnClass(RabbitTemplate.class)
    @EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class)
    @EnableConfigurationProperties(MultiRabbitConfig.MultiRabbitProps.class)
    @ConditionalOnProperty(name = "spring.multirabbitmq.enabled", havingValue = "true")
    public class MultiRabbitConfig {

    private final MultiRabbitProps props;
    private final ResourceLoader resourceLoader;
    private final SpringRabbitTracing springRabbitTracing;
    private final ConfigurableBeanFactory configurableBeanFactory;
    private final ObjectProvider<MessageConverter> messageConverter;

    private static final String RABBITMQ_DEFINITIONS = "rabbitmq.definitions.json";
    @Data
    @ConfigurationProperties(prefix = "spring.multirabbitmq")
    public static class MultiRabbitProps {
    public boolean enabled;
    public Map<String, RabbitProperties> connections = new HashMap<>();
    }

    private final CamelContext camelContext;
    private final RestTemplate restTemplate = new RestTemplate();
    private final RabbitMQComponentConfiguration rabbitMQComponentConfiguration;
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
    return new RabbitListenerEndpointRegistry();
    }

    @EventListener
    public void loadDefinitions(ContextRefreshedEvent event) throws Exception {
    log.debug("start loading rabbitmq definition");
    String rabbitMQHost = getRabbitMQHost();
    loadDefinitions(rabbitMQHost);
    @Bean
    public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
    MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
    = new MultiRabbitListenerAnnotationBeanPostProcessor();
    postProcessor.setEndpointRegistry(registry);
    postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
    return postProcessor;
    }

    /**
    * We needed to connect to rabbitmq to get the resolved host name,
    * because before the connection is established, the host name is calculated
    * based on the camel properties provided per environment.
    */
    private String getRabbitMQHost() throws IOException, TimeoutException {
    @PostConstruct
    public void registerBeans() {

    final List<ContainerHolder> containerHolders = new ArrayList<>();

    var rabbitMqEndPointKey = camelContext.getEndpointRegistry().keySet().stream()
    .filter(it -> it.get().startsWith("rabbitmq:"))
    .findFirst().orElseThrow();
    for (Map.Entry<String, RabbitProperties> entry : this.props.connections.entrySet()) {

    RabbitMQEndpoint endpoint =
    (RabbitMQEndpoint) camelContext.getEndpointRegistry().get(rabbitMqEndPointKey);
    String key = entry.getKey();
    RabbitProperties rabbitProperties = entry.getValue();
    log.debug("configuring rabbitmq => key: {} with host: {}, port: {}, vhost: {} username: {}",
    key, rabbitProperties.determineHost(), rabbitProperties.determinePort(),
    rabbitProperties.determineVirtualHost(), rabbitProperties.determineUsername());

    try (Connection tmpConnection = endpoint.connect(ForkJoinPool.commonPool())) {
    return tmpConnection.getAddress().getHostAddress();
    var connectionFactory = buildConnectionFactory(rabbitProperties);
    this.configurableBeanFactory.registerSingleton(key + "-connectionFactory", connectionFactory);

    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    this.configurableBeanFactory.registerSingleton(key + "-admin", rabbitAdmin);

    var containerFactory = buildSimpleListenerContainerFactory(connectionFactory);
    this.configurableBeanFactory.registerSingleton(key, containerFactory);

    containerHolders.add(new ContainerHolder(key, rabbitProperties, connectionFactory, containerFactory));
    }

    var rcf = buildRoutingConnectionFactory(containerHolders);
    this.configurableBeanFactory.registerSingleton("rcf", rcf);

    var rabbitTemplate = new RabbitTemplate(rcf);
    springRabbitTracing.decorateRabbitTemplate(rabbitTemplate);
    this.messageConverter.ifUnique(rabbitTemplate::setMessageConverter);
    this.configurableBeanFactory.registerSingleton("rabbitTemplate", rabbitTemplate);

    configureContainerFactory(rabbitTemplate, containerHolders);
    }

    private void loadDefinitions(String rabbitMQHost) throws IOException {
    private CachingConnectionFactory buildConnectionFactory(RabbitProperties rabbitProperties) {
    try {
    // see RabbitAutoConfiguration.java
    var rabbitConnectionFactoryBeanConfigurer =
    new RabbitConnectionFactoryBeanConfigurer(resourceLoader, rabbitProperties);

    var connectionFactoryBean = new RabbitConnectionFactoryBean();
    rabbitConnectionFactoryBeanConfigurer.configure(connectionFactoryBean);
    connectionFactoryBean.afterPropertiesSet();
    com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject();

    if (connectionFactory == null) {
    throw new RuntimeException("connectionFactory is null");
    }

    var is = new ClassPathResource(RABBITMQ_DEFINITIONS).getInputStream();
    String rabbitDefinitions = FileCopyUtils.copyToString(new InputStreamReader(is));
    var factory = new CachingConnectionFactory(connectionFactory);
    CachingConnectionFactoryConfigurer configurer = new CachingConnectionFactoryConfigurer(rabbitProperties);
    configurer.configure(factory);

    HttpEntity<String> request = new HttpEntity<>(rabbitDefinitions, getHeaders());
    ResponseEntity<String> response = restTemplate.exchange(
    "http://" + rabbitMQHost + ":15672/api/definitions", HttpMethod.POST, request, String.class);
    return factory;

    if (response.getStatusCode().is2xxSuccessful()) {
    log.info(RABBITMQ_DEFINITIONS + " loaded successfully");
    } else {
    throw new RuntimeException("cannot load rabbitmq definitions, " + response);
    } catch (Exception e) {
    throw new RuntimeException(e);
    }
    }

    private HttpHeaders getHeaders() {
    var headers = new HttpHeaders();
    headers.setBasicAuth(rabbitMQComponentConfiguration.getUsername(), rabbitMQComponentConfiguration.getPassword());
    headers.setContentType(MediaType.APPLICATION_JSON);
    return headers;
    private SimpleRoutingConnectionFactory buildRoutingConnectionFactory(
    List<ContainerHolder> cachingConnectionFactories) {

    Map<Object, ConnectionFactory> map = cachingConnectionFactories.stream()
    .collect(Collectors.toMap(it -> it.key, it -> it.connectionFactory));

    SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
    rcf.setDefaultTargetConnectionFactory(cachingConnectionFactories.get(0).connectionFactory);
    rcf.setTargetConnectionFactories(map);
    return rcf;
    }

    private SimpleRabbitListenerContainerFactory buildSimpleListenerContainerFactory(
    CachingConnectionFactory connectionFactory) {

    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    return containerFactory;
    }

    private void configureContainerFactory(
    RabbitTemplate rabbitTemplate, List<ContainerHolder> containerHolders) {

    for (ContainerHolder h : containerHolders) {

    var configurer = new MultiRabbitConfigurer(h.rabbitProperties, h.connectionFactory);
    messageConverter.ifUnique(configurer::setMessageConverter);
    configurer.setMessageRecoverer(
    new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", h.key));
    configurer.configure(h.containerFactory, h.connectionFactory);
    springRabbitTracing.decorateSimpleRabbitListenerContainerFactory(h.containerFactory);
    }
    }

    private record ContainerHolder(
    String key,
    RabbitProperties rabbitProperties,
    ConnectionFactory connectionFactory,
    SimpleRabbitListenerContainerFactory containerFactory) {
    }

    @RequiredArgsConstructor
    private static class MultiRabbitConfigurer
    extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

    private final RabbitProperties properties;
    private final ConnectionFactory connectionFactory;

    @Override
    public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) {
    RabbitProperties.SimpleContainer config = properties.getListener().getSimple();
    PropertyMapper map = PropertyMapper.get();
    configure(factory, connectionFactory, config);
    map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
    map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
    map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
    map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
    }

    @Override
    public void setMessageConverter(MessageConverter messageConverter) {
    super.setMessageConverter(messageConverter);
    }

    @Override
    public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
    super.setMessageRecoverer(messageRecoverer);
    }
    }

    private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer {
    private final String key;

    public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) {
    super(rabbitTemplate, errorExchange);
    this.key = key;
    }

    @Override
    protected void doSend(String exchange, String routingKey, Message message) {
    SimpleResourceHolder.bind(((RabbitTemplate) errorTemplate).getConnectionFactory(), key);
    // routing key is: "error.<original queue name>"
    super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message);
    SimpleResourceHolder.unbind(((RabbitTemplate) errorTemplate).getConnectionFactory());
    }
    }
    }
  2. mhewedy created this gist Nov 7, 2022.
    81 changes: 81 additions & 0 deletions RabbitMQConfig.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,81 @@
    import com.rabbitmq.client.Connection;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.camel.CamelContext;
    import org.apache.camel.component.rabbitmq.RabbitMQEndpoint;
    import org.apache.camel.component.rabbitmq.springboot.RabbitMQComponentConfiguration;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.context.event.EventListener;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.http.*;
    import org.springframework.util.FileCopyUtils;
    import org.springframework.web.client.RestTemplate;

    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeoutException;

    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    @ConditionalOnProperty(name = "app.startup.rabbitmq.load-definitions.enabled")
    public class RabbitMQConfig {

    private static final String RABBITMQ_DEFINITIONS = "rabbitmq.definitions.json";

    private final CamelContext camelContext;
    private final RestTemplate restTemplate = new RestTemplate();
    private final RabbitMQComponentConfiguration rabbitMQComponentConfiguration;

    @EventListener
    public void loadDefinitions(ContextRefreshedEvent event) throws Exception {
    log.debug("start loading rabbitmq definition");
    String rabbitMQHost = getRabbitMQHost();
    loadDefinitions(rabbitMQHost);
    }

    /**
    * We needed to connect to rabbitmq to get the resolved host name,
    * because before the connection is established, the host name is calculated
    * based on the camel properties provided per environment.
    */
    private String getRabbitMQHost() throws IOException, TimeoutException {

    var rabbitMqEndPointKey = camelContext.getEndpointRegistry().keySet().stream()
    .filter(it -> it.get().startsWith("rabbitmq:"))
    .findFirst().orElseThrow();

    RabbitMQEndpoint endpoint =
    (RabbitMQEndpoint) camelContext.getEndpointRegistry().get(rabbitMqEndPointKey);

    try (Connection tmpConnection = endpoint.connect(ForkJoinPool.commonPool())) {
    return tmpConnection.getAddress().getHostAddress();
    }
    }

    private void loadDefinitions(String rabbitMQHost) throws IOException {

    var is = new ClassPathResource(RABBITMQ_DEFINITIONS).getInputStream();
    String rabbitDefinitions = FileCopyUtils.copyToString(new InputStreamReader(is));

    HttpEntity<String> request = new HttpEntity<>(rabbitDefinitions, getHeaders());
    ResponseEntity<String> response = restTemplate.exchange(
    "http://" + rabbitMQHost + ":15672/api/definitions", HttpMethod.POST, request, String.class);

    if (response.getStatusCode().is2xxSuccessful()) {
    log.info(RABBITMQ_DEFINITIONS + " loaded successfully");
    } else {
    throw new RuntimeException("cannot load rabbitmq definitions, " + response);
    }
    }

    private HttpHeaders getHeaders() {
    var headers = new HttpHeaders();
    headers.setBasicAuth(rabbitMQComponentConfiguration.getUsername(), rabbitMQComponentConfiguration.getPassword());
    headers.setContentType(MediaType.APPLICATION_JSON);
    return headers;
    }
    }