Redis

关注公众号 jb51net

关闭
首页 > 数据库 > Redis > Redisson分布式锁订单服务

RabbitMQ+redis+Redisson分布式锁+seata实现订单服务的流程分析

作者:二价亚铁

订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到,这篇文章主要介绍了RabbitMQ+redis+Redisson分布式锁+seata实现订单服务的流程分析,需要的朋友可以参考下

引言

订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到。

订单流程(只展示重要的内容,具体可以到源码查看)

订单确认

public OrderConfirmVO confirmOrder(Long skuId) {
Long memberId = SecurityUtils.getMemberId();
// 解决子线程无法获取HttpServletRequest请求对象中数据的问题,子线程指getOrderItemsFuture,getMemberAddressFuture,generateOrderTokenFuture
//feign远程调用会被拦截提取attributes并转发
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
RequestContextHolder.setRequestAttributes(attributes, true);
// 获取订单商品
//使用了 CompletableFuture 来实现异步执行获取订单项的操作
CompletableFuture<List<OrderItemDTO>> getOrderItemsFuture = CompletableFuture.supplyAsync(
	() -> this.getOrderItems(skuId, memberId), threadPoolExecutor)
.exceptionally(ex -> {
	log.error("Failed to get order items: {}", ex.toString());
	return null;
});
// 用户收货地址
CompletableFuture<List<MemberAddressDTO>> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> {
	Result<List<MemberAddressDTO>> getMemberAddressResult = memberFeignClient.listMemberAddresses(memberId);
	if (Result.isSuccess(getMemberAddressResult)) {
		return getMemberAddressResult.getData();
	}
	return null;
}, threadPoolExecutor).exceptionally(ex -> {
	log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString());
	return null;
});
// 生成唯一令牌,防止重复提交(原理:提交会消耗令牌,令牌被消耗无法再次提交)
CompletableFuture<String> generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> {
	String orderToken = this.generateTradeNo(memberId);
	redisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken);
	return orderToken;
}, threadPoolExecutor).exceptionally(ex -> {
	log.error("Failed to generate order token .");
	return null;
});
//CompletableFuture.allOf 方法,可以等待所有 CompletableFuture 对象都完成再进行后续操作,
// 确保获取和设置属性的操作都能够成功执行。这样可以避免程序出现异常,
CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join();
OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
orderConfirmVO.setOrderItems(getOrderItemsFuture.join());
orderConfirmVO.setAddresses(getMemberAddressFuture.join());
orderConfirmVO.setOrderToken(generateOrderTokenFuture.join());
log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO);
return orderConfirmVO;
}

防止订单重复提交

通过生成唯一令牌解决,方法:

  private String generateTradeNo(Long memberId) {
  //当 memberId 的位数小于 5 位时,使用 0 来填充位数不足的部分,如果 memberId 已经是 5 位数或更长,则不进行填充
  String userIdFilledZero = String.format("%05d", memberId);
  //超出五位的保留后五位
  String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5);
  // 在前面加上wxo(wx order)等前缀是为了人工可以快速分辨订单号是下单还是退款、来自哪家支付机构等
  // 将时间戳+3位随机数+五位id组成商户订单号,规则参考自<a href="https://tech.meituan.com/2016/11/18/dianping-order-db-sharding.html" rel="external nofollow" >大众点评</a>
  return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId;
  }
  /**
   * 自定义订单线程池
   *
   */
  @Configuration
  @Slf4j
  public class ThreadPoolConfig {
  	@Bean
  	public ThreadPoolExecutor threadPoolExecutor() {
  		int cpuCoreSize = Runtime.getRuntime().availableProcessors();//使用 Java 获取可用的 CPU 核心数
  		log.info("当前CPU核心数:{}", cpuCoreSize);
  		/*
  		 * 计算密集型: 核心线程数=CPU核心 +1   √
  		 * I/O密集型: 核心线程数=2*CPU核心 +1
  		 */
  		int corePoolSize = cpuCoreSize + 1;
  		return new ThreadPoolExecutor(
  			 //核心线程数
  				corePoolSize,
  				//最大线程数
  				2 * corePoolSize,
  				//线程空闲(存活)时间;当线程数超过核心线程数,并且空闲时间超过指定时间时,多余的线程会被销毁
  				30,
  				TimeUnit.SECONDS,
  				//任务队列(选取数组阻塞队列)特点:固定容量,公平性,先进先出
  				new ArrayBlockingQueue<>(1000),
  				//线程工厂
  				new NamedThreadFactory("order") // 订单线程
  		);
  	}
  }

订单提交

@GlobalTransactional
public String submitOrder(OrderSubmitForm submitForm) {
log.info("订单提交参数:{}", JSONUtil.toJsonStr(submitForm));
String orderToken = submitForm.getOrderToken();
// 1. 判断订单是否重复提交(LUA脚本保证获取和删除的原子性,成功返回1,否则返回0)
//KEYS[1]指OrderConstants.ORDER_TOKEN_PREFIX + orderToken,ARGV[1]指orderToken
String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//这一行代码使用RedisTemplate的execute方法执行Lua脚本。通过new DefaultRedisScript<>创建一个Redis脚本对象,并指定脚本字符串和返回结果的类型(Long)。
// 然后使用Collections.singletonList来创建包含锁键名和订单令牌参数的列表,传递给execute方法。
//执行 Redis 脚本后,结果会赋值给 lockAcquired 变量,它的类型是 Long。
// lockAcquired这个值表示获取锁的结果,如果成功获取并删除了锁,则为 1;如果获取锁失败,则为 0。
Long lockAcquired = this.redisTemplate.execute(
	new DefaultRedisScript<>(lockAcquireScript, Long.class),
	Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken),
orderToken
);
Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "订单重复提交,请刷新页面后重试");
// 2. 订单商品校验 (PS:校验进入订单确认页面到提交过程商品(价格、上架状态)变化)
List<OrderSubmitForm.OrderItem> orderItems = submitForm.getOrderItems();
List<Long> skuIds = orderItems.stream()
.map(OrderSubmitForm.OrderItem::getSkuId)
.collect(Collectors.toList());
List<SkuInfoDTO> skuList = skuFeignClient.getSkuInfoList(skuIds);
for (OrderSubmitForm.OrderItem item : orderItems) {
	SkuInfoDTO skuInfo = skuList.stream().filter(sku -> sku.getId().equals(item.getSkuId()))
	.findFirst()
	.orElse(null);
	Assert.isTrue(skuInfo != null, "商品({})已下架或删除");
	//如果调用对象小于被比较对象,compareTo 方法返回一个负整数。
	// 如果调用对象大于被比较对象,compareTo 方法返回一个正整数。
	Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "商品({})价格发生变动,请刷新页面", item.getSkuName());
}
// 3. 校验库存并锁定库存
List<LockedSkuDTO> lockedSkuList = orderItems.stream()
.map(item -> new LockedSkuDTO(item.getSkuId(), item.getQuantity(), item.getSkuSn()))
.collect(Collectors.toList());
boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList);
Assert.isTrue(lockStockResult, "订单提交失败:锁定商品库存失败!");
// 4. 生成订单
boolean result = this.saveOrder(submitForm);
log.info("order ({}) create result:{}", orderToken, result);
return orderToken;
}

前提:此处涉及seata分布式事务和Redisson实现的分布式锁

lua脚本

KEYS[1] 指OrderConstants.ORDER_TOKEN_PREFIX + orderToken

ARGV[1] 指orderToken

判断是否含有这个锁key,有就删除这个锁并返回1L,没有就返回0

通过断言判断lockAcquired是否成功获取并删除了锁,不成功会报错

校验商品

3. 锁定库存方法

@Transactional
public boolean lockStock(String orderToken, List<LockedSkuDTO> lockedSkuList) {
	Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "订单({})未包含任何商品", orderToken);
	// 校验库存数量是否足够以及锁定库存
	for (LockedSkuDTO lockedSku : lockedSkuList) {
		Long skuId = lockedSku.getSkuId();
		//商品分布式锁缓存键前缀:ProductConstants.SKU_LOCK_PREFIX
		//每次getLock都会返回一个独立的分布式锁对象,但它们共享一个锁资源。
		RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId);  // 构建商品锁对象
		try {
			//共享一个锁资源意味着多个分布式锁对象共同使用同一个锁来实现互斥访问。
			//虽然每个分布式锁对象是独立创建的,但它们会使用相同的锁资源来进行加锁和释放锁的操作。
			lock.lock();//锁定操作
			Integer quantity = lockedSku.getQuantity(); // 订单的商品数量
			// 库存足够
			boolean lockResult = this.update(new LambdaUpdateWrapper<PmsSku>()
											 .setSql("locked_stock = locked_stock + " + quantity) // 修改锁定商品数
											 .eq(PmsSku::getId, lockedSku.getSkuId())
											 //通过 apply 方法添加动态 SQL 条件,确保 stock 减去 locked_stock 的值大于等于给定的 quantity。
											 //使用了占位符 {0} 来引用 quantity,0表示第一个传入的值。
											 .apply("stock - locked_stock >= {0}", quantity) // 剩余商品数 ≥ 订单商品数
											);
			Assert.isTrue(lockResult, "商品({})库存不足", lockedSku.getSkuSn());
		} finally {
			if (lock.isLocked()) {
				lock.unlock();
			}
		}
	}
	// 锁定的商品缓存至 Redis (后续使用:1.取消订单解锁库存;2:支付订单扣减库存)
	redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList);
	return true;
}

扩展:解锁库存

/**
*解锁库存
*<P>
*订单超时未支付,释放锁定的商品库存
*/
public boolean unlockStock(String orderSn) {
//锁定库存对象集合:lockedSkus
	List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
	log.info("释放订单({})锁定的商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus));
	// 库存已释放
	if (CollectionUtil.isEmpty(lockedSkus)) {
		return true;
	}
	// 遍历恢复锁定的商品库存
	for (LockedSkuDTO lockedSku : lockedSkus) {
		RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId());  // 获取商品分布式锁
		try {
			lock.lock();
			this.update(new LambdaUpdateWrapper<PmsSku>()
						.setSql("locked_stock = locked_stock - " + lockedSku.getQuantity())
						.eq(PmsSku::getId, lockedSku.getSkuId())
					   );
		} finally {
			//判断当前分布式锁是否已被锁定,通过这个判断可以防止非法释放锁等潜在问题
			if (lock.isLocked()) {
				lock.unlock();
			}
		}
	}
	// 移除 redis 订单锁定的商品
	redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
	return true;
}

扩展:扣减库存

   /**
	 * 扣减库存
	 * <p>
	 * 订单支付扣减商品库存和释放锁定库存
	 *
	 * @param orderSn 订单编号
	 * @return ture/false
	 */
public boolean deductStock(String orderSn) {
	// 获取订单提交时锁定的商品
	List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
	log.info("订单({})支付成功,扣减订单商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus));
	Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "扣减商品库存失败:订单({})未包含商品");
	for (LockedSkuDTO lockedSku : lockedSkus) {
		RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId());    // 获取商品分布式锁
		try {
			lock.lock();
			this.update(new LambdaUpdateWrapper<PmsSku>()
					.setSql("stock = stock - " + lockedSku.getQuantity())
					.setSql("locked_stock = locked_stock - " + lockedSku.getQuantity())
					.eq(PmsSku::getId, lockedSku.getSkuId())
			);
		} finally {
			if (lock.isLocked()) {
				lock.unlock();
			}
		}
	}
	// 移除订单锁定的商品
	redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn);
	return true;
}

4. 生成订单

创建订单

/**
 * 创建订单
 *
 * @param submitForm 订单提交表单对象
 * @return
 */
private boolean saveOrder(OrderSubmitForm submitForm) {
//创建订单详情表(OmsOrder)对象
OmsOrder order = orderConverter.form2Entity(submitForm);
//设置待支付状态
order.setStatus(OrderStatusEnum.UNPAID.getValue());
//设置订单会员id
order.setMemberId(SecurityUtils.getMemberId());
//设置订单来源(0代表PC订单,1代表APP订单)
order.setSource(submitForm.getOrderSource().getValue());
//保存到数据库
boolean result = this.save(order);
Long orderId = order.getId();
if (result) {
	// 保存订单明细
	List<OmsOrderItem> orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems());
	orderItemEntities.forEach(item -> item.setOrderId(orderId));
	orderItemService.saveBatch(orderItemEntities);
	// 订单超时未支付取消
	//这行代码使用 RabbitMQ 的 Java 客户端库来发送一条消息到 order.exchange 交换器,
	// 该消息会被路由到 order.close.delay 队列中。消息的内容是 submitForm.getOrderToken() 方法的返回结果。
	rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken());
}
return result;
}

普通的保存到数据的操作就不做解释了,主要看订单超时未支付取消的功能。

订单超时未支付取消功能通过rabbitMQ实现

订单超时关单延时队列

@Component
@Slf4j
public class OrderRabbitConfig {
	// 普通延迟队列
	private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue";
	private static final String ORDER_EXCHANGE = "order.exchange";
	private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay";
	// 死信关单队列
	private static final String ORDER_ClOSE_QUEUE = "order.close.queue";
	private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";
	private static final String ORDER_ClOSE_ROUTING_KEY = "order.close";
	/**
	 * 定义交换机
	 */
	@Bean
	public Exchange orderExchange() {
		return new DirectExchange(ORDER_EXCHANGE, true, false);
	}
	/**
	 * 死信交换机
	 */
	@Bean
	public Exchange orderDlxExchange() {
		return new DirectExchange(ORDER_DLX_EXCHANGE, true, false);
	}
	/**
	 * 延时队列
	 */
	@Bean
	public Queue orderDelayQueue() {
		// 延时队列的消息过期了,会自动触发消息的转发,根据routingKey发送到指定的exchange中,exchange路由到死信队列
		Map<String, Object> args = new HashMap<>();
		args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE);
		args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key
		args.put("x-message-ttl", 10 * 1000L); // 单位毫秒,10s用于测试
		return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args);
	}
	/**
	 * 延时队列绑定交换机
	 */
	@Bean
	public Binding orderDelayQueueBinding() {
		return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE,
						   ORDER_CLOSE_DELAY_ROUTING_KEY, null);
	}
	/**
	 * 关单队列
	 */
	@Bean
	public Queue orderCloseQueue() {
		log.info("死信队列(order.close.queue)创建");
		return new Queue(ORDER_ClOSE_QUEUE, true, false, false);
	}
	/**
	 * 关单队列绑定死信交换机
	 */
	@Bean
	public Binding orderCloseQueueBinding() {
		return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE,
						   ORDER_ClOSE_ROUTING_KEY, null);
	}
}

订单超时未支付系统自动取消监听器

/**
 * 订单超时未支付系统自动取消监听器
 *
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseListener {
	private final OrderService orderService;
	private final RabbitTemplate rabbitTemplate;
	@RabbitListener(queues = "order.close.queue")
	public void closeOrder(String orderSn, Message message, Channel channel) {
		long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序号(消息队列中的位置)
		log.info("订单({})超时未支付,系统自动关闭订单", orderSn);
		try {
			boolean closeOrderResult = orderService.closeOrder(orderSn);
			log.info("关单结果:{}", closeOrderResult);
			if (closeOrderResult) {
				// 关单成功:释放库存
				//发送订单号
				rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn);
			} else {
				// 关单失败:订单已被关闭,手动ACK确认并从队列移除消息
				channel.basicAck(deliveryTag, false); // false: 不批量确认,仅确认当前单个消息
			}
		} catch (Exception e) {
			// 关单异常:拒绝消息并重新入队
			try {
				channel.basicReject(deliveryTag, true); //  true: 重新放回队列
				// channel.basicReject(deliveryTag, false); // false: 直接丢弃消息 (TODO 定时任务补偿)
			} catch (IOException ex) {
				log.error("订单({})关闭失败,原因:{}", orderSn, ex.getMessage());
			}
		}
	}
}
  public boolean closeOrder(String orderSn) {
  return this.update(new LambdaUpdateWrapper<OmsOrder>()
  				   .eq(OmsOrder::getOrderSn, orderSn)
  				   .eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue())
  				   .set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue())
  				  );
  }
  /**
  *库存释放监听器
  */
  @Component
  @Slf4j
  @RequiredArgsConstructor
  public class StockReleaseListener {
  	private final SkuService skuService;
  	private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue";
  	private static final String STOCK_EXCHANGE = "stock.exchange";
  	private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock";
  	@RabbitListener(bindings =
  					@QueueBinding(
  						value = @Queue(value = STOCK_UNLOCK_QUEUE,durable = "true"),
  						exchange = @Exchange(value = STOCK_EXCHANGE),
  						key = {STOCK_UNLOCK_ROUTING_KEY}
  					),
  					ackMode = "MANUAL")//手动ACK
  	@RabbitHandler
  	public void UnlockStock(String orderSn, Message message, Channel channel){
  		log.info("订单{}取消释放库存",orderSn);
  		long deliverTag=message.getMessageProperties().getDeliveryTag();
  		try{
  			skuService.unlockStock(orderSn);
  			channel.basicAck(deliverTag,false);
  		}catch (Exception e){
  			try {
  				channel.basicAck(deliverTag,true);
  			}catch (IOException ex){
  				log.error("订单{}关闭失败,原因:{}",orderSn,ex.getMessage());
  			}
  		}
  	}
  }

订单支付

@GlobalTransactional
public <T> T payOrder(OrderPaymentForm paymentForm) {
	String orderSn = paymentForm.getOrderSn();
	OmsOrder order = this.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSn));
	Assert.isTrue(order != null, "订单不存在");
	Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "订单不可支付,请检查订单状态");
	RLock lock = redissonClient.getLock(OrderConstants.ORDER_LOCK_PREFIX + order.getOrderSn());
	try {
		lock.lock();
		T result;
		switch (paymentForm.getPaymentMethod()) {
			case WX_JSAPI:
				result = (T) wxJsapiPay(paymentForm.getAppId(), order.getOrderSn(), order.getPaymentAmount());
				break;
			default:
				result = (T) balancePay(order);
				break;
		}
		return result;
	} finally {
		//释放锁
		if (lock.isLocked()) {
			lock.unlock();
		}
	}
}

代码摘自

youlai-mall: 🚀基于 Spring Boot 3、Spring Cloud & Alibaba 2022、SAS OAuth2 、Vue3、Element-Plus、uni-app 构建的开源全栈商城。 (gitee.com)

到此这篇关于RabbitMQ+redis+Redisson分布式锁+seata实现订单服务的文章就介绍到这了,更多相关Redisson分布式锁订单服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

阅读全文