zmq的DEALER套接字是对REQ的一层包装,本质就是在发送数据前发送一段bytes数据(所以一个bytes数据就意味着一个连接),而ROUTER就是对REP的包装,是在接收数据前先接收bytes数据。REQ-REP可以看作bytes数据为空的DEALER-ROUTER模式,但是DEALER和ROUTER又不像REQ和REP一样严格遵循send-recv-send-recv…..的模式。
总体分为client-broker-worker三个部分,简化如下图所示。

broker的伪码流程图:

附代码:
client.py
import zmq ctx = zmq.Context.instance() socket = ctx.socket(zmq.DEALER) socket.connect("tcp://localhost:12000") if __name__ == '__main__': socket.send(b"hello") msg = socket.recv() print(msg)
broker.py
import zmq import time from collections import OrderedDict context = zmq.Context.instance() frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:12000") backend = context.socket(zmq.ROUTER) backend.bind("tcp://*:13000") frontend.setsockopt(zmq.RCVHWM, 100) backend.setsockopt(zmq.RCVHWM, 100) workers = OrderedDict() clients = {} msg_cache = [] poll = zmq.Poller() poll.register(backend, zmq.POLLIN) poll.register(frontend, zmq.POLLIN) if __name__ == '__main__': while True: socks = dict(poll.poll(10)) now = time.time() # 接收后端消息 if backend in socks and socks[backend] == zmq.POLLIN: # 接收后端地址、客户端地址、后端返回response ps: 此处的worker_addr, client_addr, reply均是bytes类型 worker_addr, client_addr, response = backend.recv_multipart() # 把后端存入workers workers[worker_addr] = time.time() if client_addr in clients: # 如果客户端地址存在,把返回的response转发给客户端,并删除客户端 frontend.send_multipart([client_addr, response]) clients.pop(client_addr) else: # 客户端不存在 print(worker_addr, client_addr) # 处理所有未处理的消息 while len(msg_cache) > 0 and len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue msg = msg_cache.pop(0) # 转发缓存的消息 backend.send_multipart([worker_addr, msg[0], msg[1]]) # 接收前端消息 if frontend in socks and socks[frontend] == zmq.POLLIN: # 获取客户端地址和请求内容 ps: 此处的client_addr, request均是bytes类型 client_addr, request = frontend.recv_multipart() clients[client_addr] = 1 while len(workers) > 0: # 取出一个最近通信过的worker worker_addr, t = workers.popitem() # 判断是否心跳过期 过期则重新取worker if t - now > 1: continue # 转发消息 backend.send_multipart([worker_addr, client_addr, request]) break else: # while正常结束说明消息未被转发,存入缓存 msg_cache.append([client_addr, request])
worker.py
import zmq if __name__ == '__main__': context = zmq.Context() socket = context.socket(zmq.DEALER) # 设置接收消息超时时间为1秒 socket.setsockopt(zmq.RCVTIMEO, 1000) socket.connect("tcp://localhost:13000") # 发送心跳到broker注册worker socket.send_multipart([b"heart", b""]) while True: try: # 获取客户端地址和消息内容 client_addr, message = socket.recv_multipart() except Exception as e: # 超时 重新发送心跳 print(e) socket.send_multipart([b"heart", b""]) continue # 处理任务 print(client_addr, message) # 返回response socket.send_multipart([client_addr, b"world"])
总结:broker自带缓存消息队列,但是broker宕机就会丢失消息;worker主动连接broker并发送心跳包,但是worker接收到消息后宕机同样可能会丢失消息造成client阻塞。实现这个demo只是帮助理解分布式消息转发,不要直接使用。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/215393.html原文链接:https://javaforall.net
