简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。
MQTT原理
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:
(1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
(3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。
MQTT在Java中应用
MQTT的应用一般需要MQTT服务器,比如mosquito或EMQX服务软件,MQTT客户端和代理服务器可以使用代码实现。
Java客户端开发
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
package com.t4cloud.t.sensor.handler; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Slf4j public class DemoMqttClient { public static void main(String... args) { try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient mqttClient = new MqttClient("tcp://47.98.137.173:10001", "client", new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName("mu88KCb9"); // 设置密码 options.setPassword("J0x24wd066Y3q778".toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe("test"); // 设置回调 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { // 连接失败时调用 重新连接订阅 System.out.println("连接丢失............."); try { System.out.println("开始重连"); Thread.sleep(3000); mqttClient.connect(options); } catch (InterruptedException e) { e.printStackTrace(); } catch (MqttSecurityException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); // 创建消息 MqttMessage message = new MqttMessage("hello World!".getBytes()); // 设置消息的服务质量 message.setQos(0); // 发布消息 mqttClient.publish("test", message); // 断开连接 mqttClient.disconnect(); // 关闭客户端 mqttClient.close(); } catch (Exception e) { e.printStackTrace(); } } }
如果你要在一个项目中启动多个客户端,那么可以多线程的方式创建
多线程:
package com.t4cloud.t.sensor.entity; import com.t4cloud.t.base.redis.topic.entity.RedisMsg; import com.t4cloud.t.base.utils.RedisTopicUtil; import com.t4cloud.t.sensor.constant.MqttClientManager; import com.t4cloud.t.sensor.entity.vo.SensorMqttMsg; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; //MQTT客户端线程 @Slf4j public class MqttClientThread extends Thread{ //连接地址 private String serverURL; //MQTT客户端登录用户名 private String mqttUsername; //MQTT客户端密码 private String mqttPassWord; //MQTT订阅主题 private String mqttTopic; //MQTT的client private String clientId; //产品id private String productId; //推送至我们自己的RedisTopIc中channel private String channel = "mqtt"; //mqtt实体类 private MqttClient mqttClient; //构造函数 public MqttClientThread(String serverURL,String mqttUsername,String mqttPassWord,String mqttTopic,String clientId,String productId) { this.serverURL = serverURL; this.mqttUsername = mqttUsername; this.mqttPassWord = mqttPassWord; this.mqttTopic = mqttTopic; this.clientId = clientId; this.productId = productId; } //线程方法 public void run(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存,就用username mqttClient = new MqttClient(serverURL, clientId, new MemoryPersistence()); // 配置参数信息 MqttConnectOptions options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(mqttUsername); // 设置密码 options.setPassword(mqttPassWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 // options.setKeepAliveInterval(20); //设置断开后重新连接 options.setAutomaticReconnect(true); // 连接 mqttClient.connect(options); // 订阅 //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } // 设置回调 mqttClient.setCallback(new MqttCallbackExtended () { / * Called when the connection to the server is completed successfully. * * @param reconnect If true, the connection was the result of automatic reconnect. * @param serverURI The server URI that the connection was made to. */ @Override public void connectComplete(boolean reconnect, String serverURI) { try{ //如果监测到有,号,说明要订阅多个主题 if(mqttTopic.contains(",")){ //多主题 String[] mqttTopics = mqttTopic.split(","); mqttClient.subscribe(mqttTopics); }else{ //单主题 mqttClient.subscribe(mqttTopic); } log.info("----TAG", "connectComplete: 订阅主题成功"); }catch(Exception e){ e.printStackTrace(); log.info("----TAG", "error: 订阅主题失败"); } } @Override public void connectionLost(Throwable throwable) { log.error("连接断开,下面做重连..."); long reconnectTimes = 1; while (true) { try { if (mqttClient.isConnected()) { log.warn("mqtt reconnect success end"); break; } if(reconnectTimes == 10){ //当重连次数达到10次时,就抛出异常,不在重连 log.warn("mqtt reconnect error"); return; } log.warn("mqtt reconnect times = {} try again...", reconnectTimes++); mqttClient.reconnect(); } catch (MqttException e) { log.error("", e); } try { Thread.sleep(1000); } catch (InterruptedException e1) { // e1.printStackTrace(); } } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + mqttMessage.getQos()); log.info("接收消息内容 : " + new String(mqttMessage.getPayload())); //向我们通道中发送消息 RedisMsg redisMsg = new RedisMsg(); redisMsg.setChannel(channel); redisMsg.setMsg("推送MQTT消息"); SensorMqttMsg mqttMsg = new SensorMqttMsg(); mqttMsg.setProductId(productId); mqttMsg.setPayload(new String(mqttMessage.getPayload())); redisMsg.setData(mqttMsg); RedisTopicUtil.sendMessage(channel, redisMsg); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //认证过程 log.info("deliveryComplete............."); } }); //放入缓存,根据clinetId吧mqttClient对象放进去 MqttClientManager.MQTT_CLIENT_MAP.putIfAbsent(clientId, mqttClient); } catch (Exception e) { e.printStackTrace(); //当创建客户端的时候出现 已断开连接,有可能是在另一个环境下启动了该客户端,直接吧这边的客户端关闭,不然另一边会无限重连 if(e.getMessage().equals("已断开连接") || e.getMessage().equals("客户机未连接")){ try { mqttClient.close(); } catch (MqttException ex) { ex.printStackTrace(); } } } } }
这里面吧MqttClient放入了ConcurrentHashMap中,然后后面需要关闭的时候重ConcurrentHashMap拿出MqttClient对象使用如下代码关闭就可以了。
//断开连接 client.disconnect(); //关闭连接 client.close();
其次还解决了 MQTT断线重连及订阅消息恢复 问题。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233349.html原文链接:https://javaforall.net