/ * 创建定时任务线程工厂 */ private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("watchDog-").get(); / * 创建定时任务线程 */ private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(10,THREAD_FACTORY); / * 存放看门狗返回的线程对象 */ private static final ConcurrentHashMap
CONCURRENT_HASH_MAP = new ConcurrentHashMap<>(16); @GetMapping("/normal") public String normalRedisLock() throws InterruptedException { //给每一个线程都设置对应的UUID String productId = "product_huawei_p30"; String stock = "stock"; String clientId = UUID.randomUUID().toString(); try { //如果线程已经被加锁,死循环等待释放锁 while (true){ Integer stockNum = Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); if(stockNum <= 0){ return "商品已经卖完"; } //线程加锁,为10秒钟,设置为对应的客户端ID Boolean setIfAbsent = stringRedisTemplate.opsForValue().setIfAbsent(productId, clientId, 10, TimeUnit.SECONDS); if(Objects.nonNull(setIfAbsent) && setIfAbsent){ break; } } System.out.println("----------------------------------开始扣减库存----------------------------------"); / * 看门狗机制,目的是在线程业务处理时间过长时,导致锁被提前释放,导致处理完成时错误的释放掉另外线程的锁 */ WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,CONCURRENT_HASH_MAP,SCHEDULED_EXECUTOR_SERVICE); ScheduledFuture
scheduledFuture = SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(watchDogThread, 1, 5, TimeUnit.SECONDS); / * 采用ConcurrentHaspMap用来存储,watchDog任务,并且停止指定的watchDog任务 */ CONCURRENT_HASH_MAP.put(clientId,scheduledFuture); //执行业务逻辑 int stockNum = Integer.parseInt(stringRedisTemplate.opsForValue().get(stock)); if(stockNum > 0){ /*System.out.println("模拟业务处理时间过长,看门狗续命机制....."); Thread.sleep(20000);*/ stringRedisTemplate.opsForValue().set(stock,String.valueOf(stockNum-1)); System.out.println("扣减库存成功库存数量为:"+stringRedisTemplate.opsForValue().get(stock)); }else { System.out.println("库存扣减失败。。。。"); } } catch (Exception e) { / * 抛出异常时,获取到对应客户端ID的看门狗线程,并且停止看门狗机制 */ ScheduledFuture scheduledFuture = CONCURRENT_HASH_MAP.get(clientId); if(scheduledFuture != null){ System.out.println("异常信息,移除看门狗线程。。。"); scheduledFuture.cancel(true); CONCURRENT_HASH_MAP.remove(clientId); } } finally { //释放锁 stringRedisTemplate.delete(productId); System.out.println("----------------------------------业务执行完成----------------------------------"); } return ""; }
WatchDog实现机制
public class WatchDogThread implements Runnable { private String productId; private String clientId; private StringRedisTemplate stringRedisTemplate; private ConcurrentHashMap
cacheMap; //获取到线程池的引用 private ScheduledExecutorService scheduledExecutorService; / * lua脚本,目的原子操作,获取到商品锁如果等于当前客户端ID,执行锁续命 */ private static final String SCRIPT = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " local ttl = tonumber(redis.call('ttl',KEYS[1]));" + " redis.call('expire',KEYS[1],ttl+ARGV[2]) return redis.call('ttl',KEYS[1]) end"; public WatchDogThread(String productId,String clientId, StringRedisTemplate stringRedisTemplate, ConcurrentHashMap
concurrentHashMap,ScheduledExecutorService scheduledExecutorService) { this.clientId = clientId; this.productId = productId; this.stringRedisTemplate = stringRedisTemplate; this.cacheMap = concurrentHashMap; this.scheduledExecutorService = scheduledExecutorService; } @Override public void run() { String lock = stringRedisTemplate.opsForValue().get(productId); try { //如果获取到锁为空,或者获取到的锁不等于当前客户端ID,那么就直接停止看门狗 if (StringUtils.isEmpty(lock) || !clientId.equals(lock)) { ScheduledFuture scheduledFuture = cacheMap.get(clientId); if (scheduledFuture != null) { System.out.println("库存扣减完成,关闭开门狗。。。"); scheduledFuture.cancel(true); cacheMap.remove(clientId); } return; } System.out.println("执行续命任务ID:"+lock); //执行lua脚本,用来原子性执行锁续命 stringRedisTemplate.execute(new DefaultRedisScript(SCRIPT, Long.class), Collections.singletonList(productId),clientId,"10"); Long expire = stringRedisTemplate.getExpire(productId, TimeUnit.SECONDS); System.out.println("续命后时间;"+expire); } catch (Exception e) { System.out.println("watchdog执行失败"+e.getMessage()); / * 如果watchDog执行续命任务出现异常,直接设置30秒过期时间,防止key值失效,导致误删 */ this.stringRedisTemplate.expire(productId,30,TimeUnit.SECONDS); /*WatchDogThread watchDogThread = new WatchDogThread(productId,clientId,stringRedisTemplate,this.cacheMap,this.scheduledExecutorService); this.scheduledExecutorService.scheduleAtFixedRate(watchDogThread, 1, 5, TimeUnit.SECONDS);*/ } } }
心得:
如果并发量过高,可以将商品库存进行拆分,如果是redis-cluster架构,可以通过一致性hash,将商品库存分配到不同的redis中进行存储,用来提高并发量。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/201238.html原文链接:https://javaforall.net
