基于Netty实时通信的简单案例实现
1 案例说明
多个客户端与服务器端建立websocket连接,服务器端将收到的客户端消息发送给所有与服务器端建立连接的客户端,相当于直播间的群聊功能。
当一个客户端发送消息时,所有客户端都将收到消息。并且保证实时性。
2 服务器端搭建
建立maven项目,引入Netty依赖:
io.netty
netty-all
4.1.42.Final
编写服务器端
package maolaoke.top.netty.webSocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
//定义两个线程组,采用reactor主从模式,bossGroup负责处理连接请求,WorkerGroup负责处理业务请求。 //默认是cpu核心数*2个线程去处理 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); //websocket基于http协议,添加http的编解码器 pipeline.addLast(new HttpServerCodec()); //对http中大数据的数据流,提供写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //对httpMessage进行聚合处理,聚合成FullHttpRequest或FullHttpResponse pipeline.addLast(new HttpObjectAggregator(1024*64)); /* * 处理一些繁重复杂的事情; * 处理websocket的握手动作:handshaking(close/ping/pong) ping+pong=心跳 从Http协议升级到Websocket协议,是通过StatusCode 101(Switching Protocols)来切换的。 * 对于websocket来说,都是以frames进行传输,不同的数据类型frames不同 * 并且要指定与客户端交互的路径:ws://localhost:8088/chat */ pipeline.addLast(new WebSocketServerProtocolHandler("/chat")); //添加自定义的数据处理器 pipeline.addLast(new ChatHandler()); } }); ChannelFuture future = serverBootstrap.bind(8088).sync(); future.channel().closeFuture().sync(); }finally {
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
编写ChatHandler自定义处理器
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; / * 用于处理消息的handler * 由于它的传输数据载体是frame,这个frame在netty中,是用于websocket专门处理文本对象的 * netty中用TextWebSocketFrame对象来封装frame */ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 定义一个channel组,管理所有的channel。 // GlobalEventExecutor.INSTANCE:全局事件执行器。单例。 // 用于记录和管理所有的客户端的channel private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//获取客户端传输的数据 String context = msg.text(); System.out.println("接受到的数据:" + context); //向所有客户端发送接受到的消息 clients.writeAndFlush(new TextWebSocketFrame("【" + ctx.channel().remoteAddress().toString() + "】: " + context)); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//当有连接建立时就将channel添加到全局变量clients中。 clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//连接断开时不用去手动删除clients中的channel,DefaultChannelGroup会帮我们做 System.out.println("客户端断开,channel对应的长ID:" + ctx.channel().id().asLongText()); } }
3 前端代码
写一个HTML页面简单实现发送和接受功能。
CHAT.socket = new WebSocket("ws://127.0.0.1:8088/chat");创建websocket对象并建立连接。
WebSocket的生命周期:
socket.onopen:连接建立时触发socket.onmessage:接受到消息时触发onerror:有异常时触发close:连接关闭时触发
<html> <head> <meta charset="utf-8"> <title>Netty通信
title>
head> <body> 发送消息:<input type="text" id="msgContent"/> <input type="button" value="发送消息" onclick="CHAT.chat()" /> <hr /> 接受消息: <div id="receiveMsg">
div> <script type="text/javascript"> window.CHAT = {
socket:null, //初始化socket init:function(){
//判断浏览器是否支持websocket if(window.WebSocket){
//创建websocket对象 CHAT.socket = new WebSocket("ws://127.0.0.1:8088/chat"); CHAT.socket.onopen = function(){
console.log("连接建立成功"); }, CHAT.socket.close = function(){
console.log("连接关闭"); }, CHAT.socket.onerror = function(){
console.log("发生异常"); }, CHAT.socket.onmessage = function(e){
console.log("接受消息:"+e.data); var receiveMsg = document.getElementById("receiveMsg"); var html = receiveMsg.innerHTML; //获取本对象原有的内容 //嵌入新的内容 receiveMsg.innerHTML = html + "
" + e.data; } }else{
console.log("浏览器不支持websocket协议"); } }, //发送消息时触发的函数 chat:function(){
//获取消息框中所输入的内容 var msgContent = document.getElementById("msgContent").value; //将客户端输入的消息进行发送 CHAT.socket.send(msgContent); } }; CHAT.init();
script>
body>
html>
测试:可以开多个窗口,每个窗口代表一个连接一个客户端。一个客户端发送消息时,其他客户端会瞬间收到消息。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/177609.html原文链接:https://javaforall.net
