java protostuff 序列化_使用Protostuff序列化

java protostuff 序列化_使用Protostuff序列化序rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大…

大家好,又见面了,我是你们的朋友全栈君。

rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到protostuff,觉得找到了一直在找的这种序列化方式。

protostuff简介

protobuf的一个缺点是需要数据结构的预编译过程,首先要编写.proto格式的配置文件,再通过protobuf提供的工具生成各种语言响应的代码。由于java具有反射和动态代码生成的能力,这个预编译过程不是必须的,可以在代码执行时来实现。有protostuff已经实现了这个功能。

protostuff效率

Ser Time+Deser Time (ns)

42a70e1729f6d6898366eb860e872d90.png

Size, Compressed size [light] in bytes

f1c9959c16a7500a08a636e54def3ae5.png

使用

pom依赖

com.dyuproject.protostuff

protostuff-core

1.0.8

com.dyuproject.protostuff

protostuff-runtime

1.0.8

工具类

public class SerializationUtil {

private static Map, Schema>> cachedSchema = new ConcurrentHashMap, Schema>>();

private static Objenesis objenesis = new ObjenesisStd(true);

private static Schema getSchema(Class clazz) {

@SuppressWarnings(“unchecked”)

Schema schema = (Schema) cachedSchema.get(clazz);

if (schema == null) {

schema = RuntimeSchema.getSchema(clazz);

if (schema != null) {

cachedSchema.put(clazz, schema);

}

}

return schema;

}

/**

* 序列化

*

* @param obj

* @return

*/

public static byte[] serializer(T obj) {

@SuppressWarnings(“unchecked”)

Class clazz = (Class) obj.getClass();

LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

try {

Schema schema = getSchema(clazz);

return ProtostuffIOUtil.toByteArray(obj, schema, buffer);

} catch (Exception e) {

throw new IllegalStateException(e.getMessage(), e);

} finally {

buffer.clear();

}

}

/**

* 反序列化

*

* @param data

* @param clazz

* @return

*/

public static T deserializer(byte[] data, Class clazz) {

try {

T obj = objenesis.newInstance(clazz);

Schema schema = getSchema(clazz);

ProtostuffIOUtil.mergeFrom(data, obj, schema);

return obj;

} catch (Exception e) {

throw new IllegalStateException(e.getMessage(), e);

}

}

}

基于netty的rpc

NettyServer

public class NettyServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

private int ioThreadNum;

//内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值

private int backlog;

private int port;

private Channel channel;

private EventLoopGroup bossGroup;

private EventLoopGroup workerGroup;

public NettyServer(int ioThreadNum, int backlog, int port) {

this.ioThreadNum = ioThreadNum;

this.backlog = backlog;

this.port = port;

}

public void start() throws InterruptedException {

bossGroup = new NioEventLoopGroup();

workerGroup = new NioEventLoopGroup(this.ioThreadNum);

final Map demoService = new HashMap();

demoService.put(“com.codecraft.service.HelloService”, new HelloServiceImpl());

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, backlog)

//注意是childOption

.childOption(ChannelOption.SO_KEEPALIVE, true)

.childOption(ChannelOption.TCP_NODELAY, true)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline()

.addLast(new RpcDecoder(RpcRequest.class))

.addLast(new RpcEncoder(RpcResponse.class))

.addLast(new ServerRpcHandler(demoService));

}

});

channel = serverBootstrap.bind(“127.0.0.1”,port).sync().channel();

logger.info(“NettyRPC server listening on port “+ port + ” and ready for connections…”);

Runtime.getRuntime().addShutdownHook(new Thread(){

@Override

public void run(){

//do shutdown staff

}

});

}

public void stop() {

if (null == channel) {

throw new ServerStopException();

}

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

channel.closeFuture().syncUninterruptibly();

bossGroup = null;

workerGroup = null;

channel = null;

}

}

ServerRpcHandler

public class ServerRpcHandler extends SimpleChannelInboundHandler {

private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);

private final Map serviceMapping;

public ServerRpcHandler(Map serviceMapping) {

this.serviceMapping = serviceMapping;

}

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {

RpcResponse response = new RpcResponse();

response.setTraceId(rpcRequest.getTraceId());

try {

logger.info(“server handle request:{}”,rpcRequest);

Object result = handle(rpcRequest);

response.setResult(result);

} catch (Throwable t) {

response.setError(t);

}

channelHandlerContext.writeAndFlush(response);

}

private Object handle(RpcRequest request) throws Throwable {

String className = request.getClassName();

Object serviceBean = serviceMapping.get(className);

Class> serviceClass = serviceBean.getClass();

String methodName = request.getMethodName();

Class>[] parameterTypes = request.getParameterTypes();

Object[] parameters = request.getParameters();

FastClass serviceFastClass = FastClass.create(serviceClass);

FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);

return serviceFastMethod.invoke(serviceBean, parameters);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

logger.error(cause.getMessage(), cause);

RpcResponse response = new RpcResponse();

if(cause instanceof ServerException){

response.setTraceId(((ServerException) cause).getTraceId());

}

response.setError(cause);

ctx.writeAndFlush(response);

}

}

NettyClient

public class NettyClient implements IClient {

private EventLoopGroup workerGroup;

private Channel channel;

private int workerGroupThreads;

private ClientRpcHandler clientRpcHandler;

private final Optional> NO_TIMEOUT = Optional.>absent();

public NettyClient(int workerGroupThreads) {

this.workerGroupThreads = workerGroupThreads;

}

public void connect(InetSocketAddress socketAddress) {

workerGroup = new NioEventLoopGroup(workerGroupThreads);

clientRpcHandler = new ClientRpcHandler();

Bootstrap bootstrap = new Bootstrap();

bootstrap

.group(workerGroup)

.channel(NioSocketChannel.class)

.option(ChannelOption.SO_KEEPALIVE, true)

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

.addLast(new RpcDecoder(RpcResponse.class))

.addLast(new RpcEncoder(RpcRequest.class))

.addLast(clientRpcHandler);

}

});

channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())

.syncUninterruptibly()

.channel();

}

public RpcResponse syncSend(RpcRequest request) throws InterruptedException {

System.out.println(“send request:”+request);

channel.writeAndFlush(request).sync();

return clientRpcHandler.send(request,NO_TIMEOUT);

}

public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {

channel.writeAndFlush(request);

return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));

}

public InetSocketAddress getRemoteAddress() {

SocketAddress remoteAddress = channel.remoteAddress();

if (!(remoteAddress instanceof InetSocketAddress)) {

throw new RuntimeException(“Get remote address error, should be InetSocketAddress”);

}

return (InetSocketAddress) remoteAddress;

}

public void close() {

if (null == channel) {

throw new ClientCloseException();

}

workerGroup.shutdownGracefully();

channel.closeFuture().syncUninterruptibly();

workerGroup = null;

channel = null;

}

}

ClientRpcHandler

@ChannelHandler.Sharable

public class ClientRpcHandler extends SimpleChannelInboundHandler {

//用blocking queue主要是用阻塞的功能,省的自己加锁

private final ConcurrentHashMap> responseMap = new ConcurrentHashMap>();

//messageReceived

@Override

protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {

System.out.println(“receive response:”+rpcResponse);

BlockingQueue queue = responseMap.get(rpcResponse.getTraceId());

queue.add(rpcResponse);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

super.exceptionCaught(ctx, cause);

cause.printStackTrace();

}

public RpcResponse send(RpcRequest request,Optional> timeout) throws InterruptedException {

responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue(1));

RpcResponse response = null;

try {

BlockingQueue queue = responseMap.get(request.getTraceId());

if(timeout == null || !timeout.isPresent()){

response = queue.take();

}else{

response = queue.poll(timeout.get().getKey(),timeout.get().getValue());

}

} finally {

responseMap.remove(request.getTraceId());

}

return response;

}

}

decoder

public class RpcDecoder extends ByteToMessageDecoder {

private Class> genericClass;

public RpcDecoder(Class> genericClass) {

this.genericClass = genericClass;

}

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {

if (byteBuf.readableBytes() < 4) {

return;

}

byteBuf.markReaderIndex();

int dataLength = byteBuf.readInt();

if (dataLength < 0) {

channelHandlerContext.close();

}

if (byteBuf.readableBytes() < dataLength) {

byteBuf.resetReaderIndex();

}

byte[] data = new byte[dataLength];

byteBuf.readBytes(data);

Object obj = SerializationUtil.deserializer(data, genericClass);

list.add(obj);

}

}

encoder

public class RpcEncoder extends MessageToByteEncoder {

private Class> genericClass;

public RpcEncoder(Class> genericClass) {

this.genericClass = genericClass;

}

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {

if (genericClass.isInstance(obj)) {

byte[] data = SerializationUtil.serializer(obj);

byteBuf.writeInt(data.length);

byteBuf.writeBytes(data);

}

}

}

参考

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/137829.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • ssh远程连接失败_ssh connect to host port 22

    ssh远程连接失败_ssh connect to host port 22不少人在ssh连接远程机器时遇到过ssh_exchange_identification:Connectionclosedbyremotehost的问题,在网上找了一堆教程试了都不行,博主总结了常见的几种解决方法(以Ubuntu18.04为例)。可能原因1:没装openssh-server;解决方案:sudoaptinstallopenssh-server或者sudoapt-getinstallopenssh-server可能原因2:连接超过了MaxSession

    2022年8月30日
    4
  • 如何删除苹果X的通讯录_iis配置php

    如何删除苹果X的通讯录_iis配置php一、 隐藏server信息先下载urlrewrite并安装附:Urlrewrite工具下载https://www.iis.net/downloads/microsoft/url-rewrite在网站目录下web.config文件以下位置<system.webServer>里修改<rewrite><outboundRulesrewriteBeforeCache=”true”><rulename=”RemoveServ

    2022年9月29日
    4
  • JS prototype作用

    JS prototype作用 prototype可查看原型属性,还可对原型添加属性或方法functionCar(name){this.name=name;this.run=function(){console.log(this.height+’cm’+this.name+’isrun!’)…

    2022年7月22日
    7
  • Redis过期–淘汰机制的解析和内存占用过高的解决方案「建议收藏」

    Redis过期–淘汰机制的解析和内存占用过高的解决方案

    2022年2月11日
    46
  • ID卡(工卡)复制到手机NFC「建议收藏」

    ID卡(工卡)复制到手机NFC「建议收藏」1.很多单位的工卡都是ID卡,而读卡机,一般是多频的,支持多种卡,那么如何把ID卡写到手机NFC中,实现“忘带卡自由”呢?2.前提条件:用手机NFC,在单位的刷卡机上刷卡,提示“非法卡”。说明打卡机支持NFC,如果不支持,请略过本文3.在单位的打卡机上正常刷卡,屏幕上会提示卡号。或者有的工卡上带卡号,一般ID卡号是00开头或000开头的10位卡号。如果不是,说明不是ID卡,请略过本文。有的单位的人事系统里也会记录员工的物理ID卡号。或者用其他读卡器读ID卡,也能读到卡号。4.获取到ID物理卡号后

    2022年5月1日
    1.6K
  • CSV文件太大打不开进行分割、和打开乱码问题[通俗易懂]

    CSV文件太大打不开进行分割、和打开乱码问题[通俗易懂]CSV文件打开以及乱码问题今天要使用一个csv文件,但是有8个G,excel打不开,用Python的pandas也读不了,可能是我电脑配置太落后,也可能是数据实在太大了。解决办法:首先处理打不开的问题,我们可以把大的csv分割成若干小文件,使用文件分割器,按10000行一个文件分割,分割器在F:\新建文件夹\csv文件分割器\split.exe,稍等一段时间就行。我还试过另一个分割器,但是不行…

    2022年7月21日
    88

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号