环境:springboot.2.4.12 + RabbitMQ3.7.4
这是一个充值的案例
图片
交互流程 :
1、账户系统调用充值系统接口。
2、充值系统完成支付向账户系统发起充值结果通知 若通知失败,则充值系统按策略进行重复通知。
3、账户系统接收到充值结果通知修改充值状态。
4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。通过上边的例子我们总结最大努力通知方案的目标 :目标 :发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。
具体包括 :
1、有一定的消息重复通知机制。因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。
1、解决方案思想不同 可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2、两者的业务应用场景不同 可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同 可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力地将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消费。
关于RabbitMQ相关文章《SpringBoot RabbitMQ消息可靠发送与接收 》,《RabbitMQ消息确认机制confirm 》。
项目结构
图片
两个子模块users-mananger(账户模块),pay-manager(支付模块)
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency>
配置文件
server: port:8080---spring: rabbitmq: host: localhost port:5672username: guest password: guest virtual-host:/publisherConfirmType: correlated publisherReturns:truelistener:simple: concurrency:5maxConcurrency:10prefetch:5acknowledgeMode: MANUAL retry: enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false
实体类
记录充值金额及账户信息
@Entity@Table(name="t_pay_info")publicclass PayInfo implementsSerializable{@Idprivate Long id;private BigDecimal money;private Long accountId;}
DAO及Service
publicinterface PayInfoRepository extends JpaRepository<PayInfo,Long>{ PayInfo findByOrderId(String orderId);}
@Servicepublicclass PayInfoService {@Resourceprivate PayInfoRepository payInfoRepository;@Resourceprivate RabbitTemplate rabbitTemplate;// 数据保存完后发送消息(这里发送消息可以应用确认模式或事物模式)@TransactionalpublicPayInfo savePayInfo(PayInfo payInfo){ payInfo.setId(System.currentTimeMillis());PayInfo result=payInfoRepository.save(payInfo);CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replaceAll("-",""));try { rabbitTemplate.convertAndSend("pay-exchange","pay.#",new ObjectMapper().writeValueAsString(payInfo),correlationData);} catch(AmqpException|JsonProcessingException e){ e.printStackTrace();}returnresult;}publicPayInfo queryByOrderId(String orderId){returnpayInfoRepository.findByOrderId(orderId);} }
支付完成后发送消息。
Controller接口
@RestController@RequestMapping("/payInfos")publicclass PayInfoController {@Resourceprivate PayInfoService payInfoService;// 支付接口@PostMapping("/pay")publicObject pay(@RequestBodyPayInfo payInfo){ payInfoService.savePayInfo(payInfo);return"支付已提交,等待结果";}@GetMapping("/queryPay")publicObject queryPay(String orderId){returnpayInfoService.queryByOrderId(orderId);} }
应用配置
server: port:8081---spring: rabbitmq: host: localhost port:5672username: guest password: guest virtual-host:/publisherConfirmType: correlated publisherReturns:truelistener:simple: concurrency:5maxConcurrency:10prefetch:5acknowledgeMode: MANUAL retry: enabled:trueinitialInterval:3000maxAttempts:3defaultRequeueRejected:false
实体类
@Entity@Table(name="t_users")publicclass Users {@Idprivate Long id;private String name;private BigDecimal money;}
账户信息表
@Entity@Table(name="t_users_log")publicclass UsersLog {@Idprivate Long id;private String orderId;// 0:支付中,1:已支付,2:已取消@Column(columnDefinition="int default 0")privateIntegerstatus=0;private BigDecimal money;privateDatecreateTime;}
账户充值记录表(去重)
DAO及Service
publicinterface UsersRepository extends JpaRepository<Users,Long>{ }publicinterface UsersLogRepository extends JpaRepository<UsersLog,Long>{ UsersLog findByOrderId(String orderId);}
Service类
@Servicepublicclass UsersService {@Resourceprivate UsersRepository usersRepository;@Resourceprivate UsersLogRepository usersLogRepository;@TransactionalpublicbooleanupdateMoneyAndLogStatus(Long id,String orderId){ UsersLog usersLog=usersLogRepository.findByOrderId(orderId);if(usersLog!=null&&1==usersLog.getStatus()){ throw new RuntimeException("已支付");} Users users=usersRepository.findById(id).orElse(null);if(users==null){ throw new RuntimeException("账户不存在");} users.setMoney(users.getMoney().add(usersLog.getMoney()));usersRepository.save(users);usersLog.setStatus(1);usersLogRepository.save(usersLog);returntrue;}@TransactionalpublicbooleansaveLog(UsersLog usersLog){ usersLog.setId(System.currentTimeMillis());usersLogRepository.save(usersLog);returntrue;} }
消息监听
@Componentpublicclass PayMessageListener { private static final Logger logger=LoggerFactory.getLogger(PayMessageListener.class);@Resourceprivate UsersService usersService;@SuppressWarnings("unchecked")@RabbitListener(queues={"pay-queue"})@RabbitHandlerpublicvoid receive(Message message,Channel channel){ long deliveryTag=message.getMessageProperties().getDeliveryTag();byte[]buf=null;try { buf=message.getBody();logger.info("接受到消息:{}",new String(buf,"UTF-8"));Map<String,Object>result=new JsonMapper().readValue(buf,Map.class);Long id=((Integer)result.get("accountId"))+0L;String orderId=(String)result.get("orderId");usersService.updateMoneyAndLogStatus(id,orderId);channel.basicAck(deliveryTag,true);} catch(Exception e){ logger.error("消息接受出现异常:{}, 异常消息:{}",e.getMessage(),new String(buf,Charset.forName("UTF-8")));e.printStackTrace();try {// 应该将这类异常的消息放入死信队列中,以便人工排查。channel.basicReject(deliveryTag,false);} catch(IOException e1){ logger.error("拒绝消息重入队列异常:{}",e1.getMessage());e1.printStackTrace();} } } }
Controller接口
@RestController@RequestMapping("/users")publicclass UsersController {@Resourceprivate RestTemplate restTemplate;@Resourceprivate UsersService usersService;@PostMapping("/pay")publicObject pay(Long id,BigDecimal money)throws Exception { HttpHeaders headers=new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);String orderId=UUID.randomUUID().toString().replaceAll("-","");Map<String,String>params=new HashMap<>();params.put("accountId",String.valueOf(id));params.put("orderId",orderId);params.put("money",money.toString());UsersLog usersLog=new UsersLog();usersLog.setCreateTime(newDate());usersLog.setOrderId(orderId);usersLog.setMoney(money);usersLog.setStatus(0);usersService.saveLog(usersLog);HttpEntity<String>requestEntity=new HttpEntity<String>(new ObjectMapper().writeValueAsString(params),headers);returnrestTemplate.postForObject("http://localhost:8080/payInfos/pay",requestEntity,String.class);} }
以上是两个子模块的所有代码了
初始数据
图片
图片
账户子模块控制台
图片
支付子模块控制台
图片
数据表数据
图片
完毕!!!