java 实现MQTT客户端

java 实现MQTT客户端简介 MQTT MessageQueui 消息队列遥测传输协议 是一种基于发布 订阅 publish subscribe 模式的 轻量级 通讯协议 该协议构建于 TCP IP 协议上 可以以极少的代码和有限的带宽 为连接远程设备提供实时可靠的消息服务 三种消息发布服务质量 至多一次 消息发布完全依赖底层 TCP IP 网络 会发生消息丢失或重复 至少一次 确保消息到达 但消息重复可能会发生 只有一次 确保消息到达一次 在一些要求比较严格的计费系统中

简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。

MQTT原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

MQTT在Java中应用

MQTT的应用一般需要MQTT服务器,比如mosquito或EMQX服务软件,MQTT客户端和代理服务器可以使用代码实现。

Java客户端开发

<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> 
package com.t4cloud.t.sensor.handler; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Slf4j public class DemoMqttClient { public static void main(String... args) { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient mqttClient = new MqttClient("tcp://47.98.137.173:10001", "client", new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName("mu88KCb9"); // 设置密码 options.setPassword("J0x24wd066Y3q778".toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe("test"); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { // 连接失败时调用 重新连接订阅 System.out.println("连接丢失............."); try { System.out.println("开始重连"); Thread.sleep(3000); mqttClient.connect(options); } catch (InterruptedException e) { e.printStackTrace(); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); // 创建消息 MqttMessage message = new MqttMessage("hello World!".getBytes()); // 设置消息的服务质量 message.setQos(0); // 发布消息 mqttClient.publish("test", message); // 断开连接 mqttClient.disconnect(); // 关闭客户端 mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } } 
如果你要在一个项目中启动多个客户端,那么可以多线程的方式创建

多线程:

package com.t4cloud.t.sensor.entity; import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; //MQTT客户端线程 @Slf4j public class MqttClientThread extends Thread{ //连接地址 private String serverURL; //MQTT客户端登录用户名 private String mqttUsername; //MQTT客户端密码 private String mqttPassWord; //MQTT订阅主题 private String mqttTopic; //MQTT的client private String clientId; //产品id private String productId; //推送至我们自己的RedisTopIc中channel private String channel = "mqtt"; //mqtt实体类 private MqttClient mqttClient; //构造函数 public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) { this.serverURL = serverURL; this.mqttUsername = mqttUsername; this.mqttPassWord = mqttPassWord; this.mqttTopic = mqttTopic; this.clientId = clientId; this.productId = productId; } //线程方法 public void run(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(mqttUsername); // 设置密码 options.setPassword(mqttPassWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 // options.setKeepAliveInterval(20); //设置断开后重新连接 options.setAutomaticReconnect(true); // 连接 mqttClient.connect(options); // 订阅 //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } // 设置回调 mqttClient.setCallback(new MqttCallbackExtended () { / * Called when the connection to the server is completed successfully. * * @param reconnect If true, the connection was the result of automatic reconnect. * @param serverURI The server URI that the connection was made to. */ @Override public void connectComplete(boolean reconnect, String serverURI) { try{ //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } log.info("----TAG", "connectComplete: 订阅主题成功"); }catch(Exception e){ e.printStackTrace(); log.info("----TAG", "error: 订阅主题失败"); } } @Override public void connectionLost(Throwable throwable) { log.error("连接断开,下面做重连..."); long reconnectTimes = 1; while (true) { try { if (mqttClient.isConnected()) { log.warn("mqtt reconnect success end"); break; } if(reconnectTimes == 10){ //当重连次数达到10次时,就抛出异常,不在重连 log.warn("mqtt reconnect error"); return; } log.warn("mqtt reconnect times = {} try again...", reconnectTimes++); mqttClient.reconnect(); } catch (MqttException e) { log.error("", e); } try { Thread.sleep(1000); } catch (InterruptedException e1) { // e1.printStackTrace(); } } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); //向我们通道中发送消息 RedisMsg redisMsg = new RedisMsg(); redisMsg.setChannel(channel); redisMsg.setMsg("推送MQTT消息"); SensorMqttMsg mqttMsg = new SensorMqttMsg(); mqttMsg.setProductId(productId); mqttMsg.setPayload(new String(mqttMessage.getPayload())); redisMsg.setData(mqttMsg); RedisTopicUtil.sendMessage(channel, redisMsg); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); //放入缓存,根据clinetId吧mqttClient对象放进去 MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient); } catch (Exception e) { e.printStackTrace(); //当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连 if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){ try { mqttClient.close(); } catch (MqttException ex) { ex.printStackTrace(); } } } } } 

这里面吧MqttClient放入了ConcurrentHashMap中,然后后面需要关闭的时候重ConcurrentHashMap拿出MqttClient对象使用如下代码关闭就可以了。

//断开连接 client.disconnect(); //关闭连接 client.close(); 

其次还解决了 MQTT断线重连及订阅消息恢复 问题。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233349.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • c++map set_get post区别

    c++map set_get post区别setset的数据结构set的操作函数erasefindcount重载操作符multisetmapmap的数据结构map的构造函数map的操作函数erasecleanfind重载运算符其他操作函数multimap)setset是一种关联式容器,其特性如下:set以RBTree作为底层容器所得元素的只有key没有value,value就是key不允许出现键值重复所有的元素都会被自动排…

    2025年9月26日
    5
  • python屏幕文字识别_python 图片文字识别 可截图识别

    python屏幕文字识别_python 图片文字识别 可截图识别[Python]纯文本查看复制代码importosfromaipimportAipOcrimportkeyboardfromPILimportImageGrabfromtimeimportsleepdefget_reuslt(img_name):a=input(‘是否添加可信度?(建议字多不加)(y/n):’)ifa==’y’:APP_ID=’xxxxxx…

    2022年5月27日
    76
  • 内核编程与应用程序开发的主要区别是_内核编程用什么语言

    内核编程与应用程序开发的主要区别是_内核编程用什么语言内核编程既不能访问C库也不能访问标准的C头文件。内核编程时必须使用GNUC。内核编程缺乏像用户空间那样的内存保护机制。内核编程时难以执行浮点数运算。内核给每个进程只有一个很小的定长堆栈。由于内核支持异步中断、抢占和SMR,因此必须时刻注意同步和并发。要考虑可移植性的重要性。…

    2022年10月8日
    3
  • FLAG_ACTIVITY_CLEAR_TOP:「建议收藏」

    FLAG_ACTIVITY_CLEAR_TOP:「建议收藏」FLAG_ACTIVITY_CLEAR_TOP:例如现在的栈情况为:ABCD。D此时通过intent跳转到B,如果这个intent添加FLAG_ACTIVITY_CLEAR_TOP标记,则栈情况变为:AB。如果没有添加这个标记,则栈情况将会变成:ABCDB。也就是说,如果添加了FLAG_ACTIVITY_CLEAR_TOP标记,并且目标Activity在栈中已经存在,则将会把

    2022年7月17日
    23
  • sbc 通信_ipc进程间通信

    sbc 通信_ipc进程间通信SBC在企业IP通信系统中的应用刘航2008/05/04  摘要:本文针对企业IP通信系统建设实施的两大问题:终端接入安全和IP多媒体业务NAT穿越,介绍了基于SBC(SessionBorderController,会话边界控制器)的解决方案,并提出了利用SBC辅助实现IP录音的一种新应用模式。  关键词:IP通信、SBC、NAT穿越、安全、IP录音一、引言

    2025年10月30日
    3
  • MP算法和OMP算法及其思想

    MP算法和OMP算法及其思想

    2021年11月28日
    39

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号