MQTTnet[通俗易懂]

MQTTnet[通俗易懂]近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置源码>>>GitHub//Broker设置options.MaxPendingMessagesPerClient=1000;options.EnablePersistentSessions…

大家好,又见面了,我是你们的朋友全栈君。

近期学习了一下物联网中应用较广的MQTT协议,同时使用MQTTnet开源类库做了简单实现,因此做下笔记。
环境:.NET Framework 4.6.1 MQTTnet 2.8.2.0 遵循MQTT 3.1.0协议规范 源码 >>> GitHub
注意:在实现订阅者离线再连接时,一直接受不到离线信息,需要做一下配置

// Broker设置
options.MaxPendingMessagesPerClient = 1000;
options.EnablePersistentSessions = true;

// 客户端,ClientId不能变
_clientOption.CleanSession = false;

以下是两个封装的类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

namespace MQTT.Util
{
    public class MqttServer
    {
        private IMqttServer _mqttServer = null;

        private string _ip = string.Empty;
        public string IP { get { return this._ip; } }

        private string _port = string.Empty;
        public string Port { get { return this._port; } }

        public Action<MqttConnectionValidatorContext> ConnectionValidator = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;
        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientSubscribedTopicEventArgs> ClientSubscribedTopic = null;
        public EventHandler<MqttClientUnsubscribedTopicEventArgs> ClientUnsubscribedTopic = null;
        public EventHandler Started = null;
        public EventHandler Stopped = null;

        public MqttServer(string ip, string port)
        {
            this._ip = ip;
            this._port = port;
        }

        public async void Start()
        {
            // Backlog:表示服务器可以接受的并发请求的最大值
            var optionBuilder = new MqttServerOptionsBuilder().WithConnectionBacklog(1000).WithDefaultEndpointPort(Convert.ToInt32(this._port));
            optionBuilder.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(this._ip));

            var options = optionBuilder.Build() as MqttServerOptions;
            options.ConnectionValidator += this.ConnectionValidator;
            options.MaxPendingMessagesPerClient = 1000;
            options.EnablePersistentSessions = true;

            this._mqttServer = new MqttFactory().CreateMqttServer();
            this._mqttServer.ClientConnected += this.ClientConnected;
            this._mqttServer.ClientDisconnected += this.ClientDisconnected;
            this._mqttServer.ApplicationMessageReceived += this.ApplicationMessageReceived;
            this._mqttServer.ClientSubscribedTopic += this.ClientSubscribedTopic;
            this._mqttServer.ClientUnsubscribedTopic += this.ClientUnsubscribedTopic;
            this._mqttServer.Started += this.Started;
            this._mqttServer.Stopped += this.Stopped;

            await _mqttServer.StartAsync(options);
        }

        public void Stop()
        {
            if (this._mqttServer == null)
                return;
            foreach (var clientSessionStatuse in this._mqttServer.GetClientSessionsStatusAsync().Result)
            {
                clientSessionStatuse.DisconnectAsync();
            }
            this._mqttServer.StopAsync();
            this._mqttServer = null;
        }
    }
}

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace MQTT.Util
{
    public class MqttClient
    {
        private IMqttClient _mqttClient = null;
        private MqttClientOptions _clientOption = null;

        private string _serverIP = string.Empty;
        public string ServerIP { get { return this._serverIP; } }

        private string _serverPort = string.Empty;
        public string ServerPort { get { return this._serverPort; } }

        public EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived = null;
        public EventHandler<MqttClientConnectedEventArgs> ClientConnected = null;
        public EventHandler<MqttClientDisconnectedEventArgs> ClientDisconnected = null;

        public MqttClient(string clientId = "")
        {
            if (clientId == string.Empty)
                clientId = Guid.NewGuid().ToString();

            this._clientOption = new MqttClientOptions() { ClientId = clientId };
            this._clientOption.CleanSession = false;
            this._clientOption.KeepAlivePeriod = TimeSpan.FromSeconds(90);
            this._clientOption.CommunicationTimeout = TimeSpan.FromSeconds(10);
            // 遗嘱信息
            // this._clientOption.WillMessage = 
            this._clientOption.ProtocolVersion = MQTTnet.Serializer.MqttProtocolVersion.V310;

            this._mqttClient = new MqttFactory().CreateMqttClient();
        }

        public void Init(string ip, string port, string userName, string pwd)
        {
            this._serverIP = ip;
            this._serverPort = port;

            _mqttClient.ApplicationMessageReceived += this.ApplicationMessageReceived;
            _mqttClient.Connected += this.ClientConnected;
            _mqttClient.Disconnected += this.ClientDisconnected;
            this._clientOption.ChannelOptions = new MqttClientTcpOptions()
            {
                Server = this._serverIP,
                Port = Convert.ToInt32(this._serverPort)
            };
            this._clientOption.Credentials = new MqttClientCredentials()
            {
                Username = userName,
                Password = pwd
            };
        }

        /**
      host: 服务器地址
      port: 服务器端口
      tls:  是否使用tls协议,mosca是支持tls的,如果使用了要设置成true
      keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s
      clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息
      auth: 是否使用登录验证
      user: 用户名
      pass: 密码
      willTopic: 订阅主题
      willMsg: 自定义的离线消息
      willQos: 接收离线消息的级别
      clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID
*/
        public async void Connect()
        {
            if (this._mqttClient == null)
                return;

            try
            {
                if (this._mqttClient.IsConnected)
                {
                    return;
                }

                await _mqttClient.ConnectAsync(this._clientOption);
            }
            catch (Exception)
            {
                throw;
            }
        }

        public async void Disconnect()
        {
            if (this._mqttClient == null)
                return;
            if (this._mqttClient.IsConnected)
                await _mqttClient.DisconnectAsync();
        }

        public void Stop()
        {
            if (this._mqttClient == null)
                return;
            this._mqttClient.Dispose();
            this._mqttClient = null;
        }

        public void SubscribeAsync(string topic, string qos)
        {
            Task.Factory.StartNew(async () =>
            {
                await _mqttClient.SubscribeAsync(
                    new List<TopicFilter>
                    {
                            new TopicFilter(
                                topic,
                                (MqttQualityOfServiceLevel)Enum.Parse(typeof (MqttQualityOfServiceLevel), qos))
                    });
            });
        }

        public void PublishAsync(string topic, string qos, string payload)
        {
            if (this._mqttClient == null)
                return;
            Task.Factory.StartNew(async () =>
            {
                var msg = new MqttApplicationMessage()
                {
                    Topic = topic,
                    Payload = Encoding.UTF8.GetBytes(payload),
                    QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), qos),
                    Retain = true // 是否保留信息
                };
                await _mqttClient.PublishAsync(msg);
            });
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152846.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 事业单位图形推理1000题及答案_小学生图形推理题

    事业单位图形推理1000题及答案_小学生图形推理题  2020年事业单位联考:《职测》判断推理神预测!   经过几个月的紧张备考,广大考生终于在今天走上了“战场”。经过小时的奋笔疾书,2020年下半年事业单位联考职业能力测试于今天上午拉下帷幕。根据学员的惊喜反馈,我们发现华图教育又双叒叕预测中题目了!!!   通过考生们对题目的回忆,华图教育惊喜地发现有4个考点跟我们考前给学员预测的大致相同!下面我们就一起来简单看一下:   一…

    2025年8月27日
    7
  • css漂浮广告代码_html浮动窗口怎么做

    css漂浮广告代码_html浮动窗口怎么做1.html部分复制代码代码如下:blog_floatdiv.html我是浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div浮动的div…

    2022年9月16日
    4
  • Zuul网关使用_zuul网关的作用

    Zuul网关使用_zuul网关的作用Zuul介绍Zuul与SpringCloudGateway作用差不多,推荐还是使用SpringCloudGateway,毕竟是Spring家族的,优先级高一些。他们都和Nginx一样,主要是用于服务器的反向代理;只要是反向代理,那么久可以提供路由、监控、弹性、安全等功能;一般也是说是网关,因为数据的入口都从这么流入流出。Zuul的核心是过滤器,通过这些过滤器我们可以扩展出很多功能,比如:动态路由动态地将客户端的请求路由到后端不同的服务,做一些逻辑处理,比如聚合多个服务的数据返回。请求监

    2022年8月15日
    9
  • 直通串口线和交叉串口线

    直通串口线和交叉串口线直通串口线和交叉串口线

    2022年6月19日
    31
  • 35 Great free Asterisk applications(35个Asterisk免费应用)

    35 Great free Asterisk applications(35个Asterisk免费应用)

    2021年5月7日
    136
  • C# 中使用正则表达式 Regex.Matches方法的几个应用[转]

    C# 中使用正则表达式 Regex.Matches方法的几个应用[转]

    2021年11月17日
    52

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号