redis实现延迟队列

redis实现延迟队列前言 redis 实现延迟队列该怎么做 在这里我分享一下 redis 实现延迟队列一 Redis 实现延迟队列二 redis 失效监听事件三 此种实现面临的问题四 开发准备五 基础实现六 使用 redisson 实现延迟队列七 redisson 实现延迟队列的原理八 延迟队列配置一 Redis 实现延迟队列失效监听 redisson 实现发布订阅延迟二 redis 失效监听事件集成 KeyExpiratio 类实现 redis 失效监听事件三 此种实现面临的问题 redis 的失

前言:redis实现延迟队列该怎么做?在这里我分享一下

一、Redis实现延迟队列

  1. 失效监听
  2. redisson实现发布订阅延迟

二、redis失效监听事件

集成KeyExpirationEventMessageListener类实现redis失效监听事件

三、此种实现面临的问题

  1. redis的失效监听事件会存在一定的时间差,并且当数据量越大时,误差会越大。
  2. redis的失效监听事件会将所有key失效都会通知到onMessage,如果针对一个key,分布式业务的场景下,会出现重复消费的问题。(可以增加分布式锁的实现,但是redisson分布式锁提供了另一种延迟队列的实现方式)

四、开发准备

redis需要在服务端开启配置,打开redis服务的配置文件 添加notify-keyspace-events Ex

  • 相关参数如下:
K:keyspace事件,事件以__keyspace@ 
  
    >__为前缀进行发布; E:keyevent事件,事件以__keyevent@ 
   
     >__为前缀进行发布; g:一般性的,非特定类型的命令,比如del,expire,rename等; $:字符串特定命令; l:列表特定命令; s:集合特定命令; h:哈希特定命令; z:有序集合特定命令; x:过期事件,当某个键过期并删除时会产生该事件; e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件; A:g$lshzxe的别名,因此”AKE”意味着所有事件。 
    
  

五、基础实现

  1. 加入依赖
 
  
    > 
   
     >org.springframework.boot 
   > 
   
     >spring 
    -boot 
    -starter 
    -data 
    -redis 
   > 
  > 
  1. 可正常连接存取redis数据之后,创建监听类RedisKeyExpirationListener继承KeyExpirationEventMessageListener,重写onMessage方法。(key失效之后,会发出onMessage方法,之呢个获取失效的key值,不能获取key对应的value值)。
import com.test01.scrm.service.member.api.common.MemberStatusEnum; import com.test01.scrm.service.member.provider.service.base.IBaseMemberService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; / * @author lwl */ @Component @Slf4j public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { 
    private final IBaseMemberService baseMemberService; private final static String MEMBER_LOCK_ACCOUNT_SUFFIX = ".lock_account"; private final static String MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX = "T"; private final static String MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX = "M"; private final static String MEMBER_REDISSON_LOCK = ".member_lock_redisson"; private final static int WAIT_TIME = 5; private final static int LEASE_TIME = 10; public RedisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer, IBaseMemberService baseMemberService) { 
    super(redisMessageListenerContainer); this.baseMemberService = baseMemberService; } @Override public void onMessage(Message message, byte[] pattern) { 
    //获取失效的key String expiredKey = message.toString(); log.info("================================get on message:{}====================", expiredKey); if (expiredKey.endsWith(MEMBER_LOCK_ACCOUNT_SUFFIX)) { 
    log.info("================================on message:{}====================", expiredKey); try { 
    log.info("=======待解锁账号解锁======expiredKey:{}", expiredKey); String tenantId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_DOMAIN_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX)); String memberId = expiredKey.substring(expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_MEMBER_SUFFIX) + 1, expiredKey.indexOf(MEMBER_LOCK_ACCOUNT_SUFFIX)); baseMemberService.updateAccount(Integer.parseInt(tenantId), Long.parseLong(memberId), MemberStatusEnum.NORMAL.getCode(), null); } catch (Exception exception) { 
    log.info("auto unlock fail,expired key:{},exception:{}", expiredKey, exception.getMessage()); } } } } 
  1. 创建一个配置类RedisConfig
/ * @author lwl */ @Configuration public class RedisConfig { 
    @Value("${redis.dbIndex}") private Integer dbIndex; private final String TOPIC = "__keyevent@" + dbIndex + "__:expired"; private final RedisConnectionFactory redisConnectionFactory; public RedisConfig(RedisConnectionFactory redisConnectionFactory) { 
    this.redisConnectionFactory = redisConnectionFactory; } @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { 
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); //keyevent事件,事件以__keyevent@ 
   
     __为前缀进行发布 
    //db为redis第几个库 db2... // redisMessageListenerContainer.addMessageListener(redisKeyExpirationListener, new PatternTopic(TOPIC)); return redisMessageListenerContainer; } } 

六、使用redisson实现延迟队列

由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务

七、redisson实现延迟队列的原理

使用redis的zset有序性,轮询zset中的每个元素,到点后将内容迁移至待消费的队列

八、延迟队列配置

package com.test01.scrm.service.member.provider.config.redisson.delay; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; / * @author lwl * redisson延迟队列 */ @Configuration public class RedissonQueueConfig { 
    private final String queueName = "queue"; @Bean public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonSingle") RedissonClient redissonClient) { 
    return redissonClient.getBlockingQueue(queueName); } @Bean(name = "rDelayedQueue") public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonSingle") RedissonClient redissonClient, @Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) { 
    return redissonClient.getDelayedQueue(blockQueue); } } 

定义队列使用接口

package com.test01.scrm.service.member.provider.config.redisson.delay; import java.util.concurrent.TimeUnit; / * @author lwl */ public interface DelayQueue { 
    / * 发布 * * @param object * @return */ Boolean offer(Object object); / * 带延迟功能的队列 * * @param object * @param time * @param timeUnit */ void offer(Object object, Long time, TimeUnit timeUnit); void offerAsync(Object object, Long time, TimeUnit timeUnit); Boolean offerAsync(Object object); } 

延迟队列实现

package com.test01.scrm.service.member.provider.config.redisson.delay; import org.redisson.api.RDelayedQueue; import org.redisson.api.RFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; / * @author lwl */ @Component public class RedissonDelayQueue implements DelayQueue { 
    private static Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class); @Resource(name = "rDelayedQueue") private RDelayedQueue<Object> rDelayedQueue; @Override public Boolean offer(Object object) { 
    return rDelayedQueue.offer(object); } @Override public void offer(Object object, Long time, TimeUnit timeUnit) { 
    rDelayedQueue.offer(object, time, timeUnit); } @Override public void offerAsync(Object object, Long time, TimeUnit timeUnit) { 
    rDelayedQueue.offerAsync(object, time, timeUnit); } @Override public Boolean offerAsync(Object object) { 
    boolean flag = false; RFuture<Boolean> rFuture = rDelayedQueue.offerAsync(object); try { 
    flag = rFuture.get(); } catch (InterruptedException | ExecutionException e) { 
    log.info("offerAsync exception:{}", e.getMessage()); e.printStackTrace(); } return flag; } } 

启动一个后台监控线程

package com.test01.scrm.service.member.provider.config.redisson.delay; import org.redisson.api.RBlockingQueue; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; / * @author test01 */ @Component public class RedissonTask { 
    @Resource(name = "rBlockingQueue") private RBlockingQueue<Object> rBlockingQueue; @PostConstruct public void take() { 
    new Thread(() -> { 
    while (true) { 
    try { 
    System.out.println("=========================" + rBlockingQueue.take()); } catch (InterruptedException e) { 
    e.printStackTrace(); } } }).start(); } } 

使用延迟队列发送

package com.test01.scrm.service.member.provider.impl; import org.junit.Test; import org.junit.runner.RunWith; import org.mybatis.spring.annotation.MapperScan; import org.redisson.api.RDelayedQueue; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @RunWith(SpringRunner.class) @SpringBootTest @ActiveProfiles(value = "llh") @MapperScan("com.test01.scrm.service.member.provider.mapper") public class RDelayQueueTests { 
    @Resource(name = "rDelayedQueue") private RDelayedQueue<Object> rDelayedQueue; @Test public void offerAsync() { 
    rDelayedQueue.offerAsync("llh send message", 20, TimeUnit.SECONDS); } } 

九、疑问解答与加群交流学习

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/207398.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午1:55
下一篇 2026年3月19日 下午1:55


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号