mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端服务端:usingMQTTnet;usingMQTTnet.Server;usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Diagnostics;usingSystem.IO;usingSystem.Linq;usingSyst…

大家好,又见面了,我是你们的朋友全栈君。

服务端:

using MQTTnet;

using MQTTnet.Server;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Net;

using System.ServiceProcess;

using System.Text;

using System.Threading.Tasks;

using System.Timers;

namespace MQTTNetService

{

public partial class MQTTNetService : ServiceBase

{

private MqttServer mqttServer = null;

private System.Timers.Timer timer = null;

//此集合用于判断写入日志在一段时间内不重,以客户端id为依据,最多1000个清零;

private List subClientIDs = new List();

public MQTTNetService()

{

InitializeComponent();

//创建一个定时器,检查5s内有多少客户端接入并将相关信息记录到日志中

timer = new System.Timers.Timer();

timer.AutoReset = true;

timer.Enabled = true;

timer.Interval = 5000;

timer.Elapsed += new ElapsedEventHandler(GetSubClientSAndSetShow);

}

protected override void OnStart(string[] args)

{

//开启服务

CreateMQTTServer();

if (timer.Enabled == false)

{

timer.Enabled = true;

timer.Start();

}

}

protected override void OnStop()

{

if (timer.Enabled == true)

{

timer.Enabled = false;

timer.Stop();

}

}

///

/// 开启服务

///

private async void CreateMQTTServer()

{

if (mqttServer == null)

{

var options = new MqttServerOptions();

var optionsBuilder = new MqttServerOptionsBuilder();

//指定 ip地址,默认为本地,但此方法不能使用ipaddress报错,有哪位大神帮解答,感激。

//options.WithDefaultEndpointBoundIPAddress(IPAddress.Parse(“”))

//指定端口

optionsBuilder.WithDefaultEndpointPort(1883);

//连接记录数,默认 一般为2000

//optionsBuilder.WithConnectionBacklog(2000);

mqttServer = new MqttFactory().CreateMqttServer() as MqttServer;

//将发送的消息加到日志

mqttServer.ApplicationMessageReceived += (s, e) =>

{

string msg = @”发送消息的客户端id:” + e.ClientId + “\n”

+ “发送时间:” + DateTime.Now + “\n”

+ “发送消息的主题:” + e.ApplicationMessage.Topic + “\n”

+ “发送的消息内容:” + Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]) + “\n”

+ “————————————————–\n”

;

WriteMsgLog(msg);

};

await mqttServer.StartAsync(options);

}

}

#region 记录日志

///

/// 消息记录日志

///

///

private void WriteMsgLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\Msglog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

///客户端链接日志

///

///

private void WriteClientLinkLog(string msg)

{

//string path = @”C:\log.txt”;

//该日志文件会存在windows服务程序目录下

string path = AppDomain.CurrentDomain.BaseDirectory + “\\ClientLinklog.txt”;

FileInfo file = new FileInfo(path);

if (!file.Exists)

{

FileStream fs;

fs = File.Create(path);

fs.Close();

}

using (FileStream fs = new FileStream(path, FileMode.Append, FileAccess.Write))

{

using (StreamWriter sw = new StreamWriter(fs))

{

sw.WriteLine(DateTime.Now.ToString() + ” ” + msg);

}

}

}

///

/// 通过定时器将客户端链接信息写入日志

///

///

///

private void GetSubClientSAndSetShow(object sender, ElapsedEventArgs e)

{

if (mqttServer != null)

{

List subclients = mqttServer.GetConnectedClientsAsync().Result.ToList();

if (subclients.Count > 0)

{

foreach (var item in subclients)

{

if (!subClientIDs.Contains(item.ClientId))

{

subClientIDs.Add(item.ClientId);

string msg = @”接收客户端ID:” + item.ClientId + “\n”

+ “接收时间:” + DateTime.Now + “\n”

+ “协议版本:” + item.ProtocolVersion + “\n”

+ “最后收到的非保持活包” + item.LastNonKeepAlivePacketReceived + “\n”

+ “最后收到的包” + item.LastPacketReceived + “\n”

+ “挂起的应用程序消息” + item.PendingApplicationMessages + “\n”

+ “————————————————” + “\n”;

WriteClientLinkLog(msg);

}

}

if (subClientIDs.Count >= 1000)

{

subClientIDs.Clear();

}

}

}

}

#endregion

}

}

以上服务端不能判断特定标识的客户端接入,也就是只要有客户端连接就会接入,不够完善

客户端:简单用于测试 接收net core

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Text;

using System.Threading.Tasks;

namespace mqttclienttest0101

{

class Program

{

static MqttClient mqttClient = null;

static void Main(string[] args)

{

//var factory = new MqttFactory();

//var mqttClient = factory.CreateMqttClient();

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(“192.168.3.193”);

//options.WithTls();

//options.WithCleanSession();

//var task = mqttClient.ConnectAsync(options.Build());

RunAsync();

Console.ReadLine();

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(“TopicTest”);

message.WithPayload(“topictest001”);

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

Console.WriteLine(“Hello World!”);

}

public static async Task RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = new Guid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = “127.0.1.1”

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient()as MqttClient;

mqttClient.ApplicationMessageReceived += (s, e) =>

{

Console.WriteLine(“### RECEIVED APPLICATION MESSAGE ###”);

Console.WriteLine($”+ Topic = {e.ApplicationMessage.Topic}”);

Console.WriteLine($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

Console.WriteLine($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

Console.WriteLine($”+ Retain = {e.ApplicationMessage.Retain}”);

Console.WriteLine();

};

mqttClient.Connected += async (s, e) =>

{

Console.WriteLine(“### CONNECTED WITH SERVER ###”);

await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(“#”).Build());

Console.WriteLine(“### SUBSCRIBED ###”);

};

mqttClient.Disconnected += async (s, e) =>

{

Console.WriteLine(“### DISCONNECTED FROM SERVER ###”);

await Task.Delay(TimeSpan.FromSeconds(5));

try

{

await mqttClient.ConnectAsync(options);

}

catch

{

Console.WriteLine(“### RECONNECTING FAILED ###”);

}

};

try

{

await mqttClient.ConnectAsync(options);

}

catch (Exception exception)

{

Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

}

}

客户端2 发送

mqttnet 详解_vs2017通过mqttnet创建mqtt服务端 客户端

using MQTTnet;

using MQTTnet.Client;

using MQTTnet.Protocol;

using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Drawing;

using System.Linq;

using System.Text;

using System.Threading.Tasks;

using System.Windows.Forms;

namespace MqttClientTest01

{

public partial class Form1 : Form

{

private MqttClient mqttClient = null;

public Form1()

{

InitializeComponent();

}

private void Form1_Load(object sender, EventArgs e)

{

}

private void but_linkserver_Click(object sender, EventArgs e)

{

RunAsync();

创建一个新的MQTT客户端。

//var factory = new MqttFactory();

//mqttClient = factory.CreateMqttClient() as MqttClient;

使用构建器创建基于TCP的选项。

//var options = new MqttClientOptionsBuilder();

//options.WithClientId(new Guid().ToString());

//options.WithTcpServer(txtb_serverip.Text.Trim(), Convert.ToInt32(txtb_serverport.Text.Trim()));

//options.WithTls();

//options.WithCleanSession();

//but_submsg_Click(sender, e);

//var task = mqttClient.ConnectAsync(options.Build());

//if (task != null)

//{

//}

}

public void RunAsync()

{

try

{

var options = new MqttClientOptions

{

ClientId = Guid.NewGuid().ToString(),

CleanSession = true,

ChannelOptions = new MqttClientTcpOptions

{

//Server = “localhost”,

Server = txtb_serverip.Text.Trim(),

Port = Convert.ToInt32(txtb_serverport.Text.Trim()) ,

BufferSize=20*400,

},

//ChannelOptions = new MqttClientWebSocketOptions

//{

// Uri = “ws://localhost:59690/mqtt”

//}

};

var factory = new MqttFactory();

mqttClient = factory.CreateMqttClient() as MqttClient;

try

{

var task= mqttClient.ConnectAsync(options);

if (task!=null)

{

lab_linkstatus.Text = “连接成功!”;

lab_linktimer.Text = DateTime.Now.ToString();

}

}

catch (Exception exception)

{

// Console.WriteLine(“### CONNECTING FAILED ###” + Environment.NewLine + exception);

}

// Console.WriteLine(“### WAITING FOR APPLICATION MESSAGES ###”);

//while (true)

//{

// Console.ReadLine();

// await mqttClient.SubscribeAsync(new TopicFilter(“test”, MqttQualityOfServiceLevel.AtMostOnce));

// var applicationMessage = new MqttApplicationMessageBuilder()

// .WithTopic(“A/B/C”)

// .WithPayload(“Hello World”)

// .WithAtLeastOnceQoS()

// .Build();

// await mqttClient.PublishAsync(applicationMessage);

//}

}

catch (Exception exception)

{

Console.WriteLine(exception);

}

}

private void tb_username_TextChanged(object sender, EventArgs e)

{

}

private void but_clientsend_Click(object sender, EventArgs e)

{

if (mqttClient != null)

{

var message = new MqttApplicationMessageBuilder();

message.WithTopic(txtb_msgtopic.Text.Trim());

message.WithPayload(rtb_pubmsg.Text.Trim());

message.WithExactlyOnceQoS();

message.WithRetainFlag();

mqttClient.PublishAsync(message.Build());

}

}

private async void but_submsg_Click(object sender, EventArgs k)

{

if (mqttClient != null)

{

//mqttClient.ApplicationMessageReceived += (s, e) =>

//{

// rtb_submsgclient.AppendText(“### RECEIVED APPLICATION MESSAGE ###”);

// rtb_submsgclient.AppendText($”+ Topic = {e.ApplicationMessage.Topic}”);

// rtb_submsgclient.AppendText($”+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}”);

// rtb_submsgclient.AppendText($”+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}”);

// rtb_submsgclient.AppendText($”+ Retain = {e.ApplicationMessage.Retain}”);

//};

var meww= await mqttClient.SubscribeAsync(new TopicFilter(txtb_subtopic.Text.Trim(), MqttQualityOfServiceLevel.AtMostOnce));

订阅主题

//IList subresult = await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(txtb_subtopic.Text.Trim()).Build());

// if (subresult != null)

// {

// List result = subresult.ToList();

// if (result.Count > 0)

// {

// this.Invoke(new Action(() =>

// {

// foreach (var item in result)

// {

// rtb_submsgclient.AppendText(item.TopicFilter.QualityOfServiceLevel.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.TopicFilter.Topic.ToString() + “\n”);

// rtb_submsgclient.AppendText(item.ReturnCode.ToString() + “\n”);

// }

// }));

// }

// }

}

}

}

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152866.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 一些能用的 BT Tracker 服务器地址【不定时更新】[通俗易懂]

    一些能用的 BT Tracker 服务器地址【不定时更新】[通俗易懂]不定时更新可用的BTTracker服务器列表(BTTrackerList);相关工具BEncodeEditor:https://sites.google.com/site/ultimasites/bencode-editorTrackerEditor:https://github.com/GerryFerdinandus/bittorrent-tracker-editor/releases可以将服务器列表写入TrackerEditor程序同目录下的add_trackers.txt

    2022年6月18日
    151
  • python通用激活码(最新序列号破解)

    python通用激活码(最新序列号破解),https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月18日
    79
  • CSS-圆角边框

    CSS-圆角边框掌握了圆角边框的内容就可以做出类似百度搜索框的效果来代码 DOCTYPE tml htmllang en head metacharset UTF 8 metahttp equiv X UA Compatible content IE edge metahttp equiv X UA Compatible content IE edge metacharset UTF 8 head htmllang en

    2025年6月11日
    0
  • 一个例子让你了解Java反射机制

    一个例子让你了解Java反射机制本文来自:blog.csdn.net/ljphhjJAVA反射机制:通俗地说,反射机制就是可以把一个类,类的成员(函数,属性),当成一个对象来操作,希望读者能理解,也就是说,类,类的成员,我们在运行的时候还可以动态地去操作他们.理论的东东太多也没用,下面我们看看实践Demo~Demo:packagecn.lee.demo;import…

    2022年7月7日
    18
  • 计算机网络常用端口号大全「建议收藏」

    计算机网络常用端口号大全「建议收藏」一、概述:计算机端口号总数:65535,一般用到的是1~65535,0一般不使用0-1023:系统端口,也叫公认端口,这些端口只有系统特许的进程才能使用; 1024~65535为用户端口:1024-5000:临时端口,一般的应用程序使用1024到4999来进行通讯; 5001-65535:服务器(非特权)端口,用来给用户自定义端口。二、常用端口号:以下均为默认端口号,即未…

    2022年10月21日
    0
  • js如何替换指定的字符串_如果字符串内容替换

    js如何替换指定的字符串_如果字符串内容替换今天在写JavaScript替换字符串时,使用str.replace(“a”,”b”)方法替换发现只会替换第一个成功匹配的字符a而使用如果想要替换全部指定字符时,需要使用str.replace(/\a/g,”b”),这里g为全局标志,可以将全部的a替换成b…

    2022年10月22日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号