聊聊Dotnetty

聊聊Dotnetty以前 我曾经写过一个 C S 的期货交易系统 C S 就绕不开通信 我大概的了解了 net 的通信机制后 选择用 TCP 长连接实现了通信 客户端可以调用服务器端 服务器端可以主动推送消息到客户端 实现是实现了 但是代码笨重而且低效 近来又要解决一个 C S 的通信问题 感觉到自己以前写的通信继续复用会触犯自己的洁癖 同时深感自己的精力大不如以前 重新写一个优雅而且高效的通信的中间件没有任何信心 于是就在网络上

来段例子大概的做一下概念的了解

server端:

 static async Task RunServerAsync() { //重要的概念 IEventLoopGroup 代表了处理网络的线程,server端用两个,client端一个就够了。boss处理网络连接的概念,client处理数据概念。两个IEventLoopGroup不代表两个线程,而是代表了两组线程 IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1); IEventLoopGroup workerGroup= new MultithreadEventLoopGroup(); //如果通信需要加密的话,使用证书 X509Certificate2 tlsCertificate = null; if (ServerSettings.IsSsl) { tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); } try { var bootstrap = new ServerBootstrap(); bootstrap.Group(bossGroup, workerGroup); bootstrap.Channel 
  
    () 
   ; bootstrap 
   .Option(ChannelOption 
   .SoBacklog, 
   100) 
   .Handler(new LoggingHandler( 
   "SRV-LSTN")) 
   .ChildHandler(new ActionChannelInitializer 
   
     (channel => { //重要概念 增加Handler和Decoder,Encoder到pipeline中去 IChannelPipeline pipeline = channel 
    .Pipeline 
    ; if (tlsCertificate != null) { pipeline 
    .AddLast( 
    "tls", TlsHandler 
    .Server(tlsCertificate)) 
    ; } pipeline 
    .AddLast(new LoggingHandler( 
    "SRV-CONN")) 
    ; pipeline 
    .AddLast( 
    "framing-enc", new LengthFieldPrepender( 
    2)) 
    ; pipeline 
    .AddLast( 
    "framing-dec", new LengthFieldBasedFrameDecoder(ushort 
    .MaxValue, 
    0, 
    2, 
    0, 
    2)) 
    ; //自定义的handler pipeline 
    .AddLast( 
    "echo", new EchoServerHandler()) 
    ; })) 
    ; IChannel boundChannel = await bootstrap 
    .BindAsync(ServerSettings 
    .Port) 
    ; Console 
    .ReadLine() 
    ; await boundChannel 
    .CloseAsync() 
    ; } finally { await Task 
    .WhenAll( bossGroup 
    .ShutdownGracefullyAsync(TimeSpan 
    .FromMilliseconds( 
    100), TimeSpan 
    .FromSeconds( 
    1)), workerGroup 
    .ShutdownGracefullyAsync(TimeSpan 
    .FromMilliseconds( 
    100), TimeSpan 
    .FromSeconds( 
    1))) 
    ; } } 
    
  

客户端:

static async Task RunClientAsync() { var group = new MultithreadEventLoopGroup(); X509Certificate2 cert = null; string targetHost = null; if (ClientSettings.IsSsl) { cert = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); targetHost = cert.GetNameInfo(X509NameType.DnsName, false); } try { var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel 
  
    () .Option(ChannelOption.TcpNodelay, 
   true) .Handler( 
   new ActionChannelInitializer 
   
     (channel 
     => { IChannelPipeline pipeline = channel.Pipeline; 
    if (cert != 
    null) { pipeline.AddLast 
    ("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost))); } pipeline.AddLast(new LoggingHandler()); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); pipeline.AddLast("echo", new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port)); Console.ReadLine(); await clientChannel.CloseAsync(); } finally { await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); } } 
    
  

Server端的Handler的定义:

public class EchoServerHandler : ChannelHandlerAdapter { public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = message as IByteBuffer; if (buffer != null) { Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8)); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }

客户端的例子:

public class EchoClientHandler : ChannelHandlerAdapter { readonly IByteBuffer initialMessage; public EchoClientHandler() { this.initialMessage = Unpooled.Buffer(ClientSettings.Size); byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world"); this.initialMessage.WriteBytes(messageBytes); } public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage); public override void ChannelRead(IChannelHandlerContext context, object message) { var byteBuffer = message as IByteBuffer; if (byteBuffer != null) { Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8)); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/209860.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 上午8:18
下一篇 2026年3月19日 上午8:18


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号