腾讯CMQ消息处理

腾讯CMQ消息处理CMQ 延迟消息处理方式针对延迟消息的时间满足不了需求的情况 把用户冻结的过期时间进行处理 外链图片转存失败 源站可能有防盗链机制 建议将图片保存下来直接上传 img fus0lwTV 89 file C Users HP Documents WXWork 75751 Cache Image 2021 05 da635f0d8e31 png 用户的冻结时间未到期把消息进行重复投递回队列中 一直未过期就一

CMQ延迟消息处理方式

针对延迟消息的时间满足不了需求的情况 , 把用户冻结的过期时间进行处理

用户的冻结时间未到期把消息进行重复投递回队列中 , 一直未过期就一直投递 , 直到用户冻结时间一过期把mysql库中的用户状态改为认证通过的状态

取消redis缓存作为判断冻结时间到期的依据, 自定义的消息体中增加到期时间字段 , 这样在消费的时候就知道其冻结的到期时间是多少, 如果当前消费的时间是在过期时间之前那么重复投递消息, 重复投递又有2个情况, 时间间隔超过1小时,按照1小时的过期时间投递, 如果不超过1小时的时间按照时间差进行投递, 如果是到了过期时间或者过期时间之后, 那么修改数据库的用户冻结状态, 发送解冻的短信给用户

package com.tencent.iov.userservice.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.stereotype.Component; import java.time.Duration; / * @author HP * @ Author; wangfei * @ Date ; 2021/5/8 17;00 * @ Version; 1.0 */ @Data @Component @ConfigurationProperties(prefix = "cmq") @RefreshScope public class CmqConfig { 
    / * 腾讯CMQ基本配置 */ private HttpConf httpConf; / * 用户冻结队列 */ private FrozenDelayQueueSetting frozenDelayQueueSetting; @Data public static class HttpConf { 
    private String secretId; private String secretKey; private String endpoint; private String path; private String method; } @Data public static class FrozenDelayQueueSetting { 
    private String queueName; private int consumerThreadCount; private Duration delayTime; } } 

基础功能类(基类)

package com.tencent.iov.userservice.cmq; import com.ruqimobility.common.dtos.BaseResponse; import com.ruqimobility.user.enummeta.UserResultEnum; import com.tencent.iov.parent.cmq.Account; import com.tencent.iov.parent.cmq.Queue; import com.tencent.iov.parent.cmq.Subscription; import com.tencent.iov.parent.cmq.Topic; import com.tencent.iov.userservice.config.CmqConfig; import com.tencent.iov.userservice.util.ResponseUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; / * @author HP * @ Author: wangfei * @ Date : 2021/5/8 16:56 * @ Version: 1.0 */ @Slf4j @DependsOn(value = { 
   "cmqConfig"}) @Service public class BaseMqService { 
    @Resource private CmqConfig cmqConfig; public BaseResponse<Boolean> delMsgPerReq(String queueRealName, String receiptHandle) { 
    Account account = createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } try { 
    queue.deleteMessage(receiptHandle); return ResponseUtils.success(Boolean.TRUE); } catch (Exception e) { 
    e.printStackTrace(); log.error("del msg fail. queue:" + queueRealName + " | receiptHandle:" + receiptHandle + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } / * 批量删除消息 * * @param queueRealName * @param receiptHandleList * @return */ public BaseResponse<Boolean> batchDelMsgPerReq(String queueRealName, List<String> receiptHandleList) { 
    if (CollectionUtils.isEmpty(receiptHandleList)) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } Account account = createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } try { 
    queue.batchDeleteMessage(receiptHandleList); return ResponseUtils.success(Boolean.TRUE); } catch (Exception e) { 
    e.printStackTrace(); log.error("del msg fail. queue:" + queueRealName + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } protected Account createAccount() { 
    return new Account(cmqConfig.getHttpConf().getSecretId(), cmqConfig.getHttpConf().getSecretKey(), cmqConfig.getHttpConf().getEndpoint(), cmqConfig.getHttpConf().getPath(), cmqConfig.getHttpConf().getMethod()); } public CmqConfig.FrozenDelayQueueSetting getFrozenDelayQueueSetting() { 
    return cmqConfig.getFrozenDelayQueueSetting(); } public void createSubscribe(String topicName, String subscriptionName, String Endpoint, String Protocal) throws Exception { 
    createAccount().createSubscribe(topicName, subscriptionName, Endpoint, Protocal); } public Queue getQueue(String queue) { 
    return createAccount().getQueue(queue); } public Topic getTopic(String topicName) { 
    return createAccount().getTopic(topicName); } public Subscription getSubscription(String topicName, String subscriptionName) { 
    return createAccount().getSubscription(topicName, subscriptionName); } } 

cmq消费类

package com.tencent.iov.userservice.cmq; import com.alibaba.fastjson.JSON; import com.ruqimobility.common.dtos.BaseResponse; import com.ruqimobility.user.enummeta.UserResultEnum; import com.tencent.iov.parent.cmq.Account; import com.tencent.iov.parent.cmq.CMQServerException; import com.tencent.iov.parent.cmq.Message; import com.tencent.iov.parent.cmq.Queue; import com.tencent.iov.userservice.util.ResponseUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.Collections; import java.util.List; import java.util.Optional; / * @ Author: wangfei * @ Date : 2021/5/10 16:42 * @ Version: 1.0 */ @Slf4j @Service public class CmqConsumerService extends BaseMqService { 
    public Optional<Message> consumerMsg(String queueName) { 
    Queue queue = super.getQueue(queueName); if (null == queue) { 
    return Optional.empty(); } Message message = null; try { 
    message = queue.receiveMessage(); return Optional.ofNullable(message); } catch (CMQServerException e) { 
    return Optional.empty(); } catch (Exception e1) { 
    log.error("consume cmq msg fail. queue:" + queueName + " | message:" + JSON.toJSONString(message) + " | e:{}", e1); return Optional.empty(); } } / * 每次请求都new一个新的连接 * * @param queueRealName //真实的队列名称,不是spring bean 的名称 * @return */ public BaseResponse<Message> consumerMsgPerReq(String queueRealName) { 
    Account account = super.createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } Message message = null; try { 
    message = queue.receiveMessage(); return ResponseUtils.success(message); } catch (CMQServerException e1) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } catch (Exception e) { 
    e.printStackTrace(); log.error("consume msg fail. queue:" + queueRealName + " | message:" + JSON.toJSONString(message) + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } / * 批量拉取消息,每批次最多16条,SDK规定 * * @param queueRealName * @param numOfBatchMsgPerPolling * @return */ public BaseResponse<List<Message>> batchConsumerMsgPerReq(String queueRealName, int numOfBatchMsgPerPolling) { 
    if (numOfBatchMsgPerPolling > 16) { 
    log.info("error"); } Account account = super.createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } List<Message> messageList = Collections.emptyList(); try { 
    messageList = queue.batchReceiveMessage(numOfBatchMsgPerPolling); } catch (CMQServerException e1) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } catch (Exception e) { 
    log.error("consume msg fail. queue:" + queueRealName + " | message:" + JSON.toJSONString(messageList) + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } return ResponseUtils.success(messageList); } } 

cmq生产者类

package com.tencent.iov.userservice.cmq; import com.ruqimobility.common.dtos.BaseResponse; import com.ruqimobility.user.enummeta.UserResultEnum; import com.tencent.iov.parent.cmq.Account; import com.tencent.iov.parent.cmq.CMQServerException; import com.tencent.iov.parent.cmq.Queue; import com.tencent.iov.userservice.util.ResponseUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.util.List; / * @author HP * @ Author: wangfei * @ Date : 2021/5/10 16:45 * @ Version: 1.0 */ @Slf4j @Service public class CmqProducerService extends BaseMqService { 
    public BaseResponse<Boolean> sendToQueue(String queueName, String msgBody, int delay) { 
    try { 
    Queue queue = super.getQueue(queueName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } if (delay > 0) { 
    queue.sendMessage(msgBody, delay); } else { 
    queue.sendMessage(msgBody); } return ResponseUtils.success(Boolean.TRUE); } catch (CMQServerException e1) { 
    log.error("CMQServerException. error:{}", e1); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } catch (Exception e) { 
    e.printStackTrace(); log.error("send msg fail. queue:" + queueName + " | msgBody:+" + msgBody + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } / * 每次发送消息都创建一次连接 * * @param queueRealName * @param msgBody * @param delay * @return */ public BaseResponse<Boolean> sendToQueuePerReq(String queueRealName, String msgBody, int delay) { 
    try { 
    Account account = super.createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } if (delay > 0) { 
    queue.sendMessage(msgBody, delay); } else { 
    queue.sendMessage(msgBody); } return ResponseUtils.success(Boolean.TRUE); } catch (CMQServerException e1) { 
    log.error("CMQServerException. error:{}", e1); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } catch (Exception e) { 
    e.printStackTrace(); log.error("send msg fail. queue:" + queueRealName + " | msgBody:+" + msgBody + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } / * 批量发送消息 * * @param queueRealName * @param msgBody * @param delay * @return */ public BaseResponse<Boolean> batchSendToQueuePerReq(String queueRealName, List<String> msgBody, int delay) { 
    try { 
    Account account = super.createAccount(); Queue queue = account.getQueue(queueRealName); if (null == queue) { 
    return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } if (delay > 0) { 
    queue.batchSendMessage(msgBody, delay); } else { 
    queue.batchSendMessage(msgBody); } return ResponseUtils.success(Boolean.TRUE); } catch (CMQServerException e1) { 
    log.error("CMQServerException. error:{}", e1); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } catch (Exception e) { 
    e.printStackTrace(); log.error("send msg fail. queue:" + queueRealName + " | msgBody:+" + msgBody + " | e:{}", e); return ResponseUtils.fail(UserResultEnum.SERVICE_INNER_ERROR); } } } 

冻结消费service

package com.tencent.iov.userservice.cmq; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruqimobility.common.dtos.BaseResponse; import com.ruqimobility.user.enummeta.SrAuthEnum; import com.ruqimobility.user.enummeta.UserRoleEnum; import com.tencent.iov.parent.cmq.Message; import com.tencent.iov.userservice.config.CmqConfig; import com.tencent.iov.userservice.dao.UserMapper; import com.tencent.iov.userservice.dto.message.UserRoleFrozenMsg; import com.tencent.iov.userservice.model.User; import com.tencent.iov.userservice.service.UserRoleFrozenService; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.IOException; import java.sql.Timestamp; import java.util.Optional; import java.util.concurrent.Executor; / * @author HP * @ Author: wangfei * @ Date : 2021/5/10 17:47 * @ Version: 1.0 */ @Slf4j @Component public class FrozenConsumerService extends CmqConsumerService implements ApplicationRunner { 
    private static boolean CONSUMER_RUNNING = true; @Resource(name = "taskAsyncPool") private Executor taskExecutor; @Resource private UserRoleFrozenService userRoleFrozenService; @Resource private FrozenCmqProducerService frozenCmqProducerService; @Resource private UserMapper userMapper; public void startUp() { 
    CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting(); // 队列名称 String queueName = setting.getQueueName(); // 消费线程数 int queueNum = setting.getConsumerThreadCount(); for (int i = 0; i < queueNum; i++) { 
    // 创建任务 UnFrozenUserTask unFrozenUserTask = new UnFrozenUserTask(queueName, this, userRoleFrozenService); // executor执行 taskExecutor.execute(unFrozenUserTask); } //注册钩子 Runtime.getRuntime().addShutdownHook(new Thread(() -> { 
    log.info("Order DispatchOrder The hook running..."); //第三步:调用停机处理 stop(); })); } @Override public void run(ApplicationArguments args) throws Exception { 
    log.info("========= consumer frozen queue message start ========= "); startUp(); } @PreDestroy public void stop() { 
    log.info("stop UnFrozenUser task consumer"); CONSUMER_RUNNING = false; } public class UnFrozenUserTask implements Runnable { 
    private String queueName; private FrozenConsumerService consumerService; private UserRoleFrozenService userRoleFrozenService; public UnFrozenUserTask(String queueName, FrozenConsumerService consumerService, UserRoleFrozenService userRoleFrozenService) { 
    this.queueName = queueName; this.consumerService = consumerService; this.userRoleFrozenService = userRoleFrozenService; } / * 执行任务 */ @Override public void run() { 
    // 消费解冻逻辑 while (CONSUMER_RUNNING) { 
    // 消费消息 Optional<Message> response = consumerService.consumerMsg(queueName); if (!response.isPresent()) { 
    try { 
    Thread.sleep(2000); } catch (InterruptedException e) { 
    log.error("consumer order status cmq thread sleep ex: {}", e); } // 消息为空continue 重复消费 continue; } //获取消息体 String msgBody = response.get().msgBody; log.info("get the userRoleFrozenMsg message body from cmq: {}", msgBody); try { 
    UserRoleFrozenMsg userRoleFrozenMsg = new ObjectMapper().readValue(msgBody, UserRoleFrozenMsg.class); User user = userMapper.getById(userRoleFrozenMsg.getUserId()); // 处于冻结状态时,判断是否冻结时间是否已过 if (user.getSrAuth() == SrAuthEnum.FROZEN.code) { 
    Timestamp now = new Timestamp(System.currentTimeMillis()); Timestamp endTime = userRoleFrozenMsg.getFrozenEndTime(); // 在结束时间之前继续投递 if (now.before(endTime)) { 
    long interval = (endTime.getTime() - now.getTime()) / 1000; // 间隔超过1小时时 , 按1小时投递延迟消息 if (interval > FrozenCmqProducerService.MAX_DELAY_SECOND) { 
    BaseResponse<Boolean> resp = frozenCmqProducerService.sendMessage(JSON.toJSONString(userRoleFrozenMsg)); log.info("Deliver messages more than 1 hour interval"); } else { 
    // 1小时内 , 按照时间差投递 BaseResponse<Boolean> baseResponse = frozenCmqProducerService.sendMessage(JSON.toJSONString(userRoleFrozenMsg), (int) interval); log.info("Deliver duplicate delivery messages within 1 hour"); } } else if (now.after(endTime)) { 
    // 在结束时间之后, 修改用户实名认证状态 userRoleFrozenService.autoUnFrozenUserRole(user.getUserId(), UserRoleEnum.SFC_USER); } } // 删除此次消费的消息 BaseResponse<Boolean> delMsgPerReq = consumerService.delMsgPerReq(queueName, response.get().receiptHandle); log.info("sr_auth equals 1 message delete userRoleFrozenMsg from cmq msg: {} response: {}", msgBody, delMsgPerReq); } catch (IOException e) { 
    log.error("userRoleFrozenMsg message:{} body parsing error:{}", msgBody, e); } } } } } 

冻结生产service

package com.tencent.iov.userservice.cmq; import com.ruqimobility.common.dtos.BaseResponse; import com.tencent.iov.userservice.config.CmqConfig; import com.tencent.iov.userservice.util.ResponseUtils; import org.springframework.stereotype.Service; import java.util.List; / * @author HP * @ Author: wangfei * @ Date : 2021/5/27 17:42 * @ Version: 1.0 */ @Service public class FrozenCmqProducerService extends CmqProducerService { 
    / * 最大延迟时间, 3600 秒 */ public static Integer MAX_DELAY_SECOND = 3600; / * 发送延迟消息 * * @param msgBody * @return */ public BaseResponse<Boolean> sendMessage(String msgBody) { 
    CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting(); Long seconds = setting.getDelayTime().getSeconds(); return super.sendToQueue(setting.getQueueName(), msgBody, seconds.intValue()); } / * 批量发送延迟冻结消息 * * @return */ public BaseResponse<Boolean> batchSendMessage(List<String> msgBodyList) { 
    CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting(); Long seconds = setting.getDelayTime().getSeconds(); return super.batchSendToQueuePerReq(setting.getQueueName(), msgBodyList, seconds.intValue()); } / * 发送自定义延迟时间消息 * * @param msgBody * @return */ public BaseResponse<Boolean> sendMessage(String msgBody, int delay) { 
    CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting(); if (delay > MAX_DELAY_SECOND) { 
    return ResponseUtils.success(Boolean.FALSE); } return super.sendToQueue(setting.getQueueName(), msgBody, delay); } / * 批量发送延迟冻结消息 * * @return */ public BaseResponse<Boolean> batchSendMessage(List<String> msgBodyList, int delay) { 
    CmqConfig.FrozenDelayQueueSetting setting = super.getFrozenDelayQueueSetting(); if (delay > MAX_DELAY_SECOND) { 
    return ResponseUtils.success(Boolean.FALSE); } return super.batchSendToQueuePerReq(setting.getQueueName(), msgBodyList, delay); } } 
// 删除此次消费的消息 BaseResponse<Boolean> delMsgPerReq = consumerService.delMsgPerReq(queueName, response.get().receiptHandle); log.info("sr_auth equals 1 message delete userRoleFrozenMsg from cmq msg: {} response: {}", msgBody, delMsgPerReq); 

cmq的延迟消息只能做到最大1小时的延迟

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/199643.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月20日 下午12:32
下一篇 2026年3月20日 下午12:32


相关推荐

  • 被网页挂马攻击的几个要素_网站挂马检测工具箱书籍

    被网页挂马攻击的几个要素_网站挂马检测工具箱书籍网马网马的本质是一个特定的网页,这个网页包含了攻击者精心构造的恶意代码,这些恶意代码通过利用浏览器(包括控件、插件)的漏洞,加载并执行攻击者指定的恶意软件(通常是木马)。网站挂马是黑客植入木马的一种主要手段。黑客通过入侵或者其他方式控制了网站的权限,在网站的Web页面中插入网马,用户在访问被挂马的网站时也会访问黑客构造的网马,网马在被用户浏览器访问时就会利用浏览器或者相关插件的漏洞,下载并执行…

    2026年4月19日
    6
  • Java集合类详解

    Java集合类详解Collection├List│├LinkedList│├ArrayList│└Vector│ └Stack└SetMap├Hashtable├HashMap└WeakHashMapCollection接口  Collection是最基本的集合接口,一个Collection代表一组Object,即Collection的元素(Element

    2022年4月27日
    44
  • keil5使用技巧

    keil5使用技巧文章目录前言1、关闭其他文件2、keil注释如何不乱码4、每段程序后都要空行5、添加头文件6、开启和关闭工程列表框7、找到库函数总结前言1、关闭其他文件2、keil注释如何不乱码4、每段程序后都要空行5、添加头文件6、开启和关闭工程列表框7、找到库函数方法就是打开一个.h文件拖到最后→看到如下字样的,就是库函数了/**@defgroupGPIO_Exported_Functions@{*/例如:找EXTI的库函数打开exti.h文件,拖到最后,这些就是EXTI

    2022年5月23日
    64
  • jquery弹窗插件dialog_jquery进度条插件

    jquery弹窗插件dialog_jquery进度条插件143行js顶部进度条最小插件-nanobar.js源码解析

    2022年4月20日
    70
  • 快速手工实现软件著作权源码60页制作

    快速手工实现软件著作权源码60页制作软件著作权源码基本要求:(1)前30页,后30页,一共60页。其实前30页还好说,什么叫后30页?实际代码没有绝对的后;(2)每页至少60行,无空行,有一定的注释。快速插入代码先按顺序准备好代码,”插入”-“对象”-“文件中文字”,可快速插入所有代码。每页至少60行首先得知道每页多少行,可通过”页面设置”-“版式”-“行号”,显示出行号:效果如下:设…

    2022年4月19日
    192
  • 软件著作权源代码要求_安卓著作权源码格式要求

    软件著作权源代码要求_安卓著作权源码格式要求摘要:对软件著作权申请都要提供软件60页的源代码,超出60页的应至少提交最前和最后的各连续30页源程序文本,不足60页的,应当将所有的源程序文本全部提交,程序要有比较鲜明的开始段落1.代码语法要求对软件著作权提交源代码是提供原始的代码(不是关键代码)语法上要求代码具备完整性。而且要求提交者提供的源代码是对应的代码文件的最原始文本信息。例如:C++代码应该是以include之类作为开头,而不能以函数…

    2026年2月18日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号