SpringBoot+Redis+最简发布订阅

SpringBoot+Redis+最简发布订阅依赖 dependency groupId org springframew boot groupId artifactId spring boot starter data redis artifactId version spring boot ve version dependency

依赖

 
   
   
     org.springframework.boot 
    
   
     spring-boot-starter-data-redis 
    
   
     ${spring-boot.version} 
    
  

redis配置

这里提供必备的配置(基本配置忽略)这里没配好的话会出现JSON解析异常乱码现象

/ * 序列化定制 * * @return */ @Bean public Jackson2JsonRedisSerializer  jackson2JsonSerializer() { Jackson2JsonRedisSerializer  jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } / * 操作模板 * * @param connectionFactory * @param jackson2JsonRedisSerializer * @return */ @Bean @Primary public RedisTemplate 
    
      redisTemplate(JedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer 
      jackson2JsonRedisSerializer) { RedisTemplate 
      
        template = new RedisTemplate 
       
         (); template.setConnectionFactory(connectionFactory); // 设置key/hashkey序列化 RedisSerializer 
        
          stringSerializer = new StringRedisSerializer(); template.setKeySerializer(stringSerializer); template.setHashKeySerializer(stringSerializer); // 设置值序列化 template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } 
         
        
        
      

绑定消息处理器:(这里示例是三个一对一的发布订阅,也可以一对多)

import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.stereotype.Component; / * 绑定消息处理器 * * @author sunziwen * @version 1.0 * @date 2020/1/8 17:10 / @Component public class BindingContainer { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer  jackson2JsonRedisSerializer, HandleReceiver handleReceiver) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); / * 绑定异常日志处理信道 */ MessageListenerAdapter exceptionAdapter = new MessageListenerAdapter(handleReceiver, "handleExceptionInfo"); exceptionAdapter.setSerializer(jackson2JsonRedisSerializer); exceptionAdapter.afterPropertiesSet(); container.addMessageListener(exceptionAdapter, new PatternTopic("channel:exception_info")); / * 绑定登录日志处理信道 */ MessageListenerAdapter loginAdapter = new MessageListenerAdapter(handleReceiver, "handleLoginInfo"); loginAdapter.setSerializer(jackson2JsonRedisSerializer); loginAdapter.afterPropertiesSet(); container.addMessageListener(loginAdapter, new PatternTopic("channel:login_info")); / * 绑定访问日志处理信道 */ MessageListenerAdapter visitorAdapter = new MessageListenerAdapter(handleReceiver, "handleVisitor"); visitorAdapter.setSerializer(jackson2JsonRedisSerializer); visitorAdapter.afterPropertiesSet(); container.addMessageListener(visitorAdapter, new PatternTopic("channel:visitor_info")); return container; } } 

消息处理器

package com.rz.mq; import com.rz.domain.SysExceptionInfo; import com.rz.domain.SysLoginInfo; import com.rz.domain.SysVisitorInfo; import com.rz.service.SysExceptionInfoServer; import com.rz.service.SysLoginInfoServer; import com.rz.service.SysVisitorInfoServer; import lombok.AllArgsConstructor; import org.springframework.stereotype.Component; / * 消息处理器 * * @author sunziwen * @version 1.0 * @date 2020/1/8 17:08 / @Component @AllArgsConstructor public class HandleReceiver { private SysLoginInfoServer sysLoginInfoServer; private SysExceptionInfoServer sysExceptionInfoServer; private SysVisitorInfoServer sysVisitorInfoServer; / * 记录访问日志 * * @param sysVisitorInfo * @param message */ public void handleVisitor(SysVisitorInfo sysVisitorInfo, String message) { sysVisitorInfoServer.save(sysVisitorInfo); } / * 记录异常日志 * * @param sysExceptionInfo * @param message */ public void handleExceptionInfo(SysExceptionInfo sysExceptionInfo, String message) { sysExceptionInfoServer.save(sysExceptionInfo); } / * 记录登录日志 * * @param sysLoginInfo * @param message */ public void handleLoginInfo(SysLoginInfo sysLoginInfo, String message) { sysLoginInfoServer.save(sysLoginInfo); } }

数据模型类:忽略…

消息发布:这里给出其中一个消息发布示例

import java.util.Date; import java.util.Map; import com.alibaba.fastjson.JSON; import com.rz.constants.ServiceNames; import com.rz.domain.SysVisitorInfo; import com.rz.dto.AppResDto; import com.rz.enums.ResEnums; import com.rz.service.SysVisitorInfoServer; import com.rz.util.application.ApplicationUtils; import com.rz.utils.IdWorker; import com.rz.utils.RedisUtil; import lombok.AllArgsConstructor; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.context.annotation.Configuration; import lombok.extern.slf4j.Slf4j; / * 接口访问日志 * * @author sunziwen * @version 1.0 * @date 2019/12/13 8:34 / @Aspect @Configuration @Slf4j @AllArgsConstructor public class VisitorLog { private RedisUtil redisUtil; @Around("execution(public * *(..))&&@annotation(com.rz.exceptions.RzLog)") public Object interceptor02(ProceedingJoinPoint pjp) throws Throwable { SysVisitorInfo visitorInfo = SysVisitorInfo.builder() .id(IdWorker.getId()) .userId(ApplicationUtils.currentUid()) .createTime(new Date()) .uri(ApplicationUtils.getRequest().getRequestURI()) .params(JSON.toJSONString(ApplicationUtils.getRequest().getParameterMap())) .serverName(ServiceNames.rz_api) .build(); try { Object proceed = pjp.proceed(); AppResDto resDto = (AppResDto) proceed; visitorInfo.setResult(resDto.getMsg()); redisUtil.convertAndSend("channel:visitor_info",visitorInfo); return proceed; } catch (BaseException e) { ResEnums resEnums = e.getResEnums(); visitorInfo.setResult(resEnums.getMsg()); redisUtil.convertAndSend("channel:visitor_info",visitorInfo); throw e; } } } 

注意:消息发布和消息订阅不要放在同一个服务中,消息订阅应该单独开一个服务来接收处理,否则集群的时候会有重复消费的情况。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/206231.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午4:09
下一篇 2026年3月19日 下午4:10


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号