java连接MQTT服务器(Springboot整合MQTT)

java连接MQTT服务器(Springboot整合MQTT)目录一、业务场景二、本文只讲解java连接MQTT服务器进行数据处理一、业务场景硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的。二、本文只讲解java连接MQTT服务器进行数据处理…

大家好,又见面了,我是你们的朋友全栈君。

目录

一、业务场景

二、本文只讲解java连接MQTT服务器进行数据处理


一、业务场景

 硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。

二、本文只讲解java连接MQTT服务器进行数据处理

1、新建springboot项目,pom文件中直接引入下面的mqtt依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、 编写MQTT工具类

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MQTT工具类操作
 *
 * @author Mr.Qu
 * @since v1.1.0 2020-01-10
 */
@Slf4j
@Component
public class MQTTConnect {

    private String HOST = "tcp://127.0.0.1:1883"; //mqtt服务器的地址和端口号
    private final String clientId = "DC" + (int) (Math.random() * 100000000);
    private MqttClient mqttClient;

    /**
     * 客户端connect连接mqtt服务器
     *
     * @param userName     用户名
     * @param passWord     密码
     * @param mqttCallback 回调函数
     **/
    public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new Callback());
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }

    /**
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
        mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);///默认:30
        options.setAutomaticReconnect(true);//默认:false
        options.setCleanSession(false);//默认:true
        //options.setKeepAliveInterval(20);//默认:60
        return options;
    }

    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.close();
        mqttClient.disconnect();
    }

    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param topic:发布的主题
     * @param msg:发布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 向某个主题发布消息
     *
     * @param topic: 发布的主题
     * @param msg:   发布的消息
     * @param qos:   消息质量    Qos:0、1、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 订阅某一个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 订阅某一个主题,可携带Qos
     *
     * @param topic 所要订阅的主题
     * @param qos   消息质量:0、1、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }

    /**
     * main函数自己测试用
     */
    public static void main(String[] args) throws MqttException {
        MQTTConnect mqttConnect = new MQTTConnect();
        mqttConnect.setMqttClient("admin", "public", new Callback());
        mqttConnect.sub("com/iot/init");
        mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
    }
}

3、编写MQTT的回调函数

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 常规MQTT回调函数
 *
 * @author Mr.Qu
 * @since 2020/1/9 16:26
 */
@Slf4j
public class Callback implements MqttCallback {

    /**
     * MQTT 断开连接会执行此方法
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("断开了MQTT连接 :{}", throwable.getMessage());
        log.error(throwable.getMessage(), throwable);
    }

    /**
     * publish发布成功后会执行到这里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }

    /**
     * subscribe订阅后得到的消息会执行到这里
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
        log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
    }
}

4、由于业务场景需要,在项目启动时,监听MQTT主题Topic,编写MQTT监听器

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import com.siborui.dc.mqtt.Callback;

/**
 * 项目启动 监听主题
 *
 * @author Mr.Qu
 * @since 2020/1/10
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {

    private final MQTTConnect server;

    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.setMqttClient("admin", "public", new Callback());
            server.sub("com/iot/init");
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}


5、源码传送门

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/140440.html原文链接:https://javaforall.net

(0)
上一篇 2022年5月28日 上午6:20
下一篇 2022年5月28日 上午6:20


相关推荐

  • JS中automation服务器不能创建对象

    JS中automation服务器不能创建对象今天看到 CSDN 上斌哥的 Blog 于是写了段 JS 脚本 用 QTP 执行 JS 代码的 但是提示 automation 服务器不能创建对象 问题 上网搜索 并且按照方法解决问题了 于是拷贝解决方法记录下来 1 如果是 Scripting FileSystemOb FSO 文本文件读写 被关闭了 开启 FSO 功能即可 在 运行 中执行 regsvr32scrr dll 即可 2 安全模式设置成 中

    2025年11月10日
    4
  • css 两端对齐

    css 两端对齐css 两端对齐当我们做筛选框的时候 有的时候会需要筛选文字两端对齐的情况 如图文本的对齐方式大都是通过设置 text align 来实现 text align 属性下有一个 justify 值可以设置元素的两端对齐 但是 text align justify 属性有一些不足之处 在单行文本下 无法实现两端对齐效果 在多行文本下 无法实现最后一行文本的两端对齐效果 解决方法的思路 由于在单行文本下和多行文本下最后一样无法实现两端对齐效果 因此给元素新增一行 即可实现 justify 的两个不足之处 代码如下 amp

    2026年3月18日
    2
  • 2020互联网行业术语

    2020互联网行业术语互联网行业一般指的是互联网企业,是指以计算机网络技术为基础,利用网络平台提供服务并因此获得收入的企业。其业务范围通常覆盖全国甚至全球,注册用户通常达到千万级以上,用户活跃度也非常高,经常在同一时间点出现请求并行的情况。代表企业有阿里巴巴、腾讯、百度、东、宇节跳动,美团饿了么等。传统行业通常是指互联网向传统行业渗透,实质是传统行业需要互联网。互联网是技术平台、底层架构,它源于安全需求,却在经济领域大放异彩。传统经济正向网络经济转轨,这-有机联系的经济使各行各业均需连接,因此以互联网为代表的联接型

    2022年6月15日
    54
  • 探讨PMI测量配置对5G下行速率的影响

    探讨PMI测量配置对5G下行速率的影响问题描述 当前 5GPMI 测量配置均为 8P4B 那么什么是 8P4B 另外为什么功能开启后能提升下行速率 问题分析 首先简单回答 8P4B 是一组下行参考信号 CSI RS UE 测量 CSI RS 信号用于给基站反馈下行 CQI PMI RI CRI 等信息 当基站读到信息后会给 UE 分配最合适的 MCS TBSize RB 资源等调度相关的参数 从而使 UE 的下载性能达到最佳 一 8PP 是 port 的缩写 就是 8 个端口 这里的端口是逻辑天线端口 下图是按照目前中兴推荐配置后的一个 RB 内的 8 个端口

    2025年9月4日
    6
  • python缩进错误「建议收藏」

    Python报错TabError:inconsistentuseoftabsandspacesinindentation平台Geany错误原因python的缩进符为四位空格,修改方法:Python报错:IndentationError:unindentdoesnotmatchanyouterindentationlevel依旧是缩进出现问题如图:缩进时四个空格一个不能少!!!…

    2022年4月7日
    65
  • servlet和jsp中的Request转发,重定向有何区别_jsp重定向和转发的区别

    servlet和jsp中的Request转发,重定向有何区别_jsp重定向和转发的区别转发request.getRequestDispatcher("/index.jsp").forward(request,response);System.out.println("被执行了");index.jsp&lt;html&gt;&lt;body&gt;&lt;h2&gt;HelloWorld!&lt;/h2&gt;&lt;/body

    2025年10月2日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

评论列表(1条)

  • silencezheng的头像
    silencezheng 2022年8月25日 上午9:55

    请问硬件设备如何与mqtt建立连接呢?

关注全栈程序员社区公众号