依赖
org.springframework.boot
spring-boot-starter-data-redis
${spring-boot.version}
redis配置
这里提供必备的配置(基本配置忽略)这里没配好的话会出现JSON解析异常乱码现象
/ * 序列化定制 * * @return */ @Bean public Jackson2JsonRedisSerializer
绑定消息处理器:(这里示例是三个一对一的发布订阅,也可以一对多)
import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.stereotype.Component; / * 绑定消息处理器 * * @author sunziwen * @version 1.0 * @date 2020/1/8 17:10 / @Component public class BindingContainer { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer, HandleReceiver handleReceiver) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); / * 绑定异常日志处理信道 */ MessageListenerAdapter exceptionAdapter = new MessageListenerAdapter(handleReceiver, "handleExceptionInfo"); exceptionAdapter.setSerializer(jackson2JsonRedisSerializer); exceptionAdapter.afterPropertiesSet(); container.addMessageListener(exceptionAdapter, new PatternTopic("channel:exception_info")); / * 绑定登录日志处理信道 */ MessageListenerAdapter loginAdapter = new MessageListenerAdapter(handleReceiver, "handleLoginInfo"); loginAdapter.setSerializer(jackson2JsonRedisSerializer); loginAdapter.afterPropertiesSet(); container.addMessageListener(loginAdapter, new PatternTopic("channel:login_info")); / * 绑定访问日志处理信道 */ MessageListenerAdapter visitorAdapter = new MessageListenerAdapter(handleReceiver, "handleVisitor"); visitorAdapter.setSerializer(jackson2JsonRedisSerializer); visitorAdapter.afterPropertiesSet(); container.addMessageListener(visitorAdapter, new PatternTopic("channel:visitor_info")); return container; } }
消息处理器
package com.rz.mq; import com.rz.domain.SysExceptionInfo; import com.rz.domain.SysLoginInfo; import com.rz.domain.SysVisitorInfo; import com.rz.service.SysExceptionInfoServer; import com.rz.service.SysLoginInfoServer; import com.rz.service.SysVisitorInfoServer; import lombok.AllArgsConstructor; import org.springframework.stereotype.Component; / * 消息处理器 * * @author sunziwen * @version 1.0 * @date 2020/1/8 17:08 / @Component @AllArgsConstructor public class HandleReceiver { private SysLoginInfoServer sysLoginInfoServer; private SysExceptionInfoServer sysExceptionInfoServer; private SysVisitorInfoServer sysVisitorInfoServer; / * 记录访问日志 * * @param sysVisitorInfo * @param message */ public void handleVisitor(SysVisitorInfo sysVisitorInfo, String message) { sysVisitorInfoServer.save(sysVisitorInfo); } / * 记录异常日志 * * @param sysExceptionInfo * @param message */ public void handleExceptionInfo(SysExceptionInfo sysExceptionInfo, String message) { sysExceptionInfoServer.save(sysExceptionInfo); } / * 记录登录日志 * * @param sysLoginInfo * @param message */ public void handleLoginInfo(SysLoginInfo sysLoginInfo, String message) { sysLoginInfoServer.save(sysLoginInfo); } }
数据模型类:忽略…
消息发布:这里给出其中一个消息发布示例
import java.util.Date; import java.util.Map; import com.alibaba.fastjson.JSON; import com.rz.constants.ServiceNames; import com.rz.domain.SysVisitorInfo; import com.rz.dto.AppResDto; import com.rz.enums.ResEnums; import com.rz.service.SysVisitorInfoServer; import com.rz.util.application.ApplicationUtils; import com.rz.utils.IdWorker; import com.rz.utils.RedisUtil; import lombok.AllArgsConstructor; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.context.annotation.Configuration; import lombok.extern.slf4j.Slf4j; / * 接口访问日志 * * @author sunziwen * @version 1.0 * @date 2019/12/13 8:34 / @Aspect @Configuration @Slf4j @AllArgsConstructor public class VisitorLog { private RedisUtil redisUtil; @Around("execution(public * *(..))&&@annotation(com.rz.exceptions.RzLog)") public Object interceptor02(ProceedingJoinPoint pjp) throws Throwable { SysVisitorInfo visitorInfo = SysVisitorInfo.builder() .id(IdWorker.getId()) .userId(ApplicationUtils.currentUid()) .createTime(new Date()) .uri(ApplicationUtils.getRequest().getRequestURI()) .params(JSON.toJSONString(ApplicationUtils.getRequest().getParameterMap())) .serverName(ServiceNames.rz_api) .build(); try { Object proceed = pjp.proceed(); AppResDto resDto = (AppResDto) proceed; visitorInfo.setResult(resDto.getMsg()); redisUtil.convertAndSend("channel:visitor_info",visitorInfo); return proceed; } catch (BaseException e) { ResEnums resEnums = e.getResEnums(); visitorInfo.setResult(resEnums.getMsg()); redisUtil.convertAndSend("channel:visitor_info",visitorInfo); throw e; } } }
注意:消息发布和消息订阅不要放在同一个服务中,消息订阅应该单独开一个服务来接收处理,否则集群的时候会有重复消费的情况。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/206231.html原文链接:https://javaforall.net
