分布式进阶(十五)ZMQ

分布式进阶(十五)ZMQ我们为什么需要 ZMQ 目前的应用程序很多都会包含跨网络的组件 无论是局域网还是因特网 这些程序的开发者都会用到某种消息通信机制 有些人会使用某种消息队列产品 而大多数人则会自己手工来做这些事 使用 TCP 或 UDP 协议 这些协议使用起来并不困难 但是 简单地将消息从 A 发给 B 和在任何情况下都能进行可靠的消息传输 这两种情况显然是不同的 让我们看看在使用纯 TCP 协议进行消息传输时会遇到的一些典型问

一、我们为什么需要ZMQ

目前的应用程序很多都会包含跨网络的组件,无论是局域网还是因特网。这些程序的开发者都会用到某种消息通信机制。有些人会使用某种消息队列产品,而大多数人则会自己手工来做这些事,使用TCP或UDP协议。这些协议使用起来并不困难,但是,简单地将消息从A发给B,和在任何情况下都能进行可靠的消息传输,这两种情况显然是不同的。

首先看看在使用纯TCP协议进行消息传输时会遇到的一些典型问题。任何可复用的消息传输层肯定或多或少地会要解决以下问题:

  • 如何处理I/O?是让程序阻塞等待响应,还是在后台处理这些事?这是软件设计的关键因素。阻塞式的I/O操作会让程序架构难以扩展,而后台处理I/O也是比较困难的。
  • 如何处理那些临时的、来去自由的组件?我们是否要将组件分为客户端和服务端两种,并要求服务端永不消失?那如果我们想要将服务端相连怎么办?我们要每隔几秒就进行重连吗?
  • 如何表示一条消息?我们怎样通过拆分消息,让其变得易读易写,不用担心缓存溢出,既能高效地传输小消息,又能胜任视频等大型文件的传输?
  • 如何处理那些不能立刻发送出去的消息?比如我们需要等待一个网络组件重新连接的时候?我们是直接丢弃该条消息,还是将它存入数据库,或是内存中的一个队列?
  • 要在哪里保存消息队列?如果某个组件读取消息队列的速度很慢,造成消息的堆积怎么办?我们要采取什么样的策略?
  • 如何处理丢失的消息?我们是等待新的数据,请求重发,还是需要建立一套新的可靠性机制以保证消息不会丢失?如果这个机制自身崩溃了呢?
  • 如果想换一种网络连接协议,如用广播代替TCP单播?或者改用IPv6?我们是否需要重写所有的应用程序,或者将这种协议抽象到一个单独的层中?
  • 如何对消息进行路由?我们可以将消息同时发送给多个节点吗?是否能将应答消息返回给请求的发送方?
  • 如何为另一种语言写一个API?我们是否需要完全重写某项协议,还是重新打包一个类库?
  • 怎样才能做到在不同的架构之间传送消息?是否需要为消息规定一种编码?
  • 如何处理网络通信错误?等待并重试,还是直接忽略或取消?

ZMQ就是这样一种软件:它高效,提供了嵌入式的类库,使应用程序能够很好地在网络中扩展,成本低廉。

ZMQ的主要特点有:

  • ZMQ会在后台线程异步地处理I/O操作,它使用一种不会死锁的数据结构来存储消息。
  • 网络组件可以来去自如,ZMQ会负责自动重连,这就意味着你可以以任何顺序启动组件;用它创建的面向服务架构(SOA)中,服务端可以随意地加入或退出网络。
  • ZMQ会在有必要的情况下自动将消息放入队列中保存,一旦建立了连接就开始发送。
  • ZMQ有阈值(HWM)的机制,可以避免消息溢出。当队列已满,ZMQ会自动阻塞发送者,或丢弃部分消息,这些行为取决于你所使用的消息模式。
  • ZMQ可以让你用不同的通信协议进行连接,如TCP、广播、进程内、进程间。改变通信协议时你不需要去修改代码。
  • ZMQ会恰当地处理速度较慢的节点,会根据消息模式使用不同的策略。
  • ZMQ提供了多种模式进行消息路由,如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构。
  • ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度。
  • ZMQ会发送整个消息,使用消息帧的机制来传递。如果你发送了10KB大小的消息,你就会收到10KB大小的消息。
  • ZMQ不强制使用某种消息格式,消息可以是0字节的,或是大到GB级的数据。当你表示这些消息时,可以选用诸如谷歌的protocol buffers,XDR等序列化产品。
  • ZMQ能够智能地处理网络错误,有时它会进行重试,有时会告知你某项操作发生了错误。
  • ZMQ甚至可以降低对环境的污染,因为节省了CPU时间意味着节省了电能。

其实ZMQ可以做的还不止这些,它会颠覆人们编写网络应用程序的模式。虽然从表面上看,它不过是提供了一套处理套接字的API,能够用zmq_recv()和zmq_send()进行消息的收发,但是,消息处理将成为应用程序的核心部分,很快你的程序就会变成一个个消息处理模块,这既美观又自然。它的扩展性还很强,每项任务由一个节点(节点是一个线程)、同一台机器上的两个节点(节点是一个进程)、同一网络上的两台机器(节点是一台机器)来处理,而不需要改动应用程序。

二、ZeroMQ 背景介绍

官方:“ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”

与其他消息中间件相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。

三、ZMQ是什么

阅读了ZMQ的Guide文档后,这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

四、三种模型

4.1 应答模式

使用REQ-REP套接字发送和接受消息是需要遵循一定规律的。客户端首先使用zmq_send()发送消息,再用zmq_recv()接收,如此循环。如果打乱了这个顺序(如连续发送两次)则会报错。类似地,服务端必须先进行接收,后进行发送。

4.2 订阅发布模式

PUB-SUB套接字组合是异步的。客户端在一个循环体中使用recv ()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用send ()发送消息,但不能在PUB套接字上使用recv ()。

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。解决此问题需要在PUB端加入sleep。

4.3 基于分布式处理(管道模式)

下面贴出PUB_SUB(应答模式)模式下的代码:

发布端:

package cn.edu.ujn.pub_sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket;   / * Pubsub envelope publisher */ public class psenvpub {     public static void main (String[] args) throws Exception {         // Prepare our context and publisher     Context context = ZMQ.context(1);     Socket publisher = context.socket(ZMQ.PUB);         publisher.bind("tcp://*:5563");         while (!Thread.currentThread ().isInterrupted ()) {             // Write two messages, each with an envelope and content             publisher.sendMore ("A");             publisher.send ("We don't want to see this");             publisher.sendMore ("B");             publisher.send("We would like to see this");         }         publisher.close ();         context.term ();     } }

发布端需要通过context.socket(ZMQ.PUB)表示为发布端,通过bind方法来创建发布端连接,等待订阅者连接。

之后通过send方法将数据发送到出去。

之后来写订阅端代码

package cn.edu.ujn.pub_sub; import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; / * Pubsub envelope subscriber */ public class psenvsub {     public static void main (String[] args) {         // Prepare our context and subscriber         Context context = ZMQ.context(1);         Socket subscriber = context.socket(ZMQ.SUB);         subscriber.connect("tcp://localhost:5563");         subscriber.subscribe("B".getBytes());         while (!Thread.currentThread ().isInterrupted ()) {             // Read envelope with address             String address = subscriber.recvStr ();             // Read message contents             String contents = subscriber.recvStr ();             System.out.println(address + " : " + contents);         }         subscriber.close ();         context.term ();     } }

客户端通过connect进行连接,之后通过recv来进行数据接收。

下面贴出REQ_REP(订阅发布模式)模式下的代码:

发送端:

package cn.edu.ujn.req_rep; // //  Hello World server in Java //  Binds REP socket to tcp://*:5555 //  Expects "Hello" from client, replies with "World" // import org.zeromq.ZMQ; public class hwserver { private static int i = 0;     public static void main(String[] args) throws Exception {         ZMQ.Context context = ZMQ.context(1);         //  Socket to talk to clients         ZMQ.Socket responder = context.socket(ZMQ.REP);         responder.bind("tcp://*:5555");           while (!Thread.currentThread().isInterrupted()) {             // Wait for next request from the client             byte[] request = responder.recv(0);                          System.out.println("Received " + new String(request) + i++);               // Do some 'work'             Thread.sleep(1000);             // Send reply back to client             String reply = "World";             responder.send(reply.getBytes(), 0);         }         responder.close();         context.term();     } }

接收端:

package cn.edu.ujn.req_rep; //  Hello World client in Java //  Connects REQ socket to tcp://localhost:5555 //  Sends "Hello" to server, expects "World" back import org.zeromq.ZMQ; public class hwclient {     public static void main(String[] args) {         ZMQ.Context context = ZMQ.context(1);         //  Socket to talk to server         System.out.println("Connecting to hello world server…");         ZMQ.Socket requester = context.socket(ZMQ.REQ);         requester.connect("tcp://localhost:5555");         for (int requestNbr = 0; requestNbr != 10; requestNbr++) {             String request = "Hello";             System.out.println("Sending Hello " + requestNbr);             requester.send(request.getBytes(), 0);             byte[] reply = requester.recv(0);             System.out.println("Received " + new String(reply) + " " + requestNbr);         }         requester.close();         context.term();     } }

下面贴出Para_Pipe(基于分布式处理(管道模式))模式下的代码:

发送端:

package cn.edu.ujn.para_pipe; import java.util.Random; import org.zeromq.ZMQ; //  Task ventilator in Java //  Binds PUSH socket to tcp://localhost:5557 //  Sends batch of tasks to workers via that socket public class taskvent {     public static void main (String[] args) throws Exception {         ZMQ.Context context = ZMQ.context(1);         //  Socket to send messages on         ZMQ.Socket sender = context.socket(ZMQ.PUSH);         sender.bind("tcp://*:5557");         //  Socket to send messages on         ZMQ.Socket sink = context.socket(ZMQ.PUSH);         sink.connect("tcp://localhost:5558");         System.out.println("Press Enter when the workers are ready: ");         System.in.read();         System.out.println("Sending tasks to workers\n");           //  The first message is "0" and signals start of batch         sink.send("0", 0);           //  Initialize random number generator         Random srandom = new Random(System.currentTimeMillis());           //  Send 100 tasks         int task_nbr;         int total_msec = 0;     //  Total expected cost in msecs         for (task_nbr = 0; task_nbr < 100; task_nbr++) {             int workload;             //  Random workload from 1 to 100msecs             workload = srandom.nextInt(100) + 1;             total_msec += workload;             System.out.print(workload + ".");             String string = String.format("%d", workload);             sender.send(string, 0);         }         System.out.println("Total expected cost: " + total_msec + " msec");         Thread.sleep(1000);              //  Give 0MQ time to deliver           sink.close();         sender.close();         context.term();     } }

中介端:

package cn.edu.ujn.para_pipe; import org.zeromq.ZMQ; //  Task worker in Java //  Connects PULL socket to tcp://localhost:5557 //  Collects workloads from ventilator via that socket //  Connects PUSH socket to tcp://localhost:5558 //  Sends results to sink via that socket public class taskwork {     public static void main (String[] args) throws Exception {         ZMQ.Context context = ZMQ.context(1);         //  Socket to receive messages on         ZMQ.Socket receiver = context.socket(ZMQ.PULL);         receiver.connect("tcp://localhost:5557");         //  Socket to send messages to         ZMQ.Socket sender = context.socket(ZMQ.PUSH);         sender.connect("tcp://localhost:5558");         //  Process tasks forever         while (!Thread.currentThread ().isInterrupted ()) {             String string = new String(receiver.recv(0)).trim();             long msec = Long.parseLong(string);             //  Simple progress indicator for the viewer             System.out.flush();             System.out.print(string + '.');             //  Do the work             Thread.sleep(msec);             //  Send results to sink             sender.send("".getBytes(), 0);         }         sender.close();         receiver.close();         context.term();     } }

接收端:

package cn.edu.ujn.para_pipe; import org.zeromq.ZMQ; //  Task sink in Java //  Binds PULL socket to tcp://localhost:5558 //  Collects results from workers via that socket public class tasksink {     public static void main (String[] args) throws Exception {         //  Prepare our context and socket         ZMQ.Context context = ZMQ.context(1);         ZMQ.Socket receiver = context.socket(ZMQ.PULL);         receiver.bind("tcp://*:5558");         //  Wait for start of batch         String string = new String(receiver.recv(0));         //  Start our clock now         long tstart = System.currentTimeMillis();         //  Process 100 confirmations         int task_nbr;         int total_msec = 0;     //  Total calculated cost in msecs         for (task_nbr = 0; task_nbr < 100; task_nbr++) {             string = new String(receiver.recv(0)).trim();             if ((task_nbr / 10) * 10 == task_nbr) {                 System.out.print(":");             } else {                 System.out.print(".");             }         }         //  Calculate and report duration of batch         long tend = System.currentTimeMillis();         System.out.println("\nTotal elapsed time: " + (tend - tstart) + " msec");         receiver.close();         context.term();     } }

五、注意事项

5.1 正确地使用上下文

ZMQ应用程序的一开始总是会先创建一个上下文,并用它来创建套接字。在C语言中,创建上下文的函数是zmq_init()。一个进程中只应该创建一个上下文。从技术的角度来说,上下文是一个容器,包含了该进程下所有的套接字,并为inproc协议提供实现,用以高速连接进程内不同的线程。

如果一个进程中创建了两个上下文,那就相当于启动了两个ZMQ实例。如果这正是你需要的,那没有问题,但一般情况下:在一个进程中使用zmq_init()函数创建一个上下文,并在结束时使用zmq_term()函数关闭它。

如果你使用了fork()系统调用,那每个进程需要自己的上下文对象。如果在调用fork()之前调用了zmq_init()函数,那每个子进程都会有自己的上下文对象。通常情况下,你会需要在子进程中做些有趣的事,而让父进程来管理它们。

5.2 正确地退出和清理

程序员的一个良好习惯是:总是在结束时进行清理工作。当你使用像Python那样的语言编写ZMQ应用程序时,系统会自动帮你完成清理。但如果使用的是C语言,那就需要小心地处理了,否则可能发生内存泄露、应用程序不稳定等问题。

内存泄露只是问题之一,其实ZMQ是很在意程序的退出方式的。个中原因比较复杂,但简单的来说,如果仍有套接字处于打开状态,调用zmq_term()时会导致程序挂起;就算关闭了所有的套接字,如果仍有消息处于待发送状态,zmq_term()也会造成程序的等待。只有当套接字的LINGER选项设为0时才能避免。

我们需要关注的ZMQ对象包括:消息、套接字、上下文。好在内容并不多,至少在一般的应用程序中是这样:

  • 处理完消息后,记得用zmq_msg_close()函数关闭消息;
  • 如果你同时打开或关闭了很多套接字,那可能需要重新规划一下程序的结构了;
  • 退出程序时,应该先关闭所有的套接字,最后调用zmq_term()函数,销毁上下文对象。

5.3 你的想法可能会被颠覆

传统网络编程的一个规则是套接字只能和一个节点建立连接。虽然也有广播的协议,但毕竟是第三方的。当我们认定“一个套接字 = 一个连接”的时候,我们会用一些特定的方式来扩展应用程序架构:我们为每一块逻辑创建线程,该线程独立地维护一个套接字。

但在ZMQ的世界里,套接字是智能的、多线程的,能够自动地维护一组完整的连接。你无法看到它们,甚至不能直接操纵这些连接。当你进行消息的收发、轮询等操作时,只能和ZMQ套接字打交道,而不是连接本身。所以说,ZMQ世界里的连接是私有的,不对外部开放,这也是ZMQ易于扩展的原因之一。

由于你的代码只会和某个套接字进行通信,这样就可以处理任意多个连接,使用任意一种网络协议。而ZMQ的消息模式又可以进行更为廉价和便捷的扩展。

这样一来,传统的思维就无法在ZMQ的世界里应用了。在你阅读示例程序代码的时候,也许你脑子里会想方设法地将这些代码和传统的网络编程相关联:当你读到“套接字”的时候,会认为它就表示与另一个节点的连接——这种想法是错误的;当你读到“线程”时,会认为它是与另一个节点的连接——这也是错误的。

如果你是第一次阅读本指南,使用ZMQ进行了一两天的开发(或者更长),可能会觉得疑惑,ZMQ怎么会让事情便得如此简单。你再次尝试用以往的思维去理解ZMQ,但又无功而返。最后,你会被ZMQ的理念所折服,拨云见雾,开始享受ZMQ带来的乐趣。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/200253.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月20日 上午11:13
下一篇 2026年3月20日 上午11:13


相关推荐

  • SE是什么意思_pe是什么的英文简称

    SE是什么意思_pe是什么的英文简称AEApplicationEngineer应用工程师。定位:IC流片后,需要在通用应用系统(比如Intel/AMD主板)或者关键刻画的系统平台上进行功能验证,发现问题反馈给IC设计工程师。与FAE相比,AE偏向IC设计,FAE偏向市场对一点。FAEFieldAppilcationEngineer现场应用工程师,又称现场应用技术支持工程师。定位:IC产品在客户端送样时,可能出现技术问题,协助客户的工程技术人员解决技术问题;协助市场人员,从技术角度推广产品,开拓新客户,收集客户的技术问题与

    2025年8月1日
    3
  • minicom指令_如何优雅地使用minicom

    minicom指令_如何优雅地使用minicomminicom简介安装minicom是linux下一款常用的串口调试工具。ubuntu环境下,使用如下命令安装sudoapt-getinstallminicom配置使用前需要进行配置,执行sudominicom-s可打开minicom并进入配置模式,使用方向键,选择需要配置的项目,如Serialportsetup,回车进入配置,可以看到多个配置项,此时光标在最下方。需要修改某个配…

    2022年6月8日
    49
  • SecureCRT强制卸载

    SecureCRT强制卸载SecureCRT强制卸载

    2022年4月24日
    80
  • java基础-云服务器购买

    java基础-云服务器购买小伙伴们,你们好呀!我是老寇!3年前,在阿里云买了我人生中的第一台云服务器,二话没说直接下单,看着支付宝的余额,我心如刀绞。所幸的是我熬过了这一个月。接下来我们进入正题(以阿里云为例)!目录一、操作步骤一、操作步骤1.输入阿里云网址,点击账号登录2.扫码登录->强烈建议下个阿里云APP,这样每次登陆只需要扫一扫就可以3.点击控制台,进入控制台4.完成实名认证(略)5.点击最新活动,找到开发者成长计划6.认准ECS服务器7.买Cento

    2022年5月5日
    43
  • 决策树原理解析_解析的原理

    决策树原理解析_解析的原理决策树原理解析1.决策树算法以及基本流程决策树是基于树结构进行决策的,其机制就是通过判定每个属性分类的纯度来进行自上而下决策分类决策树包含根结点,内部结点,叶结点;根结点和内部结点对应与分类的属性(也就是分类的基准),叶结点对应决策结果(也就是纯度很高且不需要继续分裂的类别);从根结点到某一个叶结点的路径便是当前叶结点对应类的整个决策过程,下面来看决策树的算法流程:可以看到决…

    2025年10月6日
    5
  • 讯飞星火xDeepSeek双大模型合体!讯飞智能办公本X3新功能首秀!

    讯飞星火xDeepSeek双大模型合体!讯飞智能办公本X3新功能首秀!

    2026年3月14日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号