来段例子大概的做一下概念的了解
server端:
static async Task RunServerAsync() { //重要的概念 IEventLoopGroup 代表了处理网络的线程,server端用两个,client端一个就够了。boss处理网络连接的概念,client处理数据概念。两个IEventLoopGroup不代表两个线程,而是代表了两组线程 IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1); IEventLoopGroup workerGroup= new MultithreadEventLoopGroup(); //如果通信需要加密的话,使用证书 X509Certificate2 tlsCertificate = null; if (ServerSettings.IsSsl) { tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); } try { var bootstrap = new ServerBootstrap(); bootstrap.Group(bossGroup, workerGroup); bootstrap.Channel
()
; bootstrap
.Option(ChannelOption
.SoBacklog,
100)
.Handler(new LoggingHandler(
"SRV-LSTN"))
.ChildHandler(new ActionChannelInitializer
(channel => { //重要概念 增加Handler和Decoder,Encoder到pipeline中去 IChannelPipeline pipeline = channel
.Pipeline
; if (tlsCertificate != null) { pipeline
.AddLast(
"tls", TlsHandler
.Server(tlsCertificate))
; } pipeline
.AddLast(new LoggingHandler(
"SRV-CONN"))
; pipeline
.AddLast(
"framing-enc", new LengthFieldPrepender(
2))
; pipeline
.AddLast(
"framing-dec", new LengthFieldBasedFrameDecoder(ushort
.MaxValue,
0,
2,
0,
2))
; //自定义的handler pipeline
.AddLast(
"echo", new EchoServerHandler())
; }))
; IChannel boundChannel = await bootstrap
.BindAsync(ServerSettings
.Port)
; Console
.ReadLine()
; await boundChannel
.CloseAsync()
; } finally { await Task
.WhenAll( bossGroup
.ShutdownGracefullyAsync(TimeSpan
.FromMilliseconds(
100), TimeSpan
.FromSeconds(
1)), workerGroup
.ShutdownGracefullyAsync(TimeSpan
.FromMilliseconds(
100), TimeSpan
.FromSeconds(
1)))
; } }
客户端:
static async Task RunClientAsync() { var group = new MultithreadEventLoopGroup(); X509Certificate2 cert = null; string targetHost = null; if (ClientSettings.IsSsl) { cert = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password"); targetHost = cert.GetNameInfo(X509NameType.DnsName, false); } try { var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel
() .Option(ChannelOption.TcpNodelay,
true) .Handler(
new ActionChannelInitializer
(channel
=> { IChannelPipeline pipeline = channel.Pipeline;
if (cert !=
null) { pipeline.AddLast
("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost))); } pipeline.AddLast(new LoggingHandler()); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); pipeline.AddLast("echo", new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port)); Console.ReadLine(); await clientChannel.CloseAsync(); } finally { await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)); } }
Server端的Handler的定义:
public class EchoServerHandler : ChannelHandlerAdapter { public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = message as IByteBuffer; if (buffer != null) { Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8)); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }
客户端的例子:
public class EchoClientHandler : ChannelHandlerAdapter { readonly IByteBuffer initialMessage; public EchoClientHandler() { this.initialMessage = Unpooled.Buffer(ClientSettings.Size); byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world"); this.initialMessage.WriteBytes(messageBytes); } public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage); public override void ChannelRead(IChannelHandlerContext context, object message) { var byteBuffer = message as IByteBuffer; if (byteBuffer != null) { Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8)); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/209860.html原文链接:https://javaforall.net
