SpringBoot结合RabbitMQ实现分布式事务之最大努力通知

如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息
首页 新闻资讯 行业资讯 SpringBoot结合RabbitMQ实现分布式事务之最大努力通知

环境:springboot.2.4.12 + RabbitMQ3.7.4

什么是最大努力通知

这是一个充值的案例

图片图片


交互流程 :

1、账户系统调用充值系统接口。

2、充值系统完成支付向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知。

3、账户系统接收到充值结果通知修改充值状态。

4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。通过上边的例子我们总结最大努力通知方案的目标 :目标 :发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

具体包括 :

1、有一定的消息重复通知机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

2、消息校对机制。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同?

1、解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

2、两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。

3、技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力地将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费。

通过RabbitMQ实现最大努力通知

关于RabbitMQ相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》,《RabbitMQ消息确认机制confirm 》。

  • 项目结构

图片图片


两个子模块users-mananger(账户模块),pay-manager(支付模块)

  • 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency>

子模块pay-manager

  • 配置文件

server:
  port:8080---spring:
  rabbitmq:
    host: localhost
    port:5672username: guest
    password: guest
    virtual-host:/publisherConfirmType: correlated
    publisherReturns:truelistener:simple:
        concurrency:5maxConcurrency:10prefetch:5acknowledgeMode: MANUAL
        retry:
          enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false
  • 实体类

记录充值金额及账户信息

@Entity@Table(name="t_pay_info")publicclass PayInfo implementsSerializable{@Idprivate Long id;private BigDecimal money;private Long accountId;}
  • DAO及Service

publicinterface PayInfoRepository extends JpaRepository<PayInfo,Long>{
 PayInfo findByOrderId(String orderId);}
@Servicepublicclass PayInfoService {@Resourceprivate PayInfoRepository payInfoRepository;@Resourceprivate RabbitTemplate rabbitTemplate;// 数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式)@TransactionalpublicPayInfo savePayInfo(PayInfo payInfo){
    payInfo.setId(System.currentTimeMillis());PayInfo result=payInfoRepository.save(payInfo);CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replaceAll("-",""));try {
      rabbitTemplate.convertAndSend("pay-exchange","pay.#",new ObjectMapper().writeValueAsString(payInfo),correlationData);} catch(AmqpException|JsonProcessingException e){
      e.printStackTrace();}returnresult;}publicPayInfo queryByOrderId(String orderId){returnpayInfoRepository.findByOrderId(orderId);}
  
}

支付完成后发送消息。

  • Controller接口

@RestController@RequestMapping("/payInfos")publicclass PayInfoController {@Resourceprivate PayInfoService payInfoService;// 支付接口@PostMapping("/pay")publicObject pay(@RequestBodyPayInfo payInfo){
  payInfoService.savePayInfo(payInfo);return"支付已提交,等待结果";}@GetMapping("/queryPay")publicObject queryPay(String orderId){returnpayInfoService.queryByOrderId(orderId);}
  
}

子模块users-manager

  • 应用配置

server:
  port:8081---spring:
  rabbitmq:
    host: localhost
    port:5672username: guest
    password: guest
    virtual-host:/publisherConfirmType: correlated
    publisherReturns:truelistener:simple:
        concurrency:5maxConcurrency:10prefetch:5acknowledgeMode: MANUAL
        retry:
          enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false
  • 实体类

@Entity@Table(name="t_users")publicclass Users {@Idprivate Long id;private String name;private BigDecimal money;}

账户信息表

@Entity@Table(name="t_users_log")publicclass UsersLog {@Idprivate Long id;private String orderId;// 0:支付中,1:已支付,2:已取消@Column(columnDefinition="int default 0")privateIntegerstatus=0;private BigDecimal money;privateDatecreateTime;}

账户充值记录表(去重)

  • DAO及Service

publicinterface UsersRepository extends JpaRepository<Users,Long>{
}publicinterface UsersLogRepository extends JpaRepository<UsersLog,Long>{
 UsersLog findByOrderId(String orderId);}
  • Service类

@Servicepublicclass UsersService {@Resourceprivate UsersRepository usersRepository;@Resourceprivate UsersLogRepository usersLogRepository;@TransactionalpublicbooleanupdateMoneyAndLogStatus(Long id,String orderId){
  UsersLog usersLog=usersLogRepository.findByOrderId(orderId);if(usersLog!=null&&1==usersLog.getStatus()){
   throw new RuntimeException("已支付");}
  Users users=usersRepository.findById(id).orElse(null);if(users==null){
   throw new RuntimeException("账户不存在");}
  users.setMoney(users.getMoney().add(usersLog.getMoney()));usersRepository.save(users);usersLog.setStatus(1);usersLogRepository.save(usersLog);returntrue;}@TransactionalpublicbooleansaveLog(UsersLog usersLog){
  usersLog.setId(System.currentTimeMillis());usersLogRepository.save(usersLog);returntrue;}
}
  • 消息监听

@Componentpublicclass PayMessageListener {
  
 private static final Logger logger=LoggerFactory.getLogger(PayMessageListener.class);@Resourceprivate  UsersService usersService;@SuppressWarnings("unchecked")@RabbitListener(queues={"pay-queue"})@RabbitHandlerpublicvoid receive(Message message,Channel channel){
  long deliveryTag=message.getMessageProperties().getDeliveryTag();byte[]buf=null;try {
   buf=message.getBody();logger.info("接受到消息:{}",new String(buf,"UTF-8"));Map<String,Object>result=new JsonMapper().readValue(buf,Map.class);Long id=((Integer)result.get("accountId"))+0L;String orderId=(String)result.get("orderId");usersService.updateMoneyAndLogStatus(id,orderId);channel.basicAck(deliveryTag,true);} catch(Exception e){
   logger.error("消息接受出现异常:{}, 异常消息:{}",e.getMessage(),new String(buf,Charset.forName("UTF-8")));e.printStackTrace();try {// 应该将这类异常的消息放入死信队列中,以便人工排查。channel.basicReject(deliveryTag,false);} catch(IOException e1){
    logger.error("拒绝消息重入队列异常:{}",e1.getMessage());e1.printStackTrace();}
  }
 }
}
  • Controller接口

@RestController@RequestMapping("/users")publicclass UsersController {@Resourceprivate RestTemplate restTemplate;@Resourceprivate UsersService usersService;@PostMapping("/pay")publicObject pay(Long id,BigDecimal money)throws Exception {
    HttpHeaders headers=new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);String orderId=UUID.randomUUID().toString().replaceAll("-","");Map<String,String>params=new HashMap<>();params.put("accountId",String.valueOf(id));params.put("orderId",orderId);params.put("money",money.toString());UsersLog usersLog=new UsersLog();usersLog.setCreateTime(newDate());usersLog.setOrderId(orderId);usersLog.setMoney(money);usersLog.setStatus(0);usersService.saveLog(usersLog);HttpEntity<String>requestEntity=new HttpEntity<String>(new ObjectMapper().writeValueAsString(params),headers);returnrestTemplate.postForObject("http://localhost:8080/payInfos/pay",requestEntity,String.class);}
  
}

以上是两个子模块的所有代码了

测试

初始数据

图片图片


图片图片

账户子模块控制台

图片图片

支付子模块控制台

图片图片

数据表数据

图片图片

完毕!!!