<dependencies> <!--rabbitmq的依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
# RabbitMQ的配置 spring: rabbitmq: host: 自己服务器ip port: 5672 username: admin password: admin # 要是有Vhost也可以进行配置
package cn.zixieqing.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class MqConfig { /** * 正常交换机名称 */ private static final String TTL_NORMAL_EXCHANGE = "X"; /** * 死信交换机名称 */ private static final String TTL_DEAD_LETTER_EXCHANGE = "Y"; /** * 正常队列名称 */ private static final String TTL_NORMAL_QUEUE_A = "QA"; private static final String TTL_NORMAL_QUEUE_B = "QB"; /** * 死信队列名称 */ private static final String TTL_DEAD_LETTER_QUEUE_D = "QD"; /** * 正常交换机 和 正常队列A的routing key */ private static final String TTL_NORMAL_EXCHANGE_BIND_QUEUE_A = "XA"; /** * 正常交换机 和 正常队列B的routing key */ private static final String TTL_NORMAL_EXCHANGE_BIND_QUEUE_B = "XB"; /** * 正常队列 和 死信交换机 及 死信交换机 与 死信队列的routing key */ private static final String TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND = "YD"; /** * 声明正常交换机 */ @Bean("xExchange") public DirectExchange xExchange() { // 直接创建是什么类型的交换机 加上 交换机名字就可以了 return new DirectExchange(TTL_NORMAL_EXCHANGE); } /** * 声明死信交换机 */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(TTL_DEAD_LETTER_EXCHANGE); } /** * 声明正常队列QA 并 绑定死信交互机Y */ @Bean("queueA") public Queue queueA() { // initialCapacity map初始值:(存的元素个数 / 负载因子0.75) + 1 HashMap<String, Object> params = new HashMap<>(5); params.put("x-dead-letter-exchange", TTL_DEAD_LETTER_EXCHANGE); params.put("x-dead-letter-routing-key", TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); params.put("x-message-ttl", 10 * 1000); // 构建队列 并 传入相应的参数 return QueueBuilder.durable(TTL_NORMAL_QUEUE_A) .withArguments(params) .build(); } /** * X正常交换机 和 QA正常队列绑定 */ @Bean public Binding xChangeBindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA) .to(xExchange) .with(TTL_NORMAL_EXCHANGE_BIND_QUEUE_A); } /** * 声明正常队列QB 并 绑定死信交换机Y */ @Bean("queueB") public Queue queueB() { /* initialCapacity map初始值:(存的元素个数 / 负载因子0.75) + 1 */ HashMap<String, Object> params = new HashMap<>(5); params.put("x-dead-letter-exchange", TTL_DEAD_LETTER_EXCHANGE); params.put("x-dead-letter-routing-key", TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); params.put("x-message-ttl", 40 * 1000); // 构建队列 并 传入相应的参数 return QueueBuilder.durable(TTL_NORMAL_QUEUE_B) .withArguments(params) .build(); } /** * X正常交换机 和 QB正常队列绑定 */ @Bean public Binding xChangeBindingQueueB(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB) .to(xExchange) .with(TTL_NORMAL_EXCHANGE_BIND_QUEUE_B); } /** * 声明死信队列D */ @Bean("queueD") public Queue queueD() { return new Queue(TTL_DEAD_LETTER_QUEUE_D); } /** * 死信交换机 和 私信队列进行绑定 */ @Bean public Binding yExchangeBindingQueueD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD) .to(yExchange) .with(TTL_NORMAL_QUEUE_AND_DEAD_LETTER_EXCHANGE_AND_DEAD_LETTER_QUEUE_BIND); } }
新加一个依赖
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
生产者伪代码
package cn.zixieqing.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController @RequestMapping("sendMsg") public class MqProducerController { /** * 这个玩意儿是Spring提供的 */ @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("{message}") public void sendMsg(@PathVariable String message) { System.out.println( new Date() + ":接收到了消息===>" + message); // 发送消息 rabbitTemplate.convertAndSend("X","XA","这条消息是来着TTL为10s的===>" + message); rabbitTemplate.convertAndSend("X","XB","这条消息是来着TTL为40s的===>" + message); } }
package cn.zixieqing.consumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Date; @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveMsg(Message message,Channel Channel) { System.out.println( new Date() + "接收到了消息===>" + new String( message.getBody(), StandardCharsets.UTF_8)); } }
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.9.15/plugins # 版本号改成自己的
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl restart rabbitmq-server
原来的延迟队列设置
使插件之后的延迟设置
package cn.zixieqing.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class DelayedExchanegConfig { /** * 交换机名字 */ private static final String EXCHANGE_NAME = "delayed.exchange"; /** * 队列名字 */ private static final String QUEUE_NAME = "delayed.queue"; /** * 绑定键值 */ private static final String EXCHANGE_BINDING_QUEUE_ROUTING_KEY = "delayed.routingkey"; /** * 声明交换机 - 目前这种交换机是没有的,这是插件的,因此:选择自定义交换机 */ @Bean public CustomExchange delayedExchange() { HashMap<String, Object> params = new HashMap<>(3); // 延迟类型 params.put("x-delayed-type", "direct"); /* 参数1、交换机名字 参数2、交换机类型 - 插件的那个类型 参数3、交换机是否持久化 参数4、交换机是否自动删除 参数5、交换机的其他配置 */ return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, params); } /** * 声明队列 */ @Bean public Queue delayedQueue() { return new Queue(QUEUE_NAME); } /** * 交换机 和 队列 进行绑定 */ public Binding exchangeBindingQueue(@Qualifier("delayedExchange") CustomExchange delayedExchange, @Qualifier("delayedQueue") Queue delayedQueue) { return BindingBuilder .bind(delayedQueue) .to(delayedExchange) .with(EXCHANGE_BINDING_QUEUE_ROUTING_KEY) // noargs()就是构建的意思 和 build()一样 .noargs(); } }
package cn.zixieqing.controller; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; @RestController @RequestMapping("sendMsg") public class DelatedQueueController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/{message}/{ttl}") public void getMesg(@PathVariable String message, @PathVariable int ttl) { System.out.println(new Date() + "接收到了消息===>" + message + "===>失效时间为:" + ttl); // 发送消息 rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", data->{ // 设置失效时间 data.getMessageProperties().setDelay(10 * 1000); return data; }); } }
package cn.zixieqing.consumer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.Date; @Component public class DelayedQueueConsumer { @RabbitListener(queues = "delayed.queue") public void receiveMessage(Message message) { System.out.println("消费者正在消费消息......"); String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println(new Date() + "消费了消息===>" + message); } }
ConfirmCallback() 和 ReturnCallback()的配置
spring: rabbitmq: # 发布确认类型 publisher-confirm-type: correlated # 队列未收到消息时,触发returnCallback回调 publisher-returns: true
@Component public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** 初始化方法 目的:因为ConfirmCallback 和 ReturnCallback这两个接口是RabbitTemplate的内部类 因此:想要让当前编写的PublisherConfirmAndReturnConfig能够访问到这两个接口 那么:就需要把当前类PublisherConfirmAndReturnConfig的confirmCallback 和 returnCallback注入到RabbitTemplate中去( init的作用 ) */ @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** 参数1、发送消息的ID - correlationData.getID() 和 消息的相关信息 参数2、是否成功发送消息给exchange true成功;false失败 参数3、失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息已经送达到Exchange"); }else{ System.out.println("消息没有送达到Exchange"); } } /** 参数1、消息 new String(message.getBody()) 参数2、消息退回的状态码 参数3、消息退回的原因 参数4、交换机名字 参数5、路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息没有送达到Queue"); } }
上图架构的伪代码配置编写
package cn.zixieqing.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AlternateExchangeConfig { /** * 正常交换机名字 */ private static final String NORMAL_EXCHANGE_NAME = "normal_exchange"; /** * 正常队列 */ private static final String NORMAL_QUEUE_NAME = "normal_queue"; /** * 备份交换机名字 */ private static final String ALTERNATE_EXCHANGE_NAME = "alternate_exchange"; /** * 备份队列名字 */ private static final String ALTERNATE_QUEUE_NAME = "alternate_queue"; /** * 用于警告的队列名字 */ private static final String WARNING_QUEUE_NAME = "warning_queue"; /** * 声明正常交换机 但是:需要做一件事情 - 消息没投递到正常队列时,需要让其走备份交换机 */ @Bean public DirectExchange confirmExchange() { return ExchangeBuilder .directExchange(NORMAL_EXCHANGE_NAME) .durable(true) // 绑定备份交换机 .withArgument("alternate-exchange", ALTERNATE_EXCHANGE_NAME) .build(); } /** * 声明确认队列 */ @Bean public Queue confirmQueue() { return new Queue(NORMAL_QUEUE_NAME); } /** * 确认交换机( 正常交换机 ) 和 确认队列进行绑定 */ @Bean public Binding confirmExchangeBindingConfirmQueue(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue) { return BindingBuilder .bind(confirmQueue) .to(confirmExchange) .with("routingkey"); } /** * 声明备份交换机 */ @Bean public FanoutExchange alternateExchange() { return new FanoutExchange(ALTERNATE_EXCHANGE_NAME); } /** * 声明备份队列 */ @Bean public Queue alternateQueue() { return QueueBuilder .durable(ALTERNATE_QUEUE_NAME) .build(); } /** * 声明警告队列 */ @Bean public Queue warningQueue() { return new Queue(WARNING_QUEUE_NAME); } /** * 备份交换机 和 备份队列进行绑定 */ @Bean public Binding alternateExchangeBindingAlternateQueue(@Qualifier("alternateQueue") Queue alternateQueue, @Qualifier("alternateExchange") FanoutExchange alternateExchange) { return BindingBuilder .bind(alternateQueue) .to(alternateExchange); } /** * 备份交换机 和 警告队列进行绑定 */ @Bean public Binding alternateExchangeBindingWarningQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("alternateExchange") FanoutExchange alternateExchange) { return BindingBuilder .bind(warningQueue) .to(alternateExchange); } }
想要实现优先级队列,需要满足如下条件:
1、队列本身设置优先级( 在声明队列是进行参数配置 )
/** * 基础型配置 */ Map<String, Object> params = new HashMap(); params.put("x-max-priority", 10); // 默认区间:(0, 255) 但是若用这个区间,则会浪费CPU和内层消耗,因此:改为(0, 10)即可 channel.queueDeclare("hello", true, false, false, params); /** * SpringBoot中的配置 */ @Bean public Queue alternateQueue() { // 空间大小: ( map存储的元素个数 / 0.75 ) + 1 HashMap<String, Object> params = new HashMap<>(3); params.put("x-max-priority", 10); return QueueBuilder .durable(ALTERNATE_QUEUE_NAME).withArguments(params) .build(); }
2、让消息有优先级
/** * 基础型配置 - 生产者调用basicPublisher()时配置的消息properties */ AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .priority(5) .build(); /** * SpringBoot中的配置 */ // 发送消息 rabbitTemplate.convertAndSend("normal.exchange", "normal.routingkey", data->{ // 消息设置优先级 - 注意:这个数值不能比前面队列设置的那个优先级数值大,即:这里的消息优先级范围就是前面队列中设置的(0, 10) data.getMessageProperties().setPriority(5); return data; });
注意点:设置了优先级之后,需要做到如下条件:
设置惰性队列的配置
/** * 基础型配置 */ Map<String, Object> params = new HashMap(); params.put("x-queue-mode", "lazy"); channel.queueDeclare("hello", true, false, false, params); /** * SpringBoot中的配置 */ @Bean public Queue alternateQueue() { // 空间大小: ( map存储的元素个数 / 0.75 ) + 1 HashMap<String, Object> params = new HashMap<>(3); params.put("x-queue-mode", "lazy"); return QueueBuilder .durable(ALQUEUE_NAME).withArguments(params) .build(); }