使用 MQTTnet 实现 MQTT 通信示例

使用 MQTTnet 实现 MQTT 通信示例一、什么是MQTT?MQTT(MessageQueuingTelemetryTransport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为OASIS规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景。二、MQTTnetMQ…

大家好,又见面了,我是你们的朋友全栈君。

一、什么是 MQTT ?

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。MQTT 是基于二进制消息的发布/订阅编程模式的消息协议,如今已经成为 OASIS 规范,由于规范很简单,非常适合需要低功耗和网络带宽有限的 IoT 场景。
github上还发现了一个项目,可以直接看协议:MQTT协议中文版

二、MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.

MQTT 开源库还有 MqttDotNetnMQTTM2MQTT 等,不过这几个里面,目前star最多的还是MQTTnet(编辑时间2018.5.9)。

三、创建项目并导入类库

在解决方案在右键单击-选择“管理解决方案的 NuGet 程序包”-在“浏览”选项卡下面搜索 MQTTnet,为服务端项目和客户端项目都安装上 MQTTnet 库。示例中使用的是2.7.5.0版本,不同版本最低要求的.net版本或其它支持,在NuGet选中MQTTNet,右侧可以看到具体描述。

客户端简单Demo可以见官方文档:https://github.com/chkr1011/MQTTnet/wiki/Client
本文示例程序github:https://github.com/landbroken/MQTTLearning

评论有不知道怎么用示例程序的,简单解释一下。目录一开始建的有点问题,调试稍微麻烦一点,懒得改了。

  • 1、clone或download项目
  • 2、vs2015或以上版本打开MQTTLearning/MqttServerTest/MqttServerTest.sln
  • 3、服务器端示例在SeverTest项目里面
  • 4、客户端示例在ClientTest项目里面
    调试:
    方法1)是vs里两个项目设为同时启动;
    方法2)一端用生成的exe启动,一端在vs里用debug启动

一般可以直接打开的,万一vs有路径依赖问题:

  • 1、如果项目路径依赖有问题,删掉重新添加一遍SeverTest.csproj和ClientTest.csproj
  • 2、如果MQTTnet 库引用路径有问题,删掉从packages里面重新引用一遍,或者nuget里面添加一下

3.1 服务器

直接跑一下示例程序就知道了
下面简单解释一下主要部分。

3.1.1 初始化并启动服务器

首先,初始化并启动服务器

1、这里是异步启动,用的2.7.5版本的库

Task.Run(async () => { 
    await StartMqttServer_2_7_5(); });

2、配置设置
WithDefaultEndpointPort是设置使用的端口,协议里默认是用1883,不过调试我改成8222了。
WithConnectionValidator是用于连接验证,验证client id,用户名,密码什么的。示例没用数据库,随便写死了两个值。
还有其他配置选项,比如加密协议,可以在官方文档里看看,示例就是先简单能用。

// Configure MQTT server.
                var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithConnectionBacklog(100)
                    .WithDefaultEndpointPort(8222)
                    .WithConnectionValidator(ValidatingMqttClients())
                    ;

3、添加事件触发
ApplicationMessageReceived 是服务器接收到消息时触发的事件,可用来响应特定消息。
ClientConnected 是客户端连接成功时触发的事件。
ClientDisconnected 是客户端断开连接时触发的事件。

mqttServer = new MqttFactory().CreateMqttServer();
                mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
                mqttServer.ClientConnected += MqttServer_ClientConnected;
                mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;

异步启动mqtt服务器,实际项目可能要加一个启动失败处理

Task.Run(async () => { 
    await mqttServer.StartAsync(optionsBuilder.Build()); });

示例里面为了简单调试玩的,写了几个触发命令,在启动后,输入对于字符串即可触发。

字符串 效果
exit 关闭mqtt服务
hello: 发送topic为topic/hello的消息,payload为冒号后面的数据
control: 发送topic为topic/control的消息,payload为冒号后面的数据
subscribe: 订阅topic为冒号后面的消息

3.1.2 消息发送

mqtt的消息包含topic和payload两部分。topic就是消息主题(类型),用于另外一端判断这个消息是干什么用的。payload就是实际想要发送的数据。
WithTopic给一个topic。
WithPayload给一个msg。
WithAtMostOnceQoS设置QoS,至多1次。也可以设为别的。
PublishAsync异步发送出去。

string topic = "topic/hello";
var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(msg)
                .WithAtMostOnceQoS()
                .WithRetainFlag()
                .Build();
await mqttServer.PublishAsync(message);

3.1.3 完整源码

using MQTTnet;
using MQTTnet.Diagnostics;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Collections.Generic;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace MqttServerTest
{ 
   
    class Program
    { 
   
        private static IMqttServer mqttServer = null;
        private static List<string> connectedClientId = new List<string>();

        static void Main(string[] args)
        { 
   
            Task.Run(async () => { 
    await StartMqttServer_2_7_5(); });

            // Write all trace messages to the console window.
            MqttNetGlobalLogger.LogMessagePublished += MqttNetTrace_TraceMessagePublished;

            //2.4.0版本
            //MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;
            //new Thread(StartMqttServer).Start();
            while (true)
            { 
   
                if (mqttServer==null)
                { 
   
                    Console.WriteLine("Please await mqttServer.StartAsync()");
                    Thread.Sleep(1000);
                    continue;
                }

                var inputString = Console.ReadLine().ToLower().Trim();

                if (inputString == "exit")
                { 
   
                    Task.Run(async () => { 
    await EndMqttServer_2_7_5(); });
                    Console.WriteLine("MQTT服务已停止!");
                    break;
                }
                else if (inputString == "clients")
                { 
   
                    var connectedClients = mqttServer.GetConnectedClientsAsync();

                    Console.WriteLine($"客户端标识:");
                    //2.4.0
                    //foreach (var item in mqttServer.GetConnectedClients())
                    //{ 
   
                    // Console.WriteLine($"客户端标识:{item.ClientId},协议版本:{item.ProtocolVersion}");
                    //}
                }
                else if (inputString.StartsWith("hello:"))
                { 
   
                    string msg = inputString.Substring(6);
                    Topic_Hello(msg);
                }
                else if (inputString.StartsWith("control:"))
                { 
   
                    string msg = inputString.Substring(8);
                    Topic_Host_Control(msg);
                }
                else if (inputString.StartsWith("subscribe:"))
                { 
   
                    string msg = inputString.Substring(10);
                    Subscribe(msg);
                }
                else
                { 
   
                    Console.WriteLine($"命令[{inputString}]无效!");
                }
                Thread.Sleep(100);
            }
        }

        private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        { 
   
            Console.WriteLine($"客户端[{e.Client.ClientId}]已连接,协议版本:{e.Client.ProtocolVersion}");
            connectedClientId.Add(e.Client.ClientId);
        }

        private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        { 
   
            Console.WriteLine($"客户端[{e.Client.ClientId}]已断开连接!");
            connectedClientId.Remove(e.Client.ClientId);
        }

        private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        { 
   
            Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
            Console.WriteLine($"客户端[{e.ClientId}]>>");
            Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
            Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
            Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
            Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
            Console.WriteLine();
        }

        private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetLogMessagePublishedEventArgs e)
        { 
   
            var trace = $">> [{e.TraceMessage.Timestamp:O}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Source}] [{e.TraceMessage.Level}]: {e.TraceMessage.Message}";
            if (e.TraceMessage.Exception != null)
            { 
   
                trace += Environment.NewLine + e.TraceMessage.Exception.ToString();
            }

            Console.WriteLine(trace);
        }

        #region 2.7.5

        private static async Task StartMqttServer_2_7_5()
        { 
   
            if (mqttServer == null)
            { 
   
                // Configure MQTT server.
                var optionsBuilder = new MqttServerOptionsBuilder()
                    .WithConnectionBacklog(100)
                    .WithDefaultEndpointPort(8222)
                    .WithConnectionValidator(ValidatingMqttClients())
                    ;

                // Start a MQTT server.
                mqttServer = new MqttFactory().CreateMqttServer();
                mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
                mqttServer.ClientConnected += MqttServer_ClientConnected;
                mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;

                Task.Run(async () => { 
    await mqttServer.StartAsync(optionsBuilder.Build()); });
                //mqttServer.StartAsync(optionsBuilder.Build());
                Console.WriteLine("MQTT服务启动成功!");
            }
        }

        private static async Task EndMqttServer_2_7_5()
        { 
   
            if (mqttServer!=null)
            { 
   
                await mqttServer.StopAsync();
            }
            else
            { 
   
                Console.WriteLine("mqttserver=null");
            }
        }

        private static Action<MqttConnectionValidatorContext> ValidatingMqttClients()
        { 
   
            // Setup client validator. 
            var options =new MqttServerOptions();
            options.ConnectionValidator = c =>
            { 
   
                Dictionary<string, string> c_u = new Dictionary<string, string>();
                c_u.Add("client001", "username001");
                c_u.Add("client002", "username002");
                Dictionary<string, string> u_psw = new Dictionary<string, string>();
                u_psw.Add("username001", "psw001");
                u_psw.Add("username002", "psw002");

                if (c_u.ContainsKey(c.ClientId) && c_u[c.ClientId] == c.Username)
                { 
   
                    if (u_psw.ContainsKey(c.Username) && u_psw[c.Username] == c.Password)
                    { 
   
                        c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                    }
                    else
                    { 
   
                        c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    }
                }
                else
                { 
   
                    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                }
            };
            return options.ConnectionValidator;
        }
        
        private static void Usingcertificate(ref MqttServerOptions options)
        { 
   
            var certificate = new X509Certificate(@"C:\certs\test\test.cer", "");
            options.TlsEndpointOptions.Certificate = certificate.Export(X509ContentType.Cert);
            var aes = new System.Security.Cryptography.AesManaged();

        }

        #endregion

        #region Topic

        private static async void Topic_Hello(string msg)
        { 
   
            string topic = "topic/hello";

            //2.4.0版本的
            //var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
            //mqttClient.PublishAsync(appMsg);

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(msg)
                .WithAtMostOnceQoS()
                .WithRetainFlag()
                .Build();
            await mqttServer.PublishAsync(message);
        }

        private static async void Topic_Host_Control(string msg)
        { 
   
            string topic = "topic/host/control";

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(msg)
                .WithAtMostOnceQoS()
                .WithRetainFlag(false)
                .Build();
            await mqttServer.PublishAsync(message);
        }

        /// <summary>
        /// 替指定的clientID订阅指定的内容
        /// </summary>
        /// <param name="topic"></param>
        private static void Subscribe(string topic)
        { 
   
            List<TopicFilter> topicFilter = new List<TopicFilter>();
            topicFilter.Add(new TopicFilterBuilder()
                .WithTopic(topic)
                .WithAtMostOnceQoS()
                .Build());
            //给"client001"订阅了主题为topicFilter的payload
            mqttServer.SubscribeAsync("client001", topicFilter);
            Console.WriteLine($"Subscribe:[{"client001"}],Topic:{topic}");
        }

        #endregion
    }
}

3.2 客户端

未完待续

参考文献

1、《使用 MQTTnet 快速实现 MQTT 通信》:链接
这篇文章是vs2017+.net core+mqttnet2.4.0的,目前库已经更新了好几个版本,如果用最新版的不能直接运行文章里的程序。
2、MQTT官网
3、开源库地址:MQTTnet
4、开源库对应文档:https://github.com/chkr1011/MQTTnet/wiki/Client

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152873.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月25日 下午3:46
下一篇 2022年6月25日 下午3:46


相关推荐

  • java eclipse导入_Eclipse怎么导入web项目 Eclipse导入web项目详细教程

    java eclipse导入_Eclipse怎么导入web项目 Eclipse导入web项目详细教程Eclipse 作为最热门的开发工具还有许多人不知道使用 eclipse 或者 myeclipse 的时候怎么向 eclipse 软件中导入 web 项目 不用担心其实很简单 小编今天为大家带来了一篇详细的 Eclipse 以及 Myeclipse 导入 web 项目的详细教程 一起来看看吧 详细教程 1 将项目根目录导入 File gt Import gt General gt ExistingProj

    2026年3月17日
    2
  • 如何看懂讯飞星火大模型的三大技术突破与应用前景?

    如何看懂讯飞星火大模型的三大技术突破与应用前景?

    2026年3月14日
    2
  • OmniDiskSweeper DaisyDisk mac 查看文件占用

    OmniDiskSweeper DaisyDisk mac 查看文件占用OmniDiskSwee 转载于 https www cnblogs com rexzhao p 5826265 html

    2026年3月17日
    2
  • win10下CUDA和CUDNN的安装(超详细)!亲测有效![通俗易懂]

    win10下CUDA和CUDNN的安装(超详细)!亲测有效![通俗易懂]CUDA10安装配置CUDA10的安装包可直接从NVIDIA官网下载。根据相应的系统选项,我选择的是cuda_10.1.168_425.25_win10.exe(大小为2.3G),安装的时候建议选择自定义而不是“精简”(从下面的英文解释可以看出,其实这里的精简写成完整应该更贴切,他会安装所有组件并覆盖现有驱动,然而我并不想安装全家桶,何况我的官方显卡驱动比他的新)。下载路径:https…

    2022年6月11日
    31
  • Python多线程通信_python socket多线程

    Python多线程通信_python socket多线程作者:billy版权声明:著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处创建线程线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每个线程并行执行不同的任务。由于线程是操作系统直接支持的执行单元,因此,高级语言(如Python、Java等)通常都内置多线程的支持。Python的标准库提供了两个模块:_thread和threading,_thread

    2022年8月31日
    7
  • RNN BPTT算法推导

    RNN BPTT算法推导BPTT(沿时反向传播算法)基本原理与BP算法一样,包含三个步骤:前向计算每个神经元的输出值反向计算每个神经元的误差项δjδ_jδj​,它是误差函数E对神经元j的加权输入netjnet_jnetj​的偏导数计算每个权重的梯度最后再用随机梯度下降算法更新权重循环曾如图所示:1.1前向计算循环层的前向计算:隐层:st=f(Uxt+Wst−1)s_t=f(Ux_t+Ws_{t-1})…

    2022年6月23日
    43

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号