go实现的redis消息队列

go实现的redis消息队列go 实现的消息队列 redis queue 支持并发队列支持 Topic Group 注册队列监听消息支持消息异常监听重启队列贴相关核心代码 最后附上 git 链接 消息载体 typeQueuePay IDstring json id IsFastbool json is fast Topicstring json topic Groupstrin

go实现的消息队列 redis_queue

  • 支持并发队列
  • 支持Topic,Group注册队列监听消息
  • 支持消息异常监听重启队列

贴相关核心代码,最后附上git链接

//消息载体 type QueuePayload struct { ID string `json:"id"` IsFast bool `json:"is_fast"` Topic string `json:"topic"` Group string `json:"group"` Body interface{} `json:"body"` } var instanceQueueManager *QueueManager var onceQueueManager sync.Once //队列管理器 type QueueManager struct { db *redis.Client MaxRetry int //重试的最大次数 RecoverCh chan RecoverData //队列恢复监听通道 Handlers map[string]interface{} //注册队列Map } //初始化队列管理器 func NewQueueManager() *QueueManager { onceQueueManager.Do(func() { instanceQueueManager = &QueueManager{} instanceQueueManager.MaxRetry = 3 instanceQueueManager.Handlers = make(map[string]interface{}) }) return instanceQueueManager } func (r *QueueManager) GetQueueName(topic string, group string) string { var name string if len(group) > 0 { name = fmt.Sprintf("Queue_%s::%s", topic, group) } else { name = fmt.Sprintf("Queue_%s", topic) } return name } //注册队列 func (r *QueueManager) RegisterQueue(topic string, group string, handler interface{}) error{ name := r.GetQueueName(topic, group) if _, ok := r.Handlers[name]; ok { return errors.New("is exits") }else{ r.Handlers[name] = handler go r.QueueConsume(topic, group) } return nil } //生产者执行入队列 func (r *QueueManager) QueuePublish(payload *QueuePayload) error { if len(payload.Topic) <= 0 { return errors.New("TopicId can not be empty") } id, err := uuid.NewUUID() if err != nil { return err } payload.ID = id.String() payloadStr, _ := json.Marshal(payload) r.db.LPush(r.GetQueueName(payload.Topic, payload.Group), payloadStr) return nil } //消费者执行出队列 func (r *QueueManager) QueueConsume(topic string, group string) { defer func() { if err := recover(); err != nil { var stacktrace string for i := 1; ; i++ { _, f, l, got := runtime.Caller(i) if !got { break } stacktrace += fmt.Sprintf("%s:%d\n", f, l) } // when stack finishes logMessage := fmt.Sprintf("Trace: %s\n", err) logMessage += fmt.Sprintf("\n%s", stacktrace) log.Println(logMessage) //执行恢复函数 r.handleRecover(topic, group) } }() for { //消费者执行出列 var payload QueuePayload result := r.db.BRPop(0, r.GetQueueName(topic, group)) if (len(result.Val()) > 0) { vals := result.Val()[1] err := json.Unmarshal([]byte(vals), &payload) if err != nil { log.Println("BRPOP json.Unmarshal Error:", err) continue } //执行回调函数 r.handleCallBack(&payload) } } } //执行恢复函数 func (r *QueueManager) handleRecover(topic string, group string) { handleName := r.GetQueueName(topic, group) handler, ok := r.Handlers[handleName] if r.RecoverCh != nil && ok{ r.RecoverCh <- RecoverData{topic, group, handler} } } //执行回调函数 func (r *QueueManager) handleCallBack (payload *QueuePayload){ handleName := r.GetQueueName(payload.Topic, payload.Group) it := r.Handlers[handleName] if it != nil { if ob, ok := it.(Queueable); ok { //同步执行Max次,保证队列顺序,失败则丢弃消息, for i:=0; i< r.MaxRetry; i++ { rs := ob.Execute(payload) if rs.State{ break } } }else{ log.Println("no ExecuteFunc,pop:", payload) } } } 

上述完整代码链接: git链接

参考的代码:https://github.com/bennya8/go_redis_queue_manager

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/230497.html原文链接:https://javaforall.net

(0)
上一篇 2026年2月19日 下午12:01
下一篇 2026年2月19日 下午12:22


相关推荐

  • 最牛营业部——国信泰然九路揭秘

    最牛营业部——国信泰然九路揭秘到深圳 有条路贯穿深圳市区东西城区 世界之窗 欢乐谷 锦绣中华等景点坐落于其两旁 这条路即是被誉为深圳 名片 的深南大道 然而 在这条长达 23 公里的大道上 远比风景更有趣的景象 是道路两边不时出现的证券营业部的招牌 据记者粗略统计 大如国信证券 海通证券 10 46 0 14 1 36 小如西部证券 中山证券 汉唐证券 巨田证券等等 在这里共开设了逾百家营业部 按照深圳当地一位券商人士的话说

    2026年3月26日
    1
  • gatk过滤_快速入门GATK | Public Library of Bioinformatics

    gatk过滤_快速入门GATK | Public Library of BioinformaticsGATK 全称是 GenomeAnlysi 顾名思义 是一套用于分析基因组的工具箱 主要功能是寻找变异位点和基因分型 但是实际上功能超多 导致初学者都不知道从何学习 GATK 最近因为 mapping by sequencing 要寻找 variant 所以接触了 GATK 我相信很多人第一眼看到 GATK 是茫然的 因为它的功能实在是太多了 都不知道从何开始 这里就说下我是如何在一脸茫然的情

    2026年3月18日
    3
  • 精美的液晶数字字体素材[通俗易懂]

    精美的液晶数字字体素材[通俗易懂]液晶数字应该比较常见,那么液晶数字字体的应用也是相对广泛了,可以运用于一切需要液晶显示屏上的数字字体显示。对于这样一种有着广泛的应用数字字体,选择使用哪款液晶数字字体也是一个很重要的问题啦!为此,特意为大家收集了几款液晶数字字体供大家选择,喜欢的朋友赶紧收藏起来吧!  DS-Digital字体是一款比较常规的液晶数字字体,这款字体的仅支持数字和大写字母输入,字体端正,结构完整,整体视觉呈现效果…

    2025年7月27日
    8
  • ASMM与AMM「建议收藏」

    ASMM与AMM「建议收藏」ASMM(AutomaticSharedMemoryManagement,自动共享内存管理)是Oracle10g引入的概念。通过使用ASMM,就不需要手工设置相关内存组件的大小,而只为SGA设置一个总的大小,Oracle的MMAN进程(MemoryManagerProcess,内存管理进程)会随着时间推移,根据系统负载的变化和内存需要,自动调整SGA中各个组件的内存大小。ASMM的…

    2022年5月2日
    87
  • OpenClaw 小龙虾部署难?数商云给你企业级一站式方案

    OpenClaw 小龙虾部署难?数商云给你企业级一站式方案

    2026年3月12日
    3
  • MySQL数据库:表结构优化

    MySQL数据库:表结构优化

    2021年4月9日
    143

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号