- YML
rabbitmq: first: username: ${app.appkey} password: ${app.appkey} virtual-host: ${app.appid} addresses: x.x.x.x:5672,x.x.x.x:5672 #集群 second: username: guest password: guest virtual-host: / host: 127.0.0.1 port: 5672
- 配置源
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; / * RabbitMq多源配置 * * @author Lenovo */ @Configuration public class RabbitConfig { @Bean(name = "firstConnectionFactory") @Primary public ConnectionFactory firstConnectionFactory( @Value("${spring.rabbitmq.first.addresses}") String addresses, @Value("${spring.rabbitmq.first.username}") String username, @Value("${spring.rabbitmq.first.password}") String password, @Value("${spring.rabbitmq.first.virtual-host}") String virtualHost ) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory( @Value("${spring.rabbitmq.second.host}") String host, @Value("${spring.rabbitmq.second.port}") int port, @Value("${spring.rabbitmq.second.username}") String username, @Value("${spring.rabbitmq.second.password}") String password, @Value("${spring.rabbitmq.second.virtual-host}") String virtualHost ) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean(name = "firstRabbitTemplate") @Primary public RabbitTemplate firstRabbitTemplate( @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory ) { RabbitTemplate firstRabbitTemplate = new RabbitTemplate(connectionFactory); return firstRabbitTemplate; } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate( @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory ) { RabbitTemplate secondRabbitTemplate = new RabbitTemplate(connectionFactory); return secondRabbitTemplate; } @Bean(name = "firstFactory") public SimpleRabbitListenerContainerFactory firstFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } }
- 信道构建器
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; / * 信道构建器 * * @author Lenovo */ @Configuration public class CreateQueue { @Bean public String chargeQueue(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { try { connectionFactory.createConnection().createChannel(false).queueDeclare("test.add", true, false, false, null); }catch (IOException e){ e.printStackTrace(); } return "test.add"; } }
- 信道监听器
package com.ciih.authcenter.client.mq; import com.ciih.authcenter.manager.entity.Permission; import com.rabbitmq.client.Channel; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; / * 信道监听器 * * @author Lenovo */ @Slf4j @Component public class ListeningHandle { public static final String ENCODING = "UTF-8"; @RabbitHandler @RabbitListener(queues = {RabbitConfig.USERS_ADD}, containerFactory = "firstFactory") @SneakyThrows public void onMessageUserAdd(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息userAdd] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitHandler @RabbitListener(queues = {RabbitConfig.USERS_UPDATE}, containerFactory = "firstFactory") @SneakyThrows public void onMessageUserUpdate(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息userUpdate] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitHandler @RabbitListener(queues = {RabbitConfig.USERS_DELETE}, containerFactory = "firstFactory") @SneakyThrows public void onMessageUserDelete(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息userDelete] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitHandler @RabbitListener(queues = {RabbitConfig.ORGS_ADD}, containerFactory = "firstFactory") @SneakyThrows public void onMessageOrgsAdd(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息orgsAdd] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitHandler @RabbitListener(queues = {RabbitConfig.ORGS_UPDATE}, containerFactory = "firstFactory") @SneakyThrows public void onMessageOrgsUpdate(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息orgsUpdate] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitHandler @RabbitListener(queues = {RabbitConfig.ORGS_DELETE}, containerFactory = "firstFactory") @SneakyThrows public void onMessageOrgsDelete(Message message, Channel channel) { log.info("[listenerManualAck 监听的消息orgsDelete] - [{}]", new String(message.getBody(), ENCODING)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch ( IOException e) { } } @RabbitListener(queues = {"test.add"}, containerFactory = "secondFactory") @SneakyThrows public void hospitalAdd(List
permissions, Message message, Channel channel) { System.out.println(permissions); } }
- 发送消息
import com.ciih.authcenter.manager.entity.Permission; import com.ciih.authcenter.manager.service.PermissionService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.List; @RestController public class Sender { @Resource PermissionService permissionService; @Resource(name = "secondRabbitTemplate") private RabbitTemplate secondRabbitTemplate; @GetMapping("test1") public void send1() { List
list = permissionService.lambdaQuery().last("limit 0, 10").list(); this.secondRabbitTemplate.convertAndSend("test.add", list); } }
- 依赖
org.springframework.boot spring-boot-starter-amqp
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/206374.html原文链接:https://javaforall.net
