Netty实战
此处观看更加
使用Netty实现一个简单的RPC框架
RPC是什么,原理是什么网上很多大神都有在总结,我就不再重复。如果对RPC还不是很了解的同学不妨先去了解一下基本的概念
这个实例仅仅只是学习Netty的一个很小的样例,实际上,它离真正的RPC还差得远,写这个的目的仅仅只是为了熟悉Netty
一个真正的RPC至少应该具备以下的功能:
- 注册中心
- 网络传输
- 序列化和反序列化
- 动态代理
- 负载均衡
- 传输协议
需求:
- 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty
设计说明:
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
1.定义Public接口
package cuit.epoch.pymjl.dubborpc.pulicinterface; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:07 / public interface HelloService {
/ * 简单的RPC接口 * @param msg 信息 * @return String */ String sayHello(String msg); }
2.定义服务提供者
2.1.服务端首先需要实现Public接口,以供消费者调用
package cuit.epoch.pymjl.dubborpc.provider; import cuit.epoch.pymjl.dubborpc.pulicinterface.HelloService; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:08 / public class HelloServiceImpl implements HelloService {
@Override public String sayHello(String msg) {
System.out.println("provider receive msg:" + msg); if (msg != null && !"".equals(msg)) {
return "Hello Client,I had already received your msg==>{" + msg + "}"; } else {
return "provider return msg: hello"; } } }
2.2.定义Netty服务
因为这只是一个很简单的样例,所以编解码器选择的Netty自带的字符串编解码器
然后我们需要自定义一个Handler负责处理业务
最后将编解码器和Handler一并加入Pipeline中
- NettyServer
package cuit.epoch.pymjl.dubborpc.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:13 / public class NettyServer {
/ * 对外暴露的方法 * * @param hostName 主机名 * @param port 端口号 */ public static void startServer(String hostName, int port) {
startServer0(hostName, port); } private static void startServer0(String hostName, int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync(); System.out.println("<===========服务器启动成功,端口号:" + port + "=============>"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
- 自定义NettyServerHandler,处理业务逻辑
package cuit.epoch.pymjl.dubborpc.netty; import cuit.epoch.pymjl.dubborpc.provider.HelloServiceImpl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:20 / public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/ * 当有客户端连接进来的时候,调用此方法 * * @param ctx 当前连接的上下文 * @param msg 当前连接的消息 * @throws Exception 异常 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送的消息,并调用服务 System.out.println("服务端Handler收到客户端消息:{" + msg + "}"); //客户端在调用服务器的api时,需要遵守相应的协议 //协议:每次发的消息必须以"HelloService#hello#"开头 if (msg.toString().startsWith("HelloService#hello#")) {
//调用方法 String response = new HelloServiceImpl().sayHello(msg.toString(). substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(response); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); } }
2.3.定义服务提供者启动类
package cuit.epoch.pymjl.dubborpc.provider; import cuit.epoch.pymjl.dubborpc.netty.NettyServer; / * 启动一个服务提供者,NettyServer * * @author Pymjl * @version 1.0 * @date 2022/4/13 15:12 / public class ServerBootStrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1", 8080); } }
3.定义消费者
3.1.定义NettyClient
Client需要用到代理模式(动态代理),目的是用来屏蔽客户端调用服务端方法的底层细节
package cuit.epoch.pymjl.dubborpc.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:50 / public class NettyClient {
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler clientHandler; / * 代理模式 * @param serviceClass 接口类 * @param providerName 协议头 * @return */ public Object getBean(final Class<?> serviceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{
serviceClass}, (proxy, method, args) -> {
if (clientHandler == null) {
initClient(); } //设置要发给服务端的信息 clientHandler.setParam(providerName + args[0]); return executor.submit(clientHandler).get(); }); } / * 初始化客户端 */ private static void initClient() {
try {
clientHandler = new NettyClientHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientHandler); } }); bootstrap.connect("127.0.0.1", 8080).sync(); } catch (InterruptedException e) {
e.printStackTrace(); } } }
3.2.自定义NettyClientHandler
注意Handler还实现了Callable方法,call方法是被代理对象调用,发送数据给服务端,并等待服务端返回结果,所以这里要注意call方法和channelRead0方法的同步控制,当call方法发送数据之后,应当进入休眠,然后等待服务端返回结果。当服务端返回结果后,应当再唤醒call方法,完成调用
package cuit.epoch.pymjl.dubborpc.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.Callable; / * @author Pymjl * @version 1.0 * @date 2022/4/13 15:38 / public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<String> {
/ * 上下文 */ private ChannelHandlerContext context; / * 返回的结果 */ private String result; / * 客户端调用服务时传入的方法参数 */ private String param; / * 被代理对象调用,发送数据给服务端,并等待服务端返回结果 * * @return result * @throws Exception 异常 */ @Override public synchronized String call() throws Exception {
/* 向服务端发送消息 */ context.writeAndFlush(param); //开始等待,直到服务端返回结果,被唤醒 wait(); return result; } / * 客户端连接成功后,向服务端发送消息 * * @param ctx 上下文 * @throws Exception 异常 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx; } / * 客户端接收服务端返回的消息 * * @param ctx 上下文 * @param msg 消息 * @throws Exception 异常 */ @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString(); //唤醒等待的线程 notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause); } public void setParam(String param) {
this.param = param; } }
3.3.定义客户端启动类
package cuit.epoch.pymjl.dubborpc.customer; import cuit.epoch.pymjl.dubborpc.netty.NettyClient; import cuit.epoch.pymjl.dubborpc.pulicinterface.HelloService; / * @author Pymjl * @version 1.0 * @date 2022/4/13 16:06 / public class ClientBootStrap {
public static final String DEFAULT_PROVIDER_NAME = "HelloService#hello#"; public static void main(String[] args) {
//创建一个消费者 NettyClient customer = new NettyClient(); //创建代理对象 HelloService helloService = (HelloService) customer.getBean(HelloService.class, DEFAULT_PROVIDER_NAME); //调用代理对象的方法 String res = helloService.sayHello("hello dubboService"); System.out.println("调用的结果==>" + res); } }
运行示例
服务端:

客户端:
}
}
运行示例 服务端:  客户端: OK,到这里简单的rpc调用就已经实现了。但是真正的RPC远不止这些,后面有时间我还会继续深入学习Netty以及Dubbo,通过自己diy一个RPC框架来学习这些框架的底层原理
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203128.html原文链接:https://javaforall.net
