MQTTnet[通俗易懂]

MQTTnet[通俗易懂]近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置源码>>>GitHub//Broker设置options.MaxPendingMessagesPerClient=1000;options.EnablePersistentSessions…

大家好,又见面了,我是你们的朋友全栈君。

近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置

// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;

// 客户端,ClientId不能变
_clientOption.CleanSession = false;

以下是两个封装的类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTT.Util
{
    public class MqttServer
    {
        private IMqttServer _mqttServer = null;

        private string _ip = string.Empty;
        public string IP { get { return this._ip; } }

        private string _port = string.Empty;
        public string Port { get { return this._port; } }

        public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
        public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
        public EventHandler Started = null;
        public EventHandler Stopped = null;

        public MqttServer(string ip, string port)
        {
            this._ip = ip;
            this._port = port;
        }

        public async void Start()
        {
            // Backlog:表示服务器可以接受的并发请求的最大值
            var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
            optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));

            var options = optionBuilder.Build() as MqttServerOptions;
            options.ConnectionValidator += this.ConnectionValidator;
            options.MaxPendingMessagesPerClient = 1000;
            options.EnablePersistentSessions = true;

            this._mqttServer = new MqttFactory().CreateMqttServer();
            this._mqttServer.ClientConnected += this.ClientConnected;
            this._mqttServer.ClientDisconnected += this.ClientDisconnected;
            this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
            this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
            this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
            this._mqttServer.Started += this.Started;
            this._mqttServer.Stopped += this.Stopped;

            await _mqttServer.StartAsync(options);
        }

        public void Stop()
        {
            if (this._mqttServer == null)
                return;
            foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
            {
                clientSessionStatuse.DisconnectAsync();
            }
            this._mqttServer.StopAsync();
            this._mqttServer = null;
        }
    }
}

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTT.Util
{
    public class MqttClient
    {
        private IMqttClient _mqttClient = null;
        private MqttClientOptions _clientOption = null;

        private string _serverIP = string.Empty;
        public string ServerIP { get { return this._serverIP; } }

        private string _serverPort = string.Empty;
        public string ServerPort { get { return this._serverPort; } }

        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;

        public MqttClient(string clientId = "")
        {
            if (clientId == string.Empty)
                clientId = Guid.NewGuid().ToString();

            this._clientOption = new MqttClientOptions() { ClientId = clientId };
            this._clientOption.CleanSession = false;
            this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
            this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
            // 遗嘱信息
            // this._clientOption.WillMessage = 
            this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;

            this._mqttClient = new MqttFactory().CreateMqttClient();
        }

        public void Init(string ip, string port, string userName, string pwd)
        {
            this._serverIP = ip;
            this._serverPort = port;

            _mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
            _mqttClient.Connected += this.ClientConnected;
            _mqttClient.Disconnected += this.ClientDisconnected;
            this._clientOption.ChannelOptions = new MqttClientTcpOptions()
            {
                Server = this._serverIP,
                Port = Convert.ToInt32(this._serverPort)
            };
            this._clientOption.Credentials = new MqttClientCredentials()
            {
                Username = userName,
                Password = pwd
            };
        }

        /**
      host: 服务器地址
      port: 服务器端口
      tls:  是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
      keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
      clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
      auth: 是否使用登录验证
      user: 用户名
      pass: 密码
      willTopic: 订阅主题
      willMsg: 自定义的离线消息
      willQos: 接收离线消息的级别
      clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
        public async void Connect()
        {
            if (this._mqttClient == null)
                return;

            try
            {
                if (this._mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(this._clientOption);
            }
            catch (Exception)
            {
                throw;
            }
        }

        public async void Disconnect()
        {
            if (this._mqttClient == null)
                return;
            if (this._mqttClient.IsConnected)
                await _mqttClient.DisconnectAsync();
        }

        public void Stop()
        {
            if (this._mqttClient == null)
                return;
            this._mqttClient.Dispose();
            this._mqttClient = null;
        }

        public void SubscribeAsync(string topic, string qos)
        {
            Task.Factory.StartNew(async () =>
            {
                await _mqttClient.SubscribeAsync(
                    new List<TopicFilter>
                    {
                            new TopicFilter(
                                topic,
                                (MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
                    });
            });
        }

        public void PublishAsync(string topic, string qos, string payload)
        {
            if (this._mqttClient == null)
                return;
            Task.Factory.StartNew(async () =>
            {
                var msg = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
                    Retain = true // 是否保留信息
                };
                await _mqttClient.PublishAsync(msg);
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152846.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • js 双向绑定数据

    js 双向绑定数据

    2021年6月13日
    114
  • linux编辑文件命令vim怎么退出_vim退出不保存的命令

    linux编辑文件命令vim怎么退出_vim退出不保存的命令Linux(Ubuntu)vim编辑器保存退出命令进入vim编辑器,输入i进入编辑状态按esc退出编辑常用的保存退出命令如下:1、:w(保存编辑的文件内容,但不退出vim编辑器)2、:w!(强制写文件,即覆盖原有的文件,如果原有文件的访问权限不允许写入文件,例如,原有的文件为只读文件,则可使用这个命令强制写入)3、:q(未做任何编辑,无需保存文件,直接退出vim编辑器)4、:q!(强制退出vim编辑器,适用于已写入内容但不需要保存放弃该内容时)5、:wq(保存编辑的文件并退出vim编辑器

    2022年8月24日
    17
  • Win7如何解决telnet不是内部或外部命令的方案!听语音

    Win7如何解决telnet不是内部或外部命令的方案!听语音

    2021年9月23日
    48
  • pythoncharm怎么保存_pycharm怎么设置代码自动保存「建议收藏」

    pythoncharm怎么保存_pycharm怎么设置代码自动保存「建议收藏」pycharm一般安装完毕,就是默认是自动保存的,但是……但是….既然是程序,既然是软件,就难免出现bug。也许会有码友出现头天晚上写好的代码,打开一看,第二天白花花一片!!!泪奔有没有最简单的,就是每次编写完毕,习惯按ctrl+s手动保存。但是,提醒你务必检查一下你的设置里面,是不是码友弄好自动保存!步骤如下:菜单File->Settings…->Ap…

    2022年8月26日
    8
  • Linux系统终端里的root@localhost变成root@bogon的主机名改变的解决办法

    Linux系统终端里的root@localhost变成root@bogon的主机名改变的解决办法bogon 是主机名利用 hostname 可以查看当前主机名 vi etc sysconfig network 中修改 HOSTNAME 必须重新启动才能生效 Linux 主机名被修改成 bogon 问题的几种解决办法当 Linux 主机名由 myhostname 变成了 bogon 了之后 访问网络就会出现问题 重启后也没有解决 网上搜索 N 久之后 发现了如下几种解决方式 特此记录一下 1 在 linux 下添

    2025年6月8日
    4
  • LaTeX 插入图片 公式

    LaTeX 插入图片 公式一、LaTeX插入图片首先需要添加一个宏包graphicx,在插入图片的位置可以直接点击LaTeX的插入图片快捷按钮,然后修改其中的*位置的内容既可(caption与label若不需要也可以删掉)。\documentclass{article}\usepackage{graphicx}\begin{document}\begin{figure}\centering…

    2022年5月1日
    51

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号