java 实现MQTT客户端

java 实现MQTT客户端简介 MQTT MessageQueui 消息队列遥测传输协议 是一种基于发布 订阅 publish subscribe 模式的 轻量级 通讯协议 该协议构建于 TCP IP 协议上 可以以极少的代码和有限的带宽 为连接远程设备提供实时可靠的消息服务 三种消息发布服务质量 至多一次 消息发布完全依赖底层 TCP IP 网络 会发生消息丢失或重复 至少一次 确保消息到达 但消息重复可能会发生 只有一次 确保消息到达一次 在一些要求比较严格的计费系统中

简介

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。

MQTT原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

MQTT在Java中应用

MQTT的应用一般需要MQTT服务器,比如mosquito或EMQX服务软件,MQTT客户端和代理服务器可以使用代码实现。

Java客户端开发

<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> 
package com.t4cloud.t.sensor.handler; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Slf4j public class DemoMqttClient { public static void main(String... args) { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient mqttClient = new MqttClient("tcp://47.98.137.173:10001", "client", new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName("mu88KCb9"); // 设置密码 options.setPassword("J0x24wd066Y3q778".toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe("test"); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { // 连接失败时调用 重新连接订阅 System.out.println("连接丢失............."); try { System.out.println("开始重连"); Thread.sleep(3000); mqttClient.connect(options); } catch (InterruptedException e) { e.printStackTrace(); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); // 创建消息 MqttMessage message = new MqttMessage("hello World!".getBytes()); // 设置消息的服务质量 message.setQos(0); // 发布消息 mqttClient.publish("test", message); // 断开连接 mqttClient.disconnect(); // 关闭客户端 mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } } 
如果你要在一个项目中启动多个客户端,那么可以多线程的方式创建

多线程:

package com.t4cloud.t.sensor.entity; import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; //MQTT客户端线程 @Slf4j public class MqttClientThread extends Thread{ //连接地址 private String serverURL; //MQTT客户端登录用户名 private String mqttUsername; //MQTT客户端密码 private String mqttPassWord; //MQTT订阅主题 private String mqttTopic; //MQTT的client private String clientId; //产品id private String productId; //推送至我们自己的RedisTopIc中channel private String channel = "mqtt"; //mqtt实体类 private MqttClient mqttClient; //构造函数 public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) { this.serverURL = serverURL; this.mqttUsername = mqttUsername; this.mqttPassWord = mqttPassWord; this.mqttTopic = mqttTopic; this.clientId = clientId; this.productId = productId; } //线程方法 public void run(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(mqttUsername); // 设置密码 options.setPassword(mqttPassWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 // options.setKeepAliveInterval(20); //设置断开后重新连接 options.setAutomaticReconnect(true); // 连接 mqttClient.connect(options); // 订阅 //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } // 设置回调 mqttClient.setCallback(new MqttCallbackExtended () { / * Called when the connection to the server is completed successfully. * * @param reconnect If true, the connection was the result of automatic reconnect. * @param serverURI The server URI that the connection was made to. */ @Override public void connectComplete(boolean reconnect, String serverURI) { try{ //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } log.info("----TAG", "connectComplete: 订阅主题成功"); }catch(Exception e){ e.printStackTrace(); log.info("----TAG", "error: 订阅主题失败"); } } @Override public void connectionLost(Throwable throwable) { log.error("连接断开,下面做重连..."); long reconnectTimes = 1; while (true) { try { if (mqttClient.isConnected()) { log.warn("mqtt reconnect success end"); break; } if(reconnectTimes == 10){ //当重连次数达到10次时,就抛出异常,不在重连 log.warn("mqtt reconnect error"); return; } log.warn("mqtt reconnect times = {} try again...", reconnectTimes++); mqttClient.reconnect(); } catch (MqttException e) { log.error("", e); } try { Thread.sleep(1000); } catch (InterruptedException e1) { // e1.printStackTrace(); } } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); //向我们通道中发送消息 RedisMsg redisMsg = new RedisMsg(); redisMsg.setChannel(channel); redisMsg.setMsg("推送MQTT消息"); SensorMqttMsg mqttMsg = new SensorMqttMsg(); mqttMsg.setProductId(productId); mqttMsg.setPayload(new String(mqttMessage.getPayload())); redisMsg.setData(mqttMsg); RedisTopicUtil.sendMessage(channel, redisMsg); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); //放入缓存,根据clinetId吧mqttClient对象放进去 MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient); } catch (Exception e) { e.printStackTrace(); //当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连 if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){ try { mqttClient.close(); } catch (MqttException ex) { ex.printStackTrace(); } } } } } 

这里面吧MqttClient放入了ConcurrentHashMap中,然后后面需要关闭的时候重ConcurrentHashMap拿出MqttClient对象使用如下代码关闭就可以了。

//断开连接 client.disconnect(); //关闭连接 client.close(); 

其次还解决了 MQTT断线重连及订阅消息恢复 问题。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233349.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • spring框架常用的注解_springmvc中注解的作用

    spring框架常用的注解_springmvc中注解的作用一、Spring常用注解Spring常用注解(绝对经典)二、Jpa1、@Entity,@Table(name=””)表明这是一个实体类,一般用于jpa,这两个注解一块使用,但是如果表名和实体类名相同的话,@Table可以省略。2、@MappedSuperClass基于代码复用和模型分离的思想,在项目开发中使用jpa的@MappedSuperClass注解,将实体类的多个属性分别封装到不同的非实体类中。例如,数据库表中都需要id来表示编号,id是这些映射实体类的通用属性,交给jpa统

    2022年10月22日
    0
  • 转载:Visio2013的密钥「建议收藏」

    转载:Visio2013的密钥「建议收藏」因为新买的电脑安装visio后之前的注册机都用不了,只能从网上找密钥激活,以下是转载网上的,有需要者自行尝试把VisioProfessional2013KEYC2FG9-N6J68-H8BTJ-BW3QX-RM3B32NYF6-QG2CY-9F8XC-GWMBW-29VV8FJ2N7-W8TXC-JB8KB-DCQ7Q-7T7V3VXX6C-DN3HQ-3CRXG-RF4KT-YG7V3B3…

    2022年6月24日
    69
  • H.264/MPEG-4 AVC学习

    H.264/MPEG-4 AVC学习转自:https://www.freehacker.cn/media/codec-h264/简述H.264,又称为MPEG-4第10部分,高级视频编码(英语:MPEG-4Part10,AdvancedVideoCoding,缩写为MPEG-4AVC)是一种面向块的基于运动补偿的视频编码标准。对于视频序列样本来说,使用H.264编码器能够比使用有运动补偿的MPEG-…

    2022年9月19日
    0
  • Linux查看java进程路径

    Linux查看java进程路径1.找到进程IDps-ef|grepjava2.进入进程目录cd/proc/进程ID3.查看cwd路径找到cwd对应的路径就是jar包的路径以下是我的操作截图

    2022年8月24日
    7
  • portraiture 3 for mac(PS人像磨皮滤镜插件)激活成功教程教程

    portraiture 3 for mac(PS人像磨皮滤镜插件)激活成功教程教程Portraiture3forMac是PS中优秀的人像磨皮滤镜插件,portraiture3mac激活成功教程版可以对皮肤、头发、眉毛、睫毛等部位进行磨皮润色处理,还能自由调整锐度、柔软度、亮度、对比度等,这里为大家带来portraiture滤镜的激活成功教程教程,赶紧来看看吧!portraiture激活成功教程步骤下载好Portraiture安装包后,点击打开Portraiture.dmg,双击【…

    2022年7月22日
    8
  • source insight 3.5注册码

    source insight 3.5注册码SI3US-631710-93181

    2022年7月3日
    22

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号