公司中项目大多是物联网项目,需要跟设备进行交互,用到的协议比较多,如NB/MQTT/LWM2M/COAP等,项目中不可避免用到了MQTT协议,本文介绍springboot项目MQTT客户端实现,不多说直接上可执行代码。
一、EMQ官网java sdk demo,如果只需要用到一个客户端,可以参照下官网demo,修改下应用于项目
1、pom.xml依赖引用
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
2、App.java
package io.emqx;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal Paho MQTT demo: connects to a broker, subscribes to a topic filter,
 * publishes a single message, then disconnects and exits.
 */
public class App {

    /**
     * Runs the demo against the public EMQX broker.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://broker.emqx.io:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT connection options
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("emqx_test");
            connOpts.setPassword("emqx_test_password".toCharArray());
            // cleanSession=true asks for a clean session, i.e. session state is
            // NOT retained across connections
            connOpts.setCleanSession(true);

            // Register the callback; OnMessageCallback is the handler class shown in this article
            client.setCallback(new OnMessageCallback());

            // Establish the connection
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // Subscribe
            client.subscribe(subTopic);

            // Build the message to publish; encode explicitly so the bytes do not
            // depend on the platform default charset
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
3、回调消息处理类 OnMessageCallback.java
package io.emqx;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Demo MQTT callback that prints connection, message and delivery events to stdout.
 */
public class OnMessageCallback implements MqttCallback {

    /**
     * Called when the connection to the broker is lost; reconnection would normally be done here.
     *
     * @param cause reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("连接断开,可以做重连");
        // Surface the reason instead of silently discarding it
        System.out.println("cause " + cause);
    }

    /**
     * Called for every message received on a subscribed topic.
     *
     * @param topic   topic the message was published to
     * @param message received message
     * @throws Exception declared by the callback interface; not thrown by this implementation
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        // Decode explicitly as UTF-8 so the result does not depend on the platform default charset
        System.out.println("接收消息内容:" + new String(message.getPayload(), StandardCharsets.UTF_8));
    }

    /**
     * Called when delivery of a published message has completed.
     *
     * @param token delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}
二、正规项目中,肯定不能如demo那样简单实现,也许需要创建多个客户端,项目中本人做了封装,如下:
1、application.yml配置
customer:
  mqtt:
    broker: tcp://127.0.0.1:1883
    clientList:
      # Client ID
      - clientId: iotGateway
        # Topic to listen on (subscribe to)
        subscribeTopic: v1/devices/me/rpc/response/+
        # Username
        username:
        # Password
        password:
2、Mqtt客户端数据类
package com.wfl.iot.mqtt.model; import lombok.Data; / * MQTT客户端 * * @author wangfenglei */ @Data public class MqttClientVO { / * 客户端ID */ private String clientId; / * 监听主题 */ private String subscribeTopic; / * 用户名 */ private String userName; / * 密码 */ private String password; }
package com.wfl.iot.mqtt.config;

import com.wfl.iot.mqtt.model.MqttClientVO;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * MQTT configuration bound from properties under the {@code customer.mqtt} prefix.
 *
 * @author wangfenglei
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {

    /** MQTT broker address. */
    String broker;

    /**
     * MQTT clients that should be created. Starts out as an empty list so that a
     * configuration without any {@code clientList} entry yields "no clients"
     * rather than {@code null}.
     */
    List<MqttClientVO> clientList = new ArrayList<>();
}
3、MQTT回调(策略模式)
package com.wfl.iot.mqtt.callback;

import com.wfl.iot.mqtt.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * Abstract base class for MQTT callbacks. It reconnects when the connection is lost
 * and hands every received message to {@link #handleReceiveMessage(String, String)}.
 *
 * @author wangfenglei
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    /** ID of the client this callback is attached to; used to look the client up for reconnecting. */
    private String clientId;

    /** Connect options reused when reconnecting; may be {@code null}. */
    private MqttConnectOptions connectOptions;

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }

    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }

    /**
     * Handles a lost connection by attempting a single reconnect of the client
     * registered under {@link #getClientId()}. Failures are logged, never thrown.
     *
     * <p>NOTE(review): this only reconnects; it does not subscribe again. If the client
     * was connected with a clean session the subscription may need to be re-issued
     * after reconnecting - confirm against the broker behaviour.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("MQTT connection lost, clientId={}", clientId, throwable);
        if (null == clientId) {
            return;
        }

        try {
            MqttClient client = MqttClientManager.getMqttClientById(clientId);
            if (null == client) {
                log.error("{} reconnect failed! No mqttClient is registered for this clientId", clientId);
                return;
            }

            if (null != connectOptions) {
                client.connect(connectOptions);
            } else {
                client.connect();
            }
        } catch (Exception e) {
            // Pass clientId for the placeholder; the trailing exception is logged with its stack trace
            log.error("{} reconnect failed!", clientId, e);
        }
    }

    /**
     * Receives a subscribed message, decodes its payload as UTF-8 text and delegates
     * to {@link #handleReceiveMessage(String, String)}.
     *
     * @param s           topic
     * @param mqttMessage received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Explicit charset: the platform default would make decoding environment-dependent
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("Receive topic[{}],message={}", s, content);
        handleReceiveMessage(s, content);
    }

    /**
     * Called when a message has been delivered; intentionally does nothing.
     *
     * @param iMqttDeliveryToken delivery token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // no-op
    }

    /**
     * Processes a received message.
     *
     * @param topic   topic
     * @param message message content
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}
package com.wfl.iot.mqtt.callback; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; / * 默认回调 * * @author wangfenglei */ @Slf4j @Component("default") public class DefaultMqttCallBack extends AbsMqttCallBack { @Override protected void handleReceiveMessage(String topic, String message) { log.info("DefaultCallBack:topic={},message={}", topic, message); } }
说明:后续需求变更需要新增MQTT回调类时,只需继承AbsMqttCallBack即可,但是@Component("客户端ID")注解的名称需要跟application.yml里的customer.mqtt.clientList.clientId一致,如此才能自动匹配注册回调类
package com.wfl.iot.mqtt.callback;

import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Context class holding the MQTT subscription callbacks, looked up by client ID.
 *
 * @author wangfenglei
 */
@Component
public class MqttCallBackContext {

    /** Callbacks by key; copied from the map handed to the constructor. */
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * Creates the context from a collection of callbacks.
     *
     * @param callBackMap callbacks by key (presumably the Spring bean names of the
     *                    callback components - confirm against the container setup)
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * Returns the MQTT callback registered for a client.
     *
     * @param clientId client ID
     * @return the callback, or {@code null} if none is registered or {@code clientId} is {@code null}
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        // ConcurrentHashMap rejects null keys, so answer "not found" instead of throwing
        if (null == clientId) {
            return null;
        }
        return this.callBackMap.get(clientId);
    }
}
3、MQTT客户端管理
package com.wfl.iot.mqtt;

import com.wfl.iot.mqtt.callback.AbsMqttCallBack;
import com.wfl.iot.mqtt.callback.MqttCallBackContext;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Manages MQTT clients. If the number of clients grows very large they could be
 * moved to a Redis cache later.
 *
 * @author wangfenglei
 */
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;

    /** Created MQTT clients, keyed by client ID. */
    public static final Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    /**
     * Looks up a previously created client.
     *
     * @param clientId client ID
     * @return the client, or {@code null} if none is registered or {@code clientId} is {@code null}
     */
    public static MqttClient getMqttClientById(String clientId) {
        // ConcurrentHashMap rejects null keys
        return null == clientId ? null : MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * Creates an MQTT client, connects it to the broker, optionally subscribes it to a
     * topic and registers it in {@link #MQTT_CLIENT_MAP}. MQTT failures are logged and
     * the half-created client is released; nothing is registered in that case.
     *
     * @param clientId       client ID
     * @param subscribeTopic topic to subscribe to, may be empty
     * @param userName       user name, may be empty
     * @param password       password, may be empty
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        // Connecting a second client with an ID that is already registered would only
        // create a connection that is never stored, so refuse it up front
        if (null != clientId && MQTT_CLIENT_MAP.containsKey(clientId)) {
            log.warn("MqttClient with clientId {} already exists, skip creating", clientId);
            return;
        }

        MqttClient client = null;
        try {
            client = new MqttClient(mqttBroker, clientId, new MemoryPersistence());

            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (hasText(userName)) {
                connOpts.setUserName(userName);
            }
            if (hasText(password)) {
                connOpts.setPassword(password.toCharArray());
            }
            connOpts.setCleanSession(true);

            boolean needSubscribe = hasText(subscribeTopic);
            if (needSubscribe) {
                attachCallBack(client, clientId, connOpts);
            }

            // Connect to the MQTT broker
            client.connect(connOpts);

            if (needSubscribe) {
                client.subscribe(subscribeTopic);
            }

            // Another thread may have registered the same ID in the meantime; keep the first one
            if (null != MQTT_CLIENT_MAP.putIfAbsent(clientId, client)) {
                log.warn("MqttClient with clientId {} already exists, discard the new one", clientId);
                closeQuietly(client);
            }
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
            closeQuietly(client);
        }
    }

    /**
     * Attaches the callback registered for the client ID, falling back to the one named "default".
     *
     * <p>NOTE(review): the callback instance receives this client's ID and connect options.
     * If several clients end up with the same callback instance (e.g. all falling back to
     * "default"), the last one configured wins - confirm whether callbacks are shared.
     */
    private void attachCallBack(MqttClient client, String clientId, MqttConnectOptions connOpts) {
        AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);
        if (null == callBack) {
            callBack = mqttCallBackContext.getCallBack("default");
        }
        if (null == callBack) {
            log.error("No callback found for clientId {} and no default callback registered", clientId);
            return;
        }

        callBack.setClientId(clientId);
        callBack.setConnectOptions(connOpts);
        client.setCallback(callBack);
    }

    /** Returns whether the value is neither {@code null} nor the empty string. */
    private static boolean hasText(String value) {
        return null != value && !"".equals(value);
    }

    /** Disconnects (if connected) and closes the client, logging instead of throwing. */
    private static void closeQuietly(MqttClient client) {
        if (null == client) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            log.warn("Close mqttClient failed!", e);
        }
    }
}
4、MQTT客户端项目启动自动创建
package com.wfl.iot.mqtt;

import com.wfl.iot.mqtt.config.MqttConfig;
import com.wfl.iot.mqtt.model.MqttClientVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;

/**
 * Creates the configured MQTT clients.
 *
 * @author wangfenglei
 */
@Component
public class MqttClientCreate {

    @Resource
    private MqttClientManager mqttClientManager;

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Creates one MQTT client per entry of the configured client list.
     * Does nothing when no client list is configured.
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();
        if (null == mqttClientList) {
            return;
        }

        for (MqttClientVO mqttClient : mqttClientList) {
            // Create the client; the callback is the one whose name equals the client ID (e.g. "demo")
            mqttClientManager.createMqttClient(
                    mqttClient.getClientId(),
                    mqttClient.getSubscribeTopic(),
                    mqttClient.getUserName(),
                    mqttClient.getPassword());
        }
    }
}
说明:@PostConstruct注解,可以在项目启动过程中自动创建mqtt客户端,如果启动比较耗时,可以考虑在createMqttClient方法里面使用线程
5、MQTT客户端使用工具类
package com.wfl.iot.mqtt.util;

import com.wfl.iot.mqtt.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * Utility class for using the MQTT clients.
 *
 * @author wangfenglei
 */
@Slf4j
public class MqttClientUtil {

    /**
     * Publishes a text message with QoS 2 through the client registered under the given ID.
     * A missing client or a publish failure is logged; nothing is thrown for either.
     *
     * @param clientId ID of the client to publish with
     * @param topic    topic to publish to
     * @param content  message text, sent as UTF-8 bytes
     */
    public static void sendMqttMsg(String clientId, String topic, String content) {
        // A null ID cannot match any client; skip the lookup so it is reported like any other miss
        MqttClient mqttClient = null == clientId ? null : MqttClientManager.getMqttClientById(clientId);
        if (null == mqttClient) {
            log.error("Not exist mqttClient where it's clientId is {}", clientId);
            return;
        }

        try {
            // Explicit charset: the platform default would make the payload environment-dependent
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(2);

            // Publish through the instance that was just null-checked rather than looking it up again
            mqttClient.publish(topic, message);
            log.info("Publish to mqtt broker,message={}", message);
        } catch (MqttException e) {
            log.error("MqttClient send msg faild!", e);
        }
    }
}
说明:项目中向mqtt broker发送消息,直接调用MqttClientUtil.sendMqttMsg方法即可
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210807.html 原文链接:https://javaforall.net
