手把手教你 springBoot 整合 rabbitMQ,利用 MQ 实现事务补偿

rabbitMQ 在互联网公司有着大规模应用,本篇将实战介绍 springboot 整合 rabbi
首页 新闻资讯 行业资讯 手把手教你 springBoot 整合 rabbitMQ,利用 MQ 实现事务补偿

[[341134]]

本文转载自微信公众号「Java极客技术」,作者鸭血粉丝。转载本文请联系Java极客技术公众号。  

rabbitMQ 在互联网公司有着大规模应用,本篇将实战介绍 springboot 整合 rabbitMQ,同时也将在具体的业务场景中介绍利用 MQ  实现事务补偿操作。

一、介绍

本篇我们一起来实操一下SpringBoot整合rabbitMQ,为后续业务处理做铺垫。

废话不多说,直奔主题!

二、整合实战

2.1、创建一个 maven 工程,引入 amqp 包

复制

<!--amqp 支持--> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

2.2、在全局文件中配置 rabbitMQ 服务信息

复制

spring.rabbitmq.addresses=197.168.24.206:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
  • 1.

  • 2.

  • 3.

  • 4.

其中,spring.rabbitmq.addresses参数值为 rabbitmq 服务器地址

2.3、编写 rabbitmq 配置类

复制

@Slf4j @Configuration public class RabbitConfig {      /**      * 初始化连接工厂      * @param addresses      * @param userName      * @param password      * @param vhost      * @return      */     @Bean     ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,                                         @Value("${spring.rabbitmq.username}") String userName,                                         @Value("${spring.rabbitmq.password}") String password,                                         @Value("${spring.rabbitmq.virtual-host}") String vhost) {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();         connectionFactory.setAddresses(addresses);         connectionFactory.setUsername(userName);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(vhost);         return connectionFactory;     }      /**      * 重新实例化 RabbitAdmin 操作类      * @param connectionFactory      * @return      */     @Bean     public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){         return new RabbitAdmin(connectionFactory);     }      /**      * 重新实例化 RabbitTemplate 操作类      * @param connectionFactory      * @return      */     @Bean     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){         RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);         //数据转换为json存入消息队列         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());         return rabbitTemplate;     }      /**      * 将 RabbitUtil 操作工具类加入IOC容器      * @return      */     @Bean     public RabbitUtil rabbitUtil(){         return new RabbitUtil();     }  }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

  • 48.

  • 49.

  • 50.

  • 51.

  • 52.

  • 53.

  • 54.

  • 55.

  • 56.

  • 57.

  • 58.

2.4、编写 RabbitUtil 工具类

复制

public class RabbitUtil {      private static final Logger logger = LoggerFactory.getLogger(RabbitUtil.class);      @Autowired     private RabbitAdmin rabbitAdmin;      @Autowired     private RabbitTemplate rabbitTemplate;      /**      * 创建Exchange      * @param exchangeName      */     public void addExchange(String exchangeType, String exchangeName){         Exchange exchange = createExchange(exchangeType, exchangeName);         rabbitAdmin.declareExchange(exchange);     }      /**      * 删除一个Exchange      * @param exchangeName      */     public boolean deleteExchange(String exchangeName){         return rabbitAdmin.deleteExchange(exchangeName);     }      /**      * 创建一个指定的Queue      * @param queueName      * @return queueName      */     public void addQueue(String queueName){         Queue queue = createQueue(queueName);         rabbitAdmin.declareQueue(queue);     }      /**      * 删除一个queue      * @return queueName      * @param queueName      */     public boolean deleteQueue(String queueName){         return rabbitAdmin.deleteQueue(queueName);     }      /**      * 按照筛选条件,删除队列      * @param queueName      * @param unused 是否被使用      * @param empty 内容是否为空      */     public void deleteQueue(String queueName, boolean unused, boolean empty){         rabbitAdmin.deleteQueue(queueName,unused,empty);     }      /**      * 清空某个队列中的消息,注意,清空的消息并没有被消费      * @return queueName      * @param queueName      */     public void purgeQueue(String queueName){         rabbitAdmin.purgeQueue(queueName, false);     }      /**      * 判断指定的队列是否存在      * @param queueName      * @return      */     public boolean existQueue(String queueName){         return rabbitAdmin.getQueueProperties(queueName) == null ? false : true;     }      /**      * 绑定一个队列到一个匹配型交换器使用一个routingKey      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers EADERS模式类型设置,其他模式类型传空      */     public void addBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         rabbitAdmin.declareBinding(binding);     }      /**      * 声明绑定      * @param binding      */     public void addBinding(Binding binding){         rabbitAdmin.declareBinding(binding);     }      /**      * 解除交换器与队列的绑定      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      */     public void removeBinding(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         Binding binding = bindingBuilder(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);         removeBinding(binding);     }      /**      * 解除交换器与队列的绑定      * @param binding      */     public void removeBinding(Binding binding){         rabbitAdmin.removeBinding(binding);     }      /**      * 创建一个交换器、队列,并绑定队列      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      */     public void andExchangeBindingQueue(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         //声明交换器         addExchange(exchangeType, exchangeName);         //声明队列         addQueue(queueName);         //声明绑定关系         addBinding(exchangeType, exchangeName, queueName, routingKey, isWhereAll, headers);     }      /**      * 发送消息      * @param exchange      * @param routingKey      * @param object      */     public void convertAndSend(String exchange, String routingKey, final Object object){         rabbitTemplate.convertAndSend(exchange, routingKey, object);     }      /**      * 转换Message对象      * @param messageType      * @param msg      * @return      */     public Message getMessage(String messageType, Object msg){         MessageProperties messageProperties = new MessageProperties();         messageProperties.setContentType(messageType);         Message message = new Message(msg.toString().getBytes(),messageProperties);         return message;     }      /**      * 声明交换机      * @param exchangeType      * @param exchangeName      * @return      */     private Exchange createExchange(String exchangeType, String exchangeName){         if(ExchangeType.DIRECT.equals(exchangeType)){             return new DirectExchange(exchangeName);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return new TopicExchange(exchangeName);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             return new HeadersExchange(exchangeName);         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return new FanoutExchange(exchangeName);         }         return null;     }      /**      * 声明绑定关系      * @param exchangeType      * @param exchangeName      * @param queueName      * @param routingKey      * @param isWhereAll      * @param headers      * @return      */     private Binding bindingBuilder(String exchangeType, String exchangeName, String queueName, String routingKey, boolean isWhereAll, Map<String, Object> headers){         if(ExchangeType.DIRECT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new DirectExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.TOPIC.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new TopicExchange(exchangeName)).with(routingKey);         }         if(ExchangeType.HEADERS.equals(exchangeType)){             if(isWhereAll){                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAll(headers).match();             }else{                 return BindingBuilder.bind(new Queue(queueName)).to(new HeadersExchange(exchangeName)).whereAny(headers).match();             }         }         if(ExchangeType.FANOUT.equals(exchangeType)){             return BindingBuilder.bind(new Queue(queueName)).to(new FanoutExchange(exchangeName));         }         return null;     }      /**      * 声明队列      * @param queueName      * @return      */     private Queue createQueue(String queueName){         return new Queue(queueName);     }       /**      * 交换器类型      */     public final static class ExchangeType {          /**          * 直连交换机(全文匹配)          */         public final static String DIRECT = "DIRECT";          /**          * 通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)          */         public final static String TOPIC = "TOPIC";          /**          * 头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)          */         public final static String HEADERS = "HEADERS";          /**          * 扇形(广播)交换机 (将消息转发到所有与该交互机绑定的队列上)          */         public final static String FANOUT = "FANOUT";     } }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

  • 48.

  • 49.

  • 50.

  • 51.

  • 52.

  • 53.

  • 54.

  • 55.

  • 56.

  • 57.

  • 58.

  • 59.

  • 60.

  • 61.

  • 62.

  • 63.

  • 64.

  • 65.

  • 66.

  • 67.

  • 68.

  • 69.

  • 70.

  • 71.

  • 72.

  • 73.

  • 74.

  • 75.

  • 76.

  • 77.

  • 78.

  • 79.

  • 80.

  • 81.

  • 82.

  • 83.

  • 84.

  • 85.

  • 86.

  • 87.

  • 88.

  • 89.

  • 90.

  • 91.

  • 92.

  • 93.

  • 94.

  • 95.

  • 96.

  • 97.

  • 98.

  • 99.

  • 100.

  • 101.

  • 102.

  • 103.

  • 104.

  • 105.

  • 106.

  • 107.

  • 108.

  • 109.

  • 110.

  • 111.

  • 112.

  • 113.

  • 114.

  • 115.

  • 116.

  • 117.

  • 118.

  • 119.

  • 120.

  • 121.

  • 122.

  • 123.

  • 124.

  • 125.

  • 126.

  • 127.

  • 128.

  • 129.

  • 130.

  • 131.

  • 132.

  • 133.

  • 134.

  • 135.

  • 136.

  • 137.

  • 138.

  • 139.

  • 140.

  • 141.

  • 142.

  • 143.

  • 144.

  • 145.

  • 146.

  • 147.

  • 148.

  • 149.

  • 150.

  • 151.

  • 152.

  • 153.

  • 154.

  • 155.

  • 156.

  • 157.

  • 158.

  • 159.

  • 160.

  • 161.

  • 162.

  • 163.

  • 164.

  • 165.

  • 166.

  • 167.

  • 168.

  • 169.

  • 170.

  • 171.

  • 172.

  • 173.

  • 174.

  • 175.

  • 176.

  • 177.

  • 178.

  • 179.

  • 180.

  • 181.

  • 182.

  • 183.

  • 184.

  • 185.

  • 186.

  • 187.

  • 188.

  • 189.

  • 190.

  • 191.

  • 192.

  • 193.

  • 194.

  • 195.

  • 196.

  • 197.

  • 198.

  • 199.

  • 200.

  • 201.

  • 202.

  • 203.

  • 204.

  • 205.

  • 206.

  • 207.

  • 208.

  • 209.

  • 210.

  • 211.

  • 212.

  • 213.

  • 214.

  • 215.

  • 216.

  • 217.

  • 218.

  • 219.

  • 220.

  • 221.

  • 222.

  • 223.

  • 224.

  • 225.

  • 226.

  • 227.

  • 228.

  • 229.

  • 230.

  • 231.

  • 232.

  • 233.

  • 234.

  • 235.

  • 236.

  • 237.

  • 238.

  • 239.

  • 240.

  • 241.

  • 242.

  • 243.

  • 244.

  • 245.

  • 246.

  • 247.

此致, rabbitMQ 核心操作功能操作已经开发完毕!

2.5、编写队列监听类(静态)

复制

@Slf4j @Configuration public class DirectConsumeListener {      /**      * 监听指定队列,名称:mq.direct.1      * @param message      * @param channel      * @throws IOException      */     @RabbitListener(queues = "mq.direct.1")     public void consume(Message message, Channel channel) throws IOException {         log.info("DirectConsumeListener,收到消息: {}", message.toString());     } }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

如果你需要监听指定的队列,只需要方法上加上@RabbitListener(queues = "")即可,同时填写对应的队列名称。

但是,如果你想动态监听队列,而不是通过写死在方法上呢?

请看下面介绍!

2.6、编写队列监听类(动态)

重新实例化一个SimpleMessageListenerContainer对象,这个对象就是监听容器。

复制

@Slf4j @Configuration public class DynamicConsumeListener {      /**      * 使用SimpleMessageListenerContainer实现动态监听      * @param connectionFactory      * @return      */     @Bean     public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);         container.setMessageListener((MessageListener) message -> {             log.info("ConsumerMessageListen,收到消息: {}", message.toString());         });         return container;     } }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

如果想向SimpleMessageListenerContainer添加监听队列或者移除队列,只需通过如下方式即可操作。

复制

@Slf4j @RestController @RequestMapping("/consumer") public class ConsumerController {      @Autowired     private SimpleMessageListenerContainer container;      @Autowired     private RabbitUtil rabbitUtil;      /**      * 添加队列到监听器      * @param consumerInfo      */     @PostMapping("addQueue")     public void addQueue(@RequestBody ConsumerInfo consumerInfo) {         boolean existQueue = rabbitUtil.existQueue(consumerInfo.getQueueName());         if(!existQueue){             throw new CommonExecption("当前队列不存在");         }         //消费mq消息的类         container.addQueueNames(consumerInfo.getQueueName());         //打印监听容器中正在监听到队列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }      /**      * 移除正在监听的队列      * @param consumerInfo      */     @PostMapping("removeQueue")     public void removeQueue(@RequestBody ConsumerInfo consumerInfo) {         //消费mq消息的类         container.removeQueueNames(consumerInfo.getQueueName());         //打印监听容器中正在监听到队列         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     }      /**      * 查询监听容器中正在监听到队列      */     @PostMapping("queryListenerQueue")     public void queryListenerQueue() {         log.info("container-queue:{}", JsonUtils.toJson(container.getQueueNames()));     } }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

2.7、发送消息到交换器

发送消息到交换器,非常简单,只需要通过如下方式即可!

  • 先编写一个请求参数实体类

复制

@Data public class ProduceInfo implements Serializable {      private static final long serialVersionUID = 1l;      /**      * 交换器名称      */     private String exchangeName;      /**      * 路由键key      */     private String routingKey;      /**      * 消息内容      */     public String msg; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 编写接口api

复制

@RestController @RequestMapping("/produce") public class ProduceController {      @Autowired     private RabbitUtil rabbitUtil;      /**      * 发送消息到交换器      * @param produceInfo      */     @PostMapping("sendMessage")     public void sendMessage(@RequestBody ProduceInfo produceInfo) {         rabbitUtil.convertAndSend(produceInfo.getExchangeName(), produceInfo.getRoutingKey(), produceInfo);     }  }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

当然,你也可以直接使用rabbitTemplate操作类,来实现发送消息。

复制

rabbitTemplate.convertAndSend(exchange, routingKey, message);
  • 1.

参数内容解释:

  • exchange:表示交换器名称

  • routingKey:表示路由键key

  • message:表示消息

2.8、交换器、队列维护操作

如果想通过接口对 rabbitMQ 中的交换器、队列以及绑定关系进行维护,通过如下方式接口操作,即可实现!

先编写一个请求参数实体类

复制

@Data public class QueueConfig implements Serializable{      private static final long serialVersionUID = 1l;      /**      * 交换器类型      */     private String exchangeType;      /**      * 交换器名称      */     private String exchangeName;      /**      * 队列名称      */     private String queueName;      /**      * 路由键key      */     private String routingKey; }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

编写接口api

复制

/**  * rabbitMQ管理操作控制层  */ @RestController @RequestMapping("/config") public class RabbitController {       @Autowired     private RabbitUtil rabbitUtil;      /**      * 创建交换器      * @param config      */     @PostMapping("addExchange")     public void addExchange(@RequestBody QueueConfig config) {         rabbitUtil.addExchange(config.getExchangeType(), config.getExchangeName());     }      /**      * 删除交换器      * @param config      */     @PostMapping("deleteExchange")     public void deleteExchange(@RequestBody QueueConfig config) {         rabbitUtil.deleteExchange(config.getExchangeName());     }      /**      * 添加队列      * @param config      */     @PostMapping("addQueue")     public void addQueue(@RequestBody QueueConfig config) {         rabbitUtil.addQueue(config.getQueueName());     }      /**      * 删除队列      * @param config      */     @PostMapping("deleteQueue")     public void deleteQueue(@RequestBody QueueConfig config) {         rabbitUtil.deleteQueue(config.getQueueName());     }      /**      * 清空队列数据      * @param config      */     @PostMapping("purgeQueue")     public void purgeQueue(@RequestBody QueueConfig config) {         rabbitUtil.purgeQueue(config.getQueueName());     }      /**      * 添加绑定      * @param config      */     @PostMapping("addBinding")     public void addBinding(@RequestBody QueueConfig config) {         rabbitUtil.addBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }      /**      * 解除绑定      * @param config      */     @PostMapping("removeBinding")     public void removeBinding(@RequestBody QueueConfig config) {         rabbitUtil.removeBinding(config.getExchangeType(), config.getExchangeName(), config.getQueueName(), config.getRoutingKey(), false, null);     }      /**      * 创建头部类型的交换器      * 判断条件是所有的键值对都匹配成功才发送到队列      * @param config      */     @PostMapping("andExchangeBindingQueueOfHeaderAll")     public void andExchangeBindingQueueOfHeaderAll(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAll");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, true, header);     }      /**      * 创建头部类型的交换器      * 判断条件是只要有一个键值对匹配成功就发送到队列      * @param config      */     @PostMapping("andExchangeBindingQueueOfHeaderAny")     public void andExchangeBindingQueueOfHeaderAny(@RequestBody QueueConfig config) {         HashMap<String, Object> header = new HashMap<>();         header.put("queue", "queue");         header.put("bindType", "whereAny");         rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS, config.getExchangeName(), config.getQueueName(), null, false, header);     } }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 22.

  • 23.

  • 24.

  • 25.

  • 26.

  • 27.

  • 28.

  • 29.

  • 30.

  • 31.

  • 32.

  • 33.

  • 34.

  • 35.

  • 36.

  • 37.

  • 38.

  • 39.

  • 40.

  • 41.

  • 42.

  • 43.

  • 44.

  • 45.

  • 46.

  • 47.

  • 48.

  • 49.

  • 50.

  • 51.

  • 52.

  • 53.

  • 54.

  • 55.

  • 56.

  • 57.

  • 58.

  • 59.

  • 60.

  • 61.

  • 62.

  • 63.

  • 64.

  • 65.

  • 66.

  • 67.

  • 68.

  • 69.

  • 70.

  • 71.

  • 72.

  • 73.

  • 74.

  • 75.

  • 76.

  • 77.

  • 78.

  • 79.

  • 80.

  • 81.

  • 82.

  • 83.

  • 84.

  • 85.

  • 86.

  • 87.

  • 88.

  • 89.

  • 90.

  • 91.

  • 92.

  • 93.

  • 94.

  • 95.

  • 96.

  • 97.

  • 98.

  • 99.

  • 100.

至此,rabbitMQ 管理器基本的 crud 全部开发完成!

三、利用 MQ 实现事务补偿

当然,我们花了这么大的力气,绝不仅仅是为了将 rabbitMQ 通过 web  项目将其管理起来,最重要的是能投入业务使用中去!

上面的操作只是告诉我们怎么使用 rabbitMQ!

  • 当你仔细回想整个过程的时候,其实还是回到最初那个问题,什么时候使用 MQ ?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:支付订单、扣减库存、生成相应单据、发红包、发短信通知等等。

在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用  MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取 MQ 的消息(或者由 MQ  推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

这种是利用 MQ 实现业务解耦,其它的场景包括最终一致性、广播、错峰流控等等。

利用 MQ 实现业务解耦的过程其实也很简单。

  • 当主流程结束之后,将消息推送到发红包、发短信交换器中即可

复制

@Service public class OrderService {      @Autowired     private RabbitUtil rabbitUtil;      /**      * 创建订单      * @param order      */     @Transactional     public void createOrder(Order order){         //1、创建订单         //2、调用库存接口,减库存         //3、向客户发放红包         rabbitUtil.convertAndSend("exchange.send.bonus", null, order);         //4、发短信通知         rabbitUtil.convertAndSend("exchange.sms.message", null, order);     }  }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

  • 14.

  • 15.

  • 16.

  • 17.

  • 18.

  • 19.

  • 20.

  • 21.

  • 监听发红包操作

复制

/**  * 监听发红包  * @param message  * @param channel  * @throws IOException  */ @RabbitListener(queues = "exchange.send.bonus") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //调用发红包接口 }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

监听发短信操作

复制

/**  * 监听发短信  * @param message  * @param channel  * @throws IOException  */ @RabbitListener(queues = "exchange.sms.message") public void consume(Message message, Channel channel) throws IOException {     String msgJson = new String(message.getBody(),"UTF-8");     log.info("收到消息: {}", message.toString());      //调用发短信接口 }
  • 1.

  • 2.

  • 3.

  • 4.

  • 5.

  • 6.

  • 7.

  • 8.

  • 9.

  • 10.

  • 11.

  • 12.

  • 13.

既然 MQ 这么好用,那是不是完全可以将以前的业务也按照整个模型进行拆分呢?

答案显然不是!

当引入 MQ 之后业务的确是解耦了,但是当 MQ 一旦挂了,所有的服务基本都挂了,是不是很可怕!

但是没关系,俗话说,兵来将挡、水来土掩,这句话同样适用于 IT 开发者,有坑填坑!

在下篇文章中,我们会详细介绍 rabbitMQ 的集群搭建和部署,保证消息几乎 100% 的投递和消费。

四、总结

本篇主要围绕SpringBoot整合rabbitMQ做内容介绍,可能也有理解不到位的地方,欢迎网友批评指出!

 

26    2020-09-08 07:37:44    springBoot MQ rabbitMQ