MQTTnet[通俗易懂]

MQTTnet[通俗易懂]近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置源码>>>GitHub//Broker设置options.MaxPendingMessagesPerClient=1000;options.EnablePersistentSessions…

大家好,又见面了,我是你们的朋友全栈君。

近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置

// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;

// 客户端,ClientId不能变
_clientOption.CleanSession = false;

以下是两个封装的类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTT.Util
{
    public class MqttServer
    {
        private IMqttServer _mqttServer = null;

        private string _ip = string.Empty;
        public string IP { get { return this._ip; } }

        private string _port = string.Empty;
        public string Port { get { return this._port; } }

        public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
        public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
        public EventHandler Started = null;
        public EventHandler Stopped = null;

        public MqttServer(string ip, string port)
        {
            this._ip = ip;
            this._port = port;
        }

        public async void Start()
        {
            // Backlog:表示服务器可以接受的并发请求的最大值
            var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
            optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));

            var options = optionBuilder.Build() as MqttServerOptions;
            options.ConnectionValidator += this.ConnectionValidator;
            options.MaxPendingMessagesPerClient = 1000;
            options.EnablePersistentSessions = true;

            this._mqttServer = new MqttFactory().CreateMqttServer();
            this._mqttServer.ClientConnected += this.ClientConnected;
            this._mqttServer.ClientDisconnected += this.ClientDisconnected;
            this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
            this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
            this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
            this._mqttServer.Started += this.Started;
            this._mqttServer.Stopped += this.Stopped;

            await _mqttServer.StartAsync(options);
        }

        public void Stop()
        {
            if (this._mqttServer == null)
                return;
            foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
            {
                clientSessionStatuse.DisconnectAsync();
            }
            this._mqttServer.StopAsync();
            this._mqttServer = null;
        }
    }
}

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTT.Util
{
    public class MqttClient
    {
        private IMqttClient _mqttClient = null;
        private MqttClientOptions _clientOption = null;

        private string _serverIP = string.Empty;
        public string ServerIP { get { return this._serverIP; } }

        private string _serverPort = string.Empty;
        public string ServerPort { get { return this._serverPort; } }

        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;

        public MqttClient(string clientId = "")
        {
            if (clientId == string.Empty)
                clientId = Guid.NewGuid().ToString();

            this._clientOption = new MqttClientOptions() { ClientId = clientId };
            this._clientOption.CleanSession = false;
            this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
            this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
            // 遗嘱信息
            // this._clientOption.WillMessage = 
            this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;

            this._mqttClient = new MqttFactory().CreateMqttClient();
        }

        public void Init(string ip, string port, string userName, string pwd)
        {
            this._serverIP = ip;
            this._serverPort = port;

            _mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
            _mqttClient.Connected += this.ClientConnected;
            _mqttClient.Disconnected += this.ClientDisconnected;
            this._clientOption.ChannelOptions = new MqttClientTcpOptions()
            {
                Server = this._serverIP,
                Port = Convert.ToInt32(this._serverPort)
            };
            this._clientOption.Credentials = new MqttClientCredentials()
            {
                Username = userName,
                Password = pwd
            };
        }

        /**
      host: 服务器地址
      port: 服务器端口
      tls:  是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
      keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
      clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
      auth: 是否使用登录验证
      user: 用户名
      pass: 密码
      willTopic: 订阅主题
      willMsg: 自定义的离线消息
      willQos: 接收离线消息的级别
      clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
        public async void Connect()
        {
            if (this._mqttClient == null)
                return;

            try
            {
                if (this._mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(this._clientOption);
            }
            catch (Exception)
            {
                throw;
            }
        }

        public async void Disconnect()
        {
            if (this._mqttClient == null)
                return;
            if (this._mqttClient.IsConnected)
                await _mqttClient.DisconnectAsync();
        }

        public void Stop()
        {
            if (this._mqttClient == null)
                return;
            this._mqttClient.Dispose();
            this._mqttClient = null;
        }

        public void SubscribeAsync(string topic, string qos)
        {
            Task.Factory.StartNew(async () =>
            {
                await _mqttClient.SubscribeAsync(
                    new List<TopicFilter>
                    {
                            new TopicFilter(
                                topic,
                                (MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
                    });
            });
        }

        public void PublishAsync(string topic, string qos, string payload)
        {
            if (this._mqttClient == null)
                return;
            Task.Factory.StartNew(async () =>
            {
                var msg = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
                    Retain = true // 是否保留信息
                };
                await _mqttClient.PublishAsync(msg);
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152846.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Navicat 15 for MySQL注册激活码破解方法

    Navicat 15 for MySQL注册激活码破解方法,https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月14日
    99
  • 深入理解java虚拟机「建议收藏」

    深入理解java虚拟机「建议收藏」作者:战斗民族就是干转载请注明出处:https://www.cnblogs.com/prayers/p/5515245.html一、运行时数据区域线程隔离:线程隔离的意思,就是给不同的线程多分

    2022年8月6日
    6
  • myeclipse软件下载_eclipse手机版

    myeclipse软件下载_eclipse手机版myeclipse官方下载:官方站点下载:downme(MyEclipseEnterpriseWorkbench8.0forEclipse3.5.1andWindows98/2000/NT/XP/Vista/7(11/23/2009))说明:已包含Elipse和JRE,无需安装其他即可运行的ALL-in-ONE版本)Version:8.0GA|Fil

    2022年9月26日
    0
  • Pytest(13)命令行参数–tb的使用

    Pytest(13)命令行参数–tb的使用前言pytest使用命令行执行用例的时候,有些用例执行失败的时候,屏幕上会出现一大堆的报错内容,不方便快速查看是哪些用例失败。–tb=style参数可以设置报错的时候回溯打印内容,可以设置参

    2022年7月31日
    4
  • java全局变量的定义和声明_定义全局变量的方法

    java全局变量的定义和声明_定义全局变量的方法JAVA全局变量(或称成员变量)可分两种,一种是静态变量,另一种是实例变量,即在类体中定义的变量,有三点得注意:一、成员变量不能在类体中先声明(定义)后赋值,但静态变量可以先在类体中声明,然后在方法中赋值(当然实例变量是不行的);1)如以下程序会出问题:publicclassTest{staticinta;//在类体中声明整型静态变量a。intb;

    2022年8月21日
    9
  • Jenkins自动构建部署项目到远程服务器上

    Jenkins自动构建部署项目到远程服务器上1.下载jenkins下载地址:https://jenkins.io/启动jenkins方式有2种1.1切换到jenkins.war包的存放目录启动命令:java-jarjenkins.war启动jenkins访问:localhost:8080就能…

    2022年6月2日
    61

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号