mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端服务端:usingMQTTnet;usingMQTTnet.Server;usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Diagnostics;usingSystem.IO;usingSystem.Linq;usingSyst…

大家好,又见面了,我是你们的朋友全栈君。

服务端:

using MQTTnet;

using MQTTnet.Server;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Net;

using System.ServiceProcess;

using System.Text;

using System.Threading.Tasks;

using System.Timers;

namespace MQTTNetService

{

public partial class MQTTNetService : ServiceBase

{

private MqttServer mqttServer = null;

private System.Timers.Timer timer = null;

//此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多1000个清零;

private List subClientIDs = new List();

public MQTTNetService()

{

InitializeComponent();

//创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中

timer = new System.Timers.Timer();

timer.AutoReset = true;

timer.Enabled = true;

timer.Interval = 5000;

timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);

}

protected override void OnStart(string[] args)

{

//开启服务

CreateMQTTServer();

if (timer.Enabled == false)

{

timer.Enabled = true;

timer.Start();

}

}

protected override void OnStop()

{

if (timer.Enabled == true)

{

timer.Enabled = false;

timer.Stop();

}

}

///

/// 开启服务

///

private async void CreateMQTTServer()

{

if (mqttServer == null)

{

var options = new MqttServerOptions();

var optionsBuilder = new MqttServerOptionsBuilder();

//指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。

//options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”))

//指定端口

optionsBuilder.WithDefaultEndpointPort(1883);

//连接记录数,默认 一般为2000

//optionsBuilder.WithConnectionBacklog(2000);

mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;

//将发送的消息加到日志

mqttServer.ApplicationMessageReceived += (s, e) =>

{

string msg = @”发送消息的客户端id:” + e.ClientId + “\n”

+ “发送时间:” + DateTime.Now + “\n”

+ “发送消息的主题:” + e.ApplicationMessage.Topic + “\n”

+ “发送的消息内容:” + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + “\n”

+ “————————————————–\n”

;

WriteMsgLog(msg);

};

await mqttServer.StartAsync(options);

}

}

#region 记录日志

///

/// 消息记录日志

///

///

private void WriteMsgLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\Msglog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

///客户端链接日志

///

///

private void WriteClientLinkLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientLinklog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

/// 通过定时器将客户端链接信息写入日志

///

///

///

private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)

{

if (mqttServer != null)

{

List subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();

if (subclients.Count > 0)

{

foreach (var item in subclients)

{

if (!subClientIDs.Contains(item.ClientId))

{

subClientIDs.Add(item.ClientId);

string msg = @”接收客户端ID:” + item.ClientId + “\n”

+ “接收时间:” + DateTime.Now + “\n”

+ “协议版本:” + item.ProtocolVersion + “\n”

+ “最后收到的非保持活包” + item.LastNonKeepAlivePacketReceived + “\n”

+ “最后收到的包” + item.LastPacketReceived + “\n”

+ “挂起的应用程序消息” + item.PendingApplicationMessages + “\n”

+ “————————————————” + “\n”;

WriteClientLinkLog(msg);

}

}

if (subClientIDs.Count >= 1000)

{

subClientIDs.Clear();

}

}

}

}

#endregion

}

}

以上服务端不能判断特定标识的客户端接入,也就是只要有客户端连接就会接入,不够完善

客户端:简单用于测试 接收net core

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Text;

using System.Threading.Tasks;

namespace mqttclienttest0101

{

class Program

{

static MqttClient mqttClient = null;

static void Main(string[] args)

{

//var factory = new MqttFactory();

//var mqttClient = factory.CreateMqttClient();

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(“192.168.3.193”);

//options.WithTls();

//options.WithCleanSession();

//var task = mqttClient.ConnectAsync(options.Build());

RunAsync();

Console.ReadLine();

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(“TopicTest”);

message.WithPayload(“topictest001”);

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

Console.WriteLine(“Hello World!”);

}

public static async Task RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = new Guid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = “127.0.1.1”

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient()as MqttClient;

mqttClient.ApplicationMessageReceived += (s, e) =>

{

Console.WriteLine(“### RECEIVED APPLICATION MESSAGE ###”);

Console.WriteLine($”+ Topic = {e.ApplicationMessage.Topic}”);

Console.WriteLine($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

Console.WriteLine($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

Console.WriteLine($”+ Retain = {e.ApplicationMessage.Retain}”);

Console.WriteLine();

};

mqttClient.Connected += async (s, e) =>

{

Console.WriteLine(“### CONNECTED WITH SERVER ###”);

await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(“#”).Build());

Console.WriteLine(“### SUBSCRIBED ###”);

};

mqttClient.Disconnected += async (s, e) =>

{

Console.WriteLine(“### DISCONNECTED FROM SERVER ###”);

await Task.Delay(TimeSpan.FromSeconds(5));

try

{

await mqttClient.ConnectAsync(options);

}

catch

{

Console.WriteLine(“### RECONNECTING FAILED ###”);

}

};

try

{

await mqttClient.ConnectAsync(options);

}

catch (Exception exception)

{

Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

}

}

客户端2 发送

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Drawing;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Windows.Forms;

namespace MqttClientTest01

{

public partial class Form1 : Form

{

private MqttClient mqttClient = null;

public Form1()

{

InitializeComponent();

}

private void Form1_Load(object sender, EventArgs e)

{

}

private void but_linkserver_Click(object sender, EventArgs e)

{

RunAsync();

创建一个新的MQTT客户端。

//var factory = new MqttFactory();

//mqttClient = factory.CreateMqttClient() as MqttClient;

使用构建器创建基于TCP的选项。

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(txtb_serverip.Text.Trim(), Convert.ToInt32(txtb_serverport.Text.Trim()));

//options.WithTls();

//options.WithCleanSession();

//but_submsg_Click(sender, e);

//var task = mqttClient.ConnectAsync(options.Build());

//if (task != null)

//{

//}

}

public void RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = Guid.NewGuid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = txtb_serverip.Text.Trim(),

Port = Convert.ToInt32(txtb_serverport.Text.Trim()) ,

BufferSize=20*400,

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient() as MqttClient;

try

{

var task= mqttClient.ConnectAsync(options);

if (task!=null)

{

lab_linkstatus.Text = “连接成功!”;

lab_linktimer.Text = DateTime.Now.ToString();

}

}

catch (Exception exception)

{

// Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

// Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

private void tb_username_TextChanged(object sender, EventArgs e)

{

}

private void but_clientsend_Click(object sender, EventArgs e)

{

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(txtb_msgtopic.Text.Trim());

message.WithPayload(rtb_pubmsg.Text.Trim());

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

}

private async void but_submsg_Click(object sender, EventArgs k)

{

if (mqttClient != null)

{

//mqttClient.ApplicationMessageReceived += (s, e) =>

//{

// rtb_submsgclient.AppendText(“### RECEIVED APPLICATION MESSAGE ###”);

// rtb_submsgclient.AppendText($”+ Topic = {e.ApplicationMessage.Topic}”);

// rtb_submsgclient.AppendText($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

// rtb_submsgclient.AppendText($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

// rtb_submsgclient.AppendText($”+ Retain = {e.ApplicationMessage.Retain}”);

//};

var meww= await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));

订阅主题

//IList subresult = await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(txtb_subtopic.Text.Trim()).Build());

// if (subresult != null)

// {

// List result = subresult.ToList();

// if (result.Count > 0)

// {

// this.Invoke(new Action(() =>

// {

// foreach (var item in result)

// {

// rtb_submsgclient.AppendText(item.TopicFilter.QualityOfServiceLevel.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.TopicFilter.Topic.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.ReturnCode.ToString() + “\n”);

// }

// }));

// }

// }

}

}

}

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152866.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Linux rsync同步操作和inotify实时同步 、 Cobbler装机平台 (SERVICE06—-DAY21)[通俗易懂]

    Linux rsync同步操作和inotify实时同步 、 Cobbler装机平台 (SERVICE06—-DAY21)[通俗易懂]rsync同步操作•命令用法–rsync[选项…]源目录目标目录•同步与复制的差异–复制:完全拷贝源到目标–同步:增量拷贝,只传输变化过的数据同步控制•rsync操作选项–-n:测试同步过程,不做实际修改––delete:删除目标文件夹内多余的文档–-a:归档模式,相当于-rlptgoD–-v:显示详细操作信息–-z:传输过程中启用压缩/解…

    2022年7月18日
    15
  • 手机怎么复制网页上不能复制的文字_如何复制网页上收费文档的文字

    手机怎么复制网页上不能复制的文字_如何复制网页上收费文档的文字们在浏览网页的时候,时常会觉得有的内容不错,想复制下来,却发现有的网页内容不能复制,今天就教大家如何解决这个问题。虽然可以通过禁用脚本或是“查看源文件”,在源文件代码中复制需要的文章。不过复制文章的时候会有很多用不着的符号和代码。这样的操作方法其实都不如使用八爪鱼方便快捷。下面就给大家介绍一下如何利用八爪鱼采集网页上不能复制。步骤一、下载八爪鱼软件并登陆1、打开htt…

    2022年10月13日
    3
  • CRM客户关系管理系统(十二)

    CRM客户关系管理系统(十二)

    2021年5月27日
    103
  • phpstrom 2021.12激活码[最新免费获取]

    (phpstrom 2021.12激活码)这是一篇idea技术相关文章,由全栈君为大家提供,主要知识点是关于2021JetBrains全家桶永久激活码的内容IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.html1435QFILVV-eyJsaWN…

    2022年3月30日
    41
  • matlab interp1 pchip,matlab多项式插值interp1深入研究(1)「建议收藏」

    matlab interp1 pchip,matlab多项式插值interp1深入研究(1)「建议收藏」学习matlab不久,遇到了多项式插值interp1,在网上没有找到研究其插值方法的文章,在此,对其中插值方法做了一些研究,属于matlab范畴之外,但是无聊研究一下总的来说不会有坏处。interp1的具体运用也比较低,个人理解主要属于样本丢失,补充样本用,所以最后还介绍了傅里叶增值法。正文:首先介绍一个多项式插值函数:Y=interp1(x,y,X,’mothod’)本文主要讨论’mothod…

    2022年4月30日
    140
  • pycharm中查看某个函数定义_函数的三要素

    pycharm中查看某个函数定义_函数的三要素操作方式如下:(1)ctrl+shift+i查看函数定义(2)按住ctrl键,将鼠标放到函数上,就会显示函数信息,点击进去可以查看函数源码。(3)选中函数位置,按住ctrl+左键就会跳转到函数的定义处

    2022年8月29日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号