MQTTnet[通俗易懂]

MQTTnet[通俗易懂]近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置源码>>>GitHub//Broker设置options.MaxPendingMessagesPerClient=1000;options.EnablePersistentSessions…

大家好,又见面了,我是你们的朋友全栈君。

近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置

// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;

// 客户端,ClientId不能变
_clientOption.CleanSession = false;

以下是两个封装的类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTT.Util
{
    public class MqttServer
    {
        private IMqttServer _mqttServer = null;

        private string _ip = string.Empty;
        public string IP { get { return this._ip; } }

        private string _port = string.Empty;
        public string Port { get { return this._port; } }

        public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
        public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
        public EventHandler Started = null;
        public EventHandler Stopped = null;

        public MqttServer(string ip, string port)
        {
            this._ip = ip;
            this._port = port;
        }

        public async void Start()
        {
            // Backlog:表示服务器可以接受的并发请求的最大值
            var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
            optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));

            var options = optionBuilder.Build() as MqttServerOptions;
            options.ConnectionValidator += this.ConnectionValidator;
            options.MaxPendingMessagesPerClient = 1000;
            options.EnablePersistentSessions = true;

            this._mqttServer = new MqttFactory().CreateMqttServer();
            this._mqttServer.ClientConnected += this.ClientConnected;
            this._mqttServer.ClientDisconnected += this.ClientDisconnected;
            this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
            this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
            this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
            this._mqttServer.Started += this.Started;
            this._mqttServer.Stopped += this.Stopped;

            await _mqttServer.StartAsync(options);
        }

        public void Stop()
        {
            if (this._mqttServer == null)
                return;
            foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
            {
                clientSessionStatuse.DisconnectAsync();
            }
            this._mqttServer.StopAsync();
            this._mqttServer = null;
        }
    }
}

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTT.Util
{
    public class MqttClient
    {
        private IMqttClient _mqttClient = null;
        private MqttClientOptions _clientOption = null;

        private string _serverIP = string.Empty;
        public string ServerIP { get { return this._serverIP; } }

        private string _serverPort = string.Empty;
        public string ServerPort { get { return this._serverPort; } }

        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;

        public MqttClient(string clientId = "")
        {
            if (clientId == string.Empty)
                clientId = Guid.NewGuid().ToString();

            this._clientOption = new MqttClientOptions() { ClientId = clientId };
            this._clientOption.CleanSession = false;
            this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
            this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
            // 遗嘱信息
            // this._clientOption.WillMessage = 
            this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;

            this._mqttClient = new MqttFactory().CreateMqttClient();
        }

        public void Init(string ip, string port, string userName, string pwd)
        {
            this._serverIP = ip;
            this._serverPort = port;

            _mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
            _mqttClient.Connected += this.ClientConnected;
            _mqttClient.Disconnected += this.ClientDisconnected;
            this._clientOption.ChannelOptions = new MqttClientTcpOptions()
            {
                Server = this._serverIP,
                Port = Convert.ToInt32(this._serverPort)
            };
            this._clientOption.Credentials = new MqttClientCredentials()
            {
                Username = userName,
                Password = pwd
            };
        }

        /**
      host: 服务器地址
      port: 服务器端口
      tls:  是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
      keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
      clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
      auth: 是否使用登录验证
      user: 用户名
      pass: 密码
      willTopic: 订阅主题
      willMsg: 自定义的离线消息
      willQos: 接收离线消息的级别
      clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
        public async void Connect()
        {
            if (this._mqttClient == null)
                return;

            try
            {
                if (this._mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(this._clientOption);
            }
            catch (Exception)
            {
                throw;
            }
        }

        public async void Disconnect()
        {
            if (this._mqttClient == null)
                return;
            if (this._mqttClient.IsConnected)
                await _mqttClient.DisconnectAsync();
        }

        public void Stop()
        {
            if (this._mqttClient == null)
                return;
            this._mqttClient.Dispose();
            this._mqttClient = null;
        }

        public void SubscribeAsync(string topic, string qos)
        {
            Task.Factory.StartNew(async () =>
            {
                await _mqttClient.SubscribeAsync(
                    new List<TopicFilter>
                    {
                            new TopicFilter(
                                topic,
                                (MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
                    });
            });
        }

        public void PublishAsync(string topic, string qos, string payload)
        {
            if (this._mqttClient == null)
                return;
            Task.Factory.StartNew(async () =>
            {
                var msg = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
                    Retain = true // 是否保留信息
                };
                await _mqttClient.PublishAsync(msg);
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152846.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • C++利用stringstream进行类型转换「建议收藏」

    C++利用stringstream进行类型转换「建议收藏」利用STL中sstream库的字符串流(stringstream)可以非常方便地进行类型转换,尤其是字符串和数字的转换。例:整型和字符串类型的相互转化#include&lt;sstream&gt;//开头记得加上这个//整型转换为string类型stringint2string(intnum){stringstreamss;ss&lt;&lt;num…

    2022年5月18日
    38
  • goland 2021 破解 激活码-激活码分享

    (goland 2021 破解 激活码)2021最新分享一个能用的的激活码出来,希望能帮到需要激活的朋友。目前这个是能用的,但是用的人多了之后也会失效,会不定时更新的,大家持续关注此网站~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.html…

    2022年3月26日
    521
  • 软件工程师待遇怎么样?软件工程师薪水到底有多高?「建议收藏」

    软件工程师待遇怎么样?软件工程师薪水到底有多高?「建议收藏」随着技术不断进步,行业对软件开发技能的需求急剧上升。在此情况下,软件工程师(程序员)薪水上涨便很正常。通常来说,个人薪水是高是低,则与自身积累的经验、所处的地点以及产业分不开。据国际调研机构IDC在报告中公开的数据:2018年,全球有2230万名的软件工程师;其中,全职程序员1165万名,兼职人员635万名,非专业人员430万名。美国拥有最多的软件工程师,651017人;其次是中国,183…

    2025年11月28日
    23
  • RedisClient下载地址[通俗易懂]

    RedisClient下载地址[通俗易懂]下载地址:https://github.com/caoxinyu/RedisClient/tree/windows/release

    2022年10月12日
    2
  • 贴片机保养步骤 保养项目 保养的目的与作用

    贴片机是一种高强度、长时间、超负荷的运转设备,需要定期进行细心的呵护保养,贴片机保养的好,运转稳定,保证日常生产产能,贴片机如衣食父母;贴片机保养不到位或经常不保养,性能再好的机子也会不堪重负出现各类毛病,影响生产产能和产品质量,因此贴片机保养是贴片厂非常重要的一环工作内容事项。(延伸阅读:何为SMT?smt是做什么的,smt贴片是什么意思?)贴片机日保用干净白布清洁机器表面灰尘,包含机身、显示器、键盘、鼠标、开关等;检测气压正常值;检查机器内部各装置是否正常;用吸尘器清洁机器里

    2022年4月7日
    38
  • matlab在axis,matlab中axis的用法

    matlab在axis,matlab中axis的用法>>axis([02*pi-0.90.9])图5.1.3使用了图形修饰的plot函数绘制的正弦曲线5.1.3图形的比较显示在一般默认的情况下,MATLAB每次使用plot……>>axis([02*pi-0.90.9])图5.1.3使用了图形修饰的plot函数绘制的正弦曲线5.1.3图形的比较显示在一般默认的情况下…

    2022年6月3日
    39

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号