使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo

使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demozmq 的 DEALER 套接字是对 REQ 的一层包装 本质就是在发送数据前发送一段 bytes 数据 所以一个 bytes 数据就意味着一个连接 而 ROUTER 就是对 REP 的包装 是在接收数据前先接收 bytes 数据 REQ REP 可以看作 bytes 数据为空的 DEALER ROUTER 模式 但是 DEALER 和 ROUTER 又不像 REQ 和 REP 一样严格遵循 send recv send recv 的模式 总体分为 client broker worker 三个部分 简化如下图所示 broker 的伪码流程

zmq的DEALER套接字是对REQ的一层包装,本质就是在发送数据前发送一段bytes数据(所以一个bytes数据就意味着一个连接),而ROUTER就是对REP的包装,是在接收数据前先接收bytes数据。REQ-REP可以看作bytes数据为空的DEALER-ROUTER模式,但是DEALER和ROUTER又不像REQ和REP一样严格遵循send-recv-send-recv…..的模式。

总体分为client-broker-worker三个部分,简化如下图所示。

使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo

 

 broker的伪码流程图:

使用python基于zmq的DEALER-ROUTER模式实现分布式消息分发的demo

附代码:

client.py

import zmq ctx = zmq.Context.instance() socket = ctx.socket(zmq.DEALER) socket.connect("tcp://localhost:12000") if __name__ == '__main__': socket.send(b"hello") msg = socket.recv() print(msg) 

broker.py

import zmq import time from collections import OrderedDict context = zmq.Context.instance() frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:12000") backend = context.socket(zmq.ROUTER) backend.bind("tcp://*:13000") frontend.setsockopt(zmq.RCVHWM, 100) backend.setsockopt(zmq.RCVHWM, 100) workers = OrderedDict() clients = {} msg_cache = [] poll = zmq.Poller() poll.register(backend, zmq.POLLIN) poll.register(frontend, zmq.POLLIN) if __name__ == '__main__': while True: socks = dict(poll.poll(10)) now = time.time() # 接收后端消息 if backend in socks and socks[backend] == zmq.POLLIN: # 接收后端地址、客户端地址、后端返回response ps: 此处的worker_addr, client_addr, reply均是bytes类型 worker_addr, client_addr, response = backend.recv_multipart() # 把后端存入workers workers[worker_addr] = time.time() if client_addr in clients: # 如果客户端地址存在,把返回的response转发给客户端,并删除客户端 frontend.send_multipart([client_addr, response]) clients.pop(client_addr) else: # 客户端不存在 print(worker_addr, client_addr) # 处理所有未处理的消息 while len(msg_cache) > 0 and len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue msg = msg_cache.pop(0) # 转发缓存的消息 backend.send_multipart([worker_addr, msg[0], msg[1]]) # 接收前端消息 if frontend in socks and socks[frontend] == zmq.POLLIN: # 获取客户端地址和请求内容 ps: 此处的client_addr, request均是bytes类型 client_addr, request = frontend.recv_multipart() clients[client_addr] = 1 while len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue # 转发消息 backend.send_multipart([worker_addr, client_addr, request]) break else: # while正常结束说明消息未被转发,存入缓存 msg_cache.append([client_addr, request]) 

worker.py

import zmq if __name__ == '__main__': context = zmq.Context() socket = context.socket(zmq.DEALER) # 设置接收消息超时时间为1秒 socket.setsockopt(zmq.RCVTIMEO, 1000) socket.connect("tcp://localhost:13000") # 发送心跳到broker注册worker socket.send_multipart([b"heart", b""]) while True: try: # 获取客户端地址和消息内容 client_addr, message = socket.recv_multipart() except Exception as e: # 超时 重新发送心跳 print(e) socket.send_multipart([b"heart", b""]) continue # 处理任务 print(client_addr, message) # 返回response socket.send_multipart([client_addr, b"world"]) 

总结:broker自带缓存消息队列,但是broker宕机就会丢失消息;worker主动连接broker并发送心跳包,但是worker接收到消息后宕机同样可能会丢失消息造成client阻塞。实现这个demo只是帮助理解分布式消息转发,不要直接使用。

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/215393.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午2:01
下一篇 2026年3月18日 下午2:02


相关推荐

  • IPHONE接口定义

    IPHONE接口定义苹果公司使用了一家名叫JAE公司的接插件,型号为DD1.这个接口有30针iphone接口定义英文版的:ThisconnectorisusedoniPod(startingfrom3rdgeneration)andiPhone.ItisusedtoconnecttheiPodoriPhonetovariousdevices:PC(viaUS

    2022年6月2日
    43
  • pycharm2021激活码_在线激活

    (pycharm2021激活码)好多小伙伴总是说激活码老是失效,太麻烦,关注/收藏全栈君太难教程,2021永久激活的方法等着你。https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~40Z9P7H9NN-eyJsaWNlbnNlSWQiOi…

    2022年3月28日
    104
  • Itext图层合并/分离

    Itext图层合并/分离技术要点 Itext 图层合并 图层分离合并 分离 最后技术要点 Itext 图层合并 图层分离合并 思路 1 图层来的是单独文件 需要使用 pdfwriter 写入 PdfLayerfoot newPdfLayer layerName1 writer dcb Begin

    2026年3月19日
    2
  • 使用Django 测试客户端一起测试视图,模板和URL

    使用Django 测试客户端一起测试视图,模板和URL

    2022年4月2日
    43
  • 自定义html下拉选择框,CSS自定义select下拉选择框的样式(不用其他标签模拟)

    自定义html下拉选择框,CSS自定义select下拉选择框的样式(不用其他标签模拟)今天群里有人问到怎么自定义 select 下拉选择框的样式 于是群里就展开了激烈的讨论 刚开始一直就是考虑怎样使用纯 CSS 实现 把浏览器默认的样式覆盖掉 但最后均因兼容问题处理不好而失败告终 最后的解决方案就是用其他的元素 如 ul li 模拟下拉菜单 或者是使用网上一些现成的插件 其实 select 这个东西只靠纯 CSS 是不能解决这个自定义样式问题的 但既然折腾了这么久 还是说一下 CSS 实现的思路吧 首

    2026年3月18日
    2
  • 一文搞懂 NULL 和 nullptr 的区别【C/C++面试必备】

    一文搞懂 NULL 和 nullptr 的区别【C/C++面试必备】作者 Linux 猿 CSDN 博客专家 Linux C C 面试 刷题 算法尽管咨询我 关注我 有问题私聊 大家可能对 NULL 和 nullptr 都有了解 NULL 最开始是在 C 语言中使用 后来 C 11 引入了 nullptr 为什么 C 11 要引入 nullptr 呢 那一定是 NULL 在某些方面存在哪些不足 所以引入了 nullptr 下面我们来看一下 本文使用的环境是 系统环境 Ubuntu20 04 开发工具 VisualStudio

    2026年3月19日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号