秒杀中的分桶策略 —— 提高单台数据库的性能

语言: CN / TW / HK

theme: channing-cyan

背景

众所周知,数据库的单行并发写能力极为有限,比如 MySQL 的单行并发写大概在300~500TPS之间。所以,将数据分桶存储可以线性提升并发写入能力。分桶解决的是单个数据库的并发能力

分桶模型

每一个我们可以看成单个数据库中的一行记录,将原来的一行记录存储100件库存,变为用5行记录分别存储20件库存。在对库存进行操作的时候,就可以通过对用户ID取模,确定该用户操纵的是那一行记录,从而提高单个数据库的并发能力

秒杀场景中,我们要对库存的数量进行缓存,所以也要对缓存进行分桶。每一个桶看成Redis中的一个记录即可。

缓存中的数据相当于库存的预扣减,预扣减成功那么就让该请求去修改数据库,失败直接拒绝该请求即可。这里尽量保持缓存数据(弱一致)与数据库中的数据(强一致)的一致性,但是缓存和数据库分桶之间的关系是一定要保证的。

分桶设计与实现

分桶编排思路

在整个分桶编排的过程,有几个重要的点:

  1. 在进行分桶编排之前,要先暂停分桶服务,设置为维护状态,此时用户无法下单
  2. 暂停分桶服务时,必须使用独立事务手动提交,确保在继续执行分桶前,分桶状态已经提交到数据库;
  3. 分桶保存到数据库后,应同步数据到缓存中;
  4. 全量和增量:全量分桶意味着将当前传入的库存总量作为最终总量,重新计算分桶数据;而增量分桶则是将传入的库存总量累加到已有的库存中,然后再重新计算分桶数据;
  5. 有无历史分桶数据:如果此前已有分桶数据,那么在分桶时则要先进行库存回收,随后再统一分配;如果此前无分桶数据,则直接创建新的分桶集;
  6. 分桶中出现任何异常应抛出以触发事务回滚
  7. 无论分桶成功或失败,最后都要重新打开分桶服务,即取消分桶维护状态,否则秒杀品将无法售卖;

分桶编排代码实现

分桶编排代码

``` public void arrangeStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, Integer arrangementMode) { logger.info("arrangeBuckets|准备库存分桶|{},{},{}", itemId, totalStocksAmount, bucketsQuantity); if (itemId == null || totalStocksAmount == null || totalStocksAmount < 0 || bucketsQuantity == null || bucketsQuantity <= 0) { throw new StockBucketException(ErrorCode.INVALID_PARAMS); } // 保证只有一个线程对itemId进行更新 DistributedLock distributedLock = distributedLockFactoryService.getDistributedLock(ITEM_STOCK_BUCKETS_SUSPEND_KEY + itemId);

    try {
        boolean tryLock = distributedLock.tryLock(5, 5, TimeUnit.SECONDS);
        if (!tryLock) {
            logger.info("arrangeStockBuckets|获取锁失败|{}", itemId);
            return;
        }

        // 手动添加事务
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            // 设置为禁用状态
            logger.info("suspendBuckets|禁用库存分桶|{}", itemId);
            int updateStatusByItemId = seckillBucketMapper.updateStatusByItemId(itemId, SeckillBucketStatus.DISABLED.getCode());
            if (updateStatusByItemId < 0) {
                logger.info("arrangeBuckets|关闭库存分桶失败|{}", itemId);
                throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED);
            }
            logger.info("suspendBuckets|库存分桶已禁用|{}", itemId);
            dataSourceTransactionManager.commit(transactionStatus);
        } catch (Exception e) {
            logger.info("arrangeBuckets|关闭分桶失败回滚中|{}", itemId, e);
            dataSourceTransactionManager.rollback(transactionStatus);
        }


        List<SeckillBucket> seckillBuckets = seckillBucketMapper.selectByItemId(itemId);
        if (seckillBuckets == null || seckillBuckets.size() == 0) {
            initStockBuckets(itemId, totalStocksAmount, bucketsQuantity);
            return;
        }

        // 根据总量分桶
        if (ArrangementMode.isTotalAmountMode(arrangementMode)) {
            arrangeStockBucketsBasedTotalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
        }

        // 根据增量分桶
        if (ArrangementMode.isIncrementalAmountMode(arrangementMode)) {
            rearrangeStockBucketsBasedIncrementalMode(itemId, totalStocksAmount, bucketsQuantity, seckillBuckets);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

```

构建(初始化)分桶代码:

``` private void initStockBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity) { SeckillBucket primaryBucket = new SeckillBucket() .initPrimary() .setItemId(itemId) .setTotalStocksAmount(totalStocksAmount); List presentBuckets = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primaryBucket); submitBucketsToArrange(itemId, presentBuckets); }

private List<SeckillBucket> buildBuckets(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, SeckillBucket primaryBucket) {
    if (itemId == null || totalStocksAmount == null || bucketsQuantity == null || bucketsQuantity <= 0) {
        throw new StockBucketException(ErrorCode.INVALID_PARAMS);
    }

    List<SeckillBucket> seckillBucketList = new ArrayList<>();
    Integer averageStockAmount = totalStocksAmount / bucketsQuantity;
    Integer remainStockAmount = totalStocksAmount % bucketsQuantity;
    for (int i = 0; i < bucketsQuantity; i++) {
        if (i == 0) {
            if (primaryBucket == null) {
                primaryBucket = new SeckillBucket();
            }
            primaryBucket
                    .setAvailableStocksAmount(averageStockAmount)
                    .setSerialNo(i)
                    .setStatus(SeckillBucketStatus.ENABLED.getCode());
            seckillBucketList.add(primaryBucket);
            continue;
        }
        SeckillBucket seckillBucket = new SeckillBucket()
                .setSerialNo(i)
                .setStatus(SeckillBucketStatus.ENABLED.getCode())
                .setItemId(itemId);

        if (i < bucketsQuantity - 1) {
            seckillBucket.setAvailableStocksAmount(averageStockAmount)
                    .setTotalStocksAmount(averageStockAmount);
        }

        if (i == bucketsQuantity - 1) {
            Integer restAvailableStocksAmount = averageStockAmount + remainStockAmount;
            seckillBucket.setAvailableStocksAmount(restAvailableStocksAmount)
                    .setTotalStocksAmount(restAvailableStocksAmount);

        }
        seckillBucketList.add(seckillBucket);
    }
    return seckillBucketList;
}

```

存储入缓存和数据库代码

先入数据库再入缓存。

private void submitBucketsToArrange(Long itemId, List<SeckillBucket> presentBuckets) { logger.info("arrangeBuckets|编排库存分桶|{},{}", itemId, JSON.toJSONString(presentBuckets)); if (itemId == null || itemId <= 0 || CollectionUtils.isEmpty(presentBuckets)) { logger.info("arrangeBuckets|库存分桶参数错误|{}", itemId); throw new BusinessException(ErrorCode.INVALID_PARAMS); } // 先删除再加入 seckillBucketMapper.deleteById(itemId); int insertBatch = seckillBucketMapper.insertBatch(presentBuckets); if (insertBatch > 1) { // 存入缓存 presentBuckets.forEach((seckillBucket -> { distributedCacheService.put(getBucketAvailableStocksCacheKey(itemId, seckillBucket.getSerialNo()), seckillBucket.getAvailableStocksAmount()); distributedCacheService.put(getItemStockBucketsQuantityCacheKey(itemId), presentBuckets.size()); })); } else { logger.info("submitBucketsToArrange|库存分桶错误|{}, {}", itemId, JSON.toJSONString(presentBuckets)); throw new StockBucketException(ErrorCode.ARRANGE_STOCK_BUCKETS_FAILED); } }

根据全量分桶

``` private void arrangeStockBucketsBasedTotalMode(Long itemId, Integer totalStocksAmount, Integer bucketsQuantity, List existingBuckets) { // 计算子桶的剩余的库存数 int remainAvailableStocks = existingBuckets.stream() .filter(SeckillBucket::isSubSeckillBucket) .mapToInt(SeckillBucket::getAvailableStocksAmount).sum(); Optional optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst(); if (!optionalSeckillBucket.isPresent()) { throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING); }

    // 回收分桶库存到主桶
    SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
    primarySeckillBucket.addAvailableStocks(remainAvailableStocks);
    // 已售出的库存
    int soldStocksAmount = primarySeckillBucket.getTotalStocksAmount() - primarySeckillBucket.getAvailableStocksAmount();

    if (soldStocksAmount > totalStocksAmount) {
        throw new StockBucketException(799, "已售库存大于当期所设库存总量!");
    }

    // 设置最新库存,重新分桶
    primarySeckillBucket.setTotalStocksAmount(totalStocksAmount);
    List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalStocksAmount, bucketsQuantity, primarySeckillBucket);
    submitBucketsToArrange(itemId, seckillBucketList);
}

```

根据增量分桶

``` private void rearrangeStockBucketsBasedIncrementalMode(Long itemId, Integer incrementalStocksAmount, Integer bucketsQuantity, List existingBuckets) { Optional optionalSeckillBucket = existingBuckets.stream().filter(SeckillBucket::isPrimarySeckillBucket).findFirst(); if (!optionalSeckillBucket.isPresent()) { throw new StockBucketException(ErrorCode.PRIMARY_BUCKET_IS_MISSING); }

    // 回收分桶库存 (获取当前所有桶剩余的可用库存数)
    int remainAvailableStocks = existingBuckets.stream().mapToInt(SeckillBucket::getAvailableStocksAmount).sum();

    // 加上要添加的库存数
    Integer totalAvailableStocks = remainAvailableStocks + incrementalStocksAmount;
    int presentAvailableStocks = remainAvailableStocks + incrementalStocksAmount;

    if (presentAvailableStocks < 0) {
        throw new StockBucketException(ErrorCode.STOCK_NOT_ENOUGH);
    }

    SeckillBucket primarySeckillBucket = optionalSeckillBucket.get();
    primarySeckillBucket.increaseTotalStocksAmount(incrementalStocksAmount);

    List<SeckillBucket> seckillBucketList = buildBuckets(itemId, totalAvailableStocks, bucketsQuantity, primarySeckillBucket);
    submitBucketsToArrange(itemId, seckillBucketList);
}

```

不同分桶之间的数量差异

存在问题

用户在访问可用库存的时候,会存在一个问题:路由到不同分桶的流量可能存在差异和不均,这会导致不同分桶的余量不同,展示到不同用户上的数量就会不同。例如:#1桶中库存为0,但#2桶中库存大于0。

解决方法

  1. 设计库存借用机制,当某个分桶库存不足时,可以从其他桶借库存;
  2. 主桶和分桶留有一定冗余库存,分桶库存不足时可以向主桶申请;
  3. 允许不同用户看到不同的库存余量,所路由到的分桶没有库存时直接展示无库存;

在秒杀场景中,我们一般选择第三种,因为它足够的简单高效,重点维护服务端的数据一致性极致的性能。前面两种方法会大大增加系统的复杂度,在选择的时候要慎重考虑

扣减库存实现代码

先扣除缓存在扣减数据库,缓存充当一个预扣减的作用,这里不再详细讨论。参考文章http://juejin.cn/post/7185205290278027319

扣减缓存库存的Lua脚本

``` --- 对应的库存键不存在 if (redis.call('exists', KEYS[1]) == 0) then return -996 end --- 分桶禁用锁 if (redis.call('exists', KEYS[2]) == 1) then return -998 end --- 库存调度锁 if (redis.call('exists', KEYS[3]) == 1) then return -997 end if (redis.call('exists', KEYS[1]) == 1) then local stocksAmount = tonumber(redis.call('get', KEYS[1])) local quantity = tonumber(ARGV[1]) --- 库存不够 if (stocksAmount < quantity) then return -1 end

if (stocksAmount >= quantity) then
    redis.call('incrby', KEYS[1], 0 - quantity)
    return 1
end

end

return -10000 ```

使用乐观锁扣减数据库库存

<update id="decreaseBucketStock"> update seckill_bucket set available_stocks_amount = available_stocks_amount - #{quantity,jdbcType=NUMERIC} where item_id = #{itemId,jdbcType=NUMERIC} AND serial_no = #{serialNo,jdbcType=NUMERIC} AND available_stocks_amount = #{oldAvailableStocksAmount,jdbcType=NUMERIC} AND available_stocks_amount <![CDATA[ >= ]]> #{quantity,jdbcType=NUMERIC} </update>