java 实现MQTT客户端

java 实现MQTT客户端简介 MQTT MessageQueui 消息队列遥测传输协议 是一种基于发布 订阅 publish subscribe 模式的 轻量级 通讯协议 该协议构建于 TCP IP 协议上 可以以极少的代码和有限的带宽 为连接远程设备提供实时可靠的消息服务 三种消息发布服务质量 至多一次 消息发布完全依赖底层 TCP IP 网络 会发生消息丢失或重复 至少一次 确保消息到达 但消息重复可能会发生 只有一次 确保消息到达一次 在一些要求比较严格的计费系统中

简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。

MQTT原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

MQTT在Java中应用

MQTT的应用一般需要MQTT服务器,比如mosquito或EMQX服务软件,MQTT客户端和代理服务器可以使用代码实现。

Java客户端开发

<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> 
package com.t4cloud.t.sensor.handler; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Slf4j public class DemoMqttClient { public static void main(String... args) { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient mqttClient = new MqttClient("tcp://47.98.137.173:10001", "client", new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName("mu88KCb9"); // 设置密码 options.setPassword("J0x24wd066Y3q778".toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe("test"); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { // 连接失败时调用 重新连接订阅 System.out.println("连接丢失............."); try { System.out.println("开始重连"); Thread.sleep(3000); mqttClient.connect(options); } catch (InterruptedException e) { e.printStackTrace(); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); // 创建消息 MqttMessage message = new MqttMessage("hello World!".getBytes()); // 设置消息的服务质量 message.setQos(0); // 发布消息 mqttClient.publish("test", message); // 断开连接 mqttClient.disconnect(); // 关闭客户端 mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } } 
如果你要在一个项目中启动多个客户端,那么可以多线程的方式创建

多线程:

package com.t4cloud.t.sensor.entity; import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; //MQTT客户端线程 @Slf4j public class MqttClientThread extends Thread{ //连接地址 private String serverURL; //MQTT客户端登录用户名 private String mqttUsername; //MQTT客户端密码 private String mqttPassWord; //MQTT订阅主题 private String mqttTopic; //MQTT的client private String clientId; //产品id private String productId; //推送至我们自己的RedisTopIc中channel private String channel = "mqtt"; //mqtt实体类 private MqttClient mqttClient; //构造函数 public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) { this.serverURL = serverURL; this.mqttUsername = mqttUsername; this.mqttPassWord = mqttPassWord; this.mqttTopic = mqttTopic; this.clientId = clientId; this.productId = productId; } //线程方法 public void run(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(mqttUsername); // 设置密码 options.setPassword(mqttPassWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 // options.setKeepAliveInterval(20); //设置断开后重新连接 options.setAutomaticReconnect(true); // 连接 mqttClient.connect(options); // 订阅 //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } // 设置回调 mqttClient.setCallback(new MqttCallbackExtended () { / * Called when the connection to the server is completed successfully. * * @param reconnect If true, the connection was the result of automatic reconnect. * @param serverURI The server URI that the connection was made to. */ @Override public void connectComplete(boolean reconnect, String serverURI) { try{ //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } log.info("----TAG", "connectComplete: 订阅主题成功"); }catch(Exception e){ e.printStackTrace(); log.info("----TAG", "error: 订阅主题失败"); } } @Override public void connectionLost(Throwable throwable) { log.error("连接断开,下面做重连..."); long reconnectTimes = 1; while (true) { try { if (mqttClient.isConnected()) { log.warn("mqtt reconnect success end"); break; } if(reconnectTimes == 10){ //当重连次数达到10次时,就抛出异常,不在重连 log.warn("mqtt reconnect error"); return; } log.warn("mqtt reconnect times = {} try again...", reconnectTimes++); mqttClient.reconnect(); } catch (MqttException e) { log.error("", e); } try { Thread.sleep(1000); } catch (InterruptedException e1) { // e1.printStackTrace(); } } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); //向我们通道中发送消息 RedisMsg redisMsg = new RedisMsg(); redisMsg.setChannel(channel); redisMsg.setMsg("推送MQTT消息"); SensorMqttMsg mqttMsg = new SensorMqttMsg(); mqttMsg.setProductId(productId); mqttMsg.setPayload(new String(mqttMessage.getPayload())); redisMsg.setData(mqttMsg); RedisTopicUtil.sendMessage(channel, redisMsg); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); //放入缓存,根据clinetId吧mqttClient对象放进去 MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient); } catch (Exception e) { e.printStackTrace(); //当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连 if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){ try { mqttClient.close(); } catch (MqttException ex) { ex.printStackTrace(); } } } } } 

这里面吧MqttClient放入了ConcurrentHashMap中,然后后面需要关闭的时候重ConcurrentHashMap拿出MqttClient对象使用如下代码关闭就可以了。

//断开连接 client.disconnect(); //关闭连接 client.close(); 

其次还解决了 MQTT断线重连及订阅消息恢复 问题。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233349.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • subscriptions_promise sb to do

    subscriptions_promise sb to do1.Promise的含义Promise是异步编程的一种解决方案,比传统的解决方案——回调函数和事件——更合理和更强大。它由社区最早提出和实现,ES6将其写进了语言标准,统一了用法,原生提供了P

    2022年8月7日
    5
  • CPU流水线详解_多周期流水线cpu

    CPU流水线详解_多周期流水线cpu为什么Intel处理器主频这么高,而AMD处理器主频都很低?是不是AMD处理器性能不如Intel?我们一般的回答都是,因为Intel处理器与AMD处理器内部构架不同,所以导致了这种情况,还有一种具体一点的回答就是因为Intel处理器流水线长,那到底流水线与CPU主频具体有什么关系呢?今天给大家带来一篇我以前刊登在《电脑报》硬件板块技术大讲堂版面的一篇原创文章。关于CPU流水线的知识,很多报纸杂

    2022年8月20日
    24
  • 获取当前jar包路径_java获取jar文件

    获取当前jar包路径_java获取jar文件一、获取可执行jar包所在目录(1)方法一:使用System.getProperty(“java.class.path”)获取classpath的路径,若没有其他依赖,在cmd下运行该可执行jar包,则该值即为该jar包的绝对路径。代码如下:/***方法一:获取当前可执行jar包所在目录*/StringfilePath=System.getProperty(“java.class.

    2022年9月27日
    3
  • ftp服务器文件保存位置,ftp服务器和文件保存路径「建议收藏」

    ftp服务器文件保存位置,ftp服务器和文件保存路径「建议收藏」ftp服务器和文件保存路径内容精选换一换用户可以在MRSManager界面上配置监控指标数据对接参数,使集群内各监控指标数据通过FTP或SFTP协议保存到指定的FTP服务器,与第三方系统进行对接。FTP协议未加密数据可能存在安全风险,建议使用SFTP。MRSManager支持采集当前管理的集群内所有监控指标数据,采集的周期有30秒、60秒和300秒三种。监控指标数据在FTP该任务指导用户使用…

    2025年6月2日
    3
  • java垃圾回收器的工作原理「建议收藏」

    java垃圾回收器的工作原理「建议收藏」出处:Sunnier深入理解java垃圾回收机制—-一、垃圾回收机制的意义  Java语言中一个显著的特点就是引入了垃圾回收机制,使c++程序员最头疼的内存管理的问题迎刃而解,它使得Java程序员在编写程序的时候不再需要考虑内存管理。由于有个垃圾回收机制,Java中的对象不再有“作用域”的概念,只有对象的引用才有“作用域”。垃圾回收可以有效的防止内存泄露,有效的使用

    2022年10月8日
    4
  • 笔记《Thinking in Java》第2章 一切都是对象[通俗易懂]

    笔记《Thinking in Java》第2章 一切都是对象[通俗易懂]第二章一切都是对象1.数据可以存在哪寄存器。因为它在CPU内部,所以最快。但是Java无法直接控制它。栈。在RAM上,但是,CPU可以通过栈指针快速的分配存储,向下就分配新内存,向上就释放内存,所以速度很快。代价是,Java系统必须确切的知道数据在栈里的生命周期,所以灵活性有限。Java的对象引用存在这。堆。也是在RAM上,不过跟栈比,编译器不用知道数据在堆里的生命周期,所以在堆里分配…

    2022年7月8日
    23

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号