Netty 大文件传输
使用netty进行大文件传输,对文件大小没有限制 实际上,传输的是文件分片,分片大小自定义
整体思路
- 客户端一连上服务器,即发送指定文件的文件分片
- 服务器收到分片后,写入指定路径,并向客户端返回所收到字节数(即通知客户端下次从文件的此字节开始传输)
- 客户端收到服务器的回应后,从指定字节开始创建下一个文件分片 继续发送给服务器,若已到文件末尾,则发送-1表示文件传输结束
- 服务器循环步骤2,直至收到客户端发来的-1,也回应给客户端-1表示收到传输结束的通知
- 客户端在收到-1后,断开与服务器的连接
代码实现
- 完整代码
github:https://github.com/StanAugust/NettyFileTransfer/tree/master
- 主要代码
- 客户端
/ * @ClassName: ClientHandler * @Description: 客户端的处理器,在在client.ClientInitializer initChannel中被调用 * @author Stan * @date: 2020年3月24日 */ public class ClientHandler extends ChannelInboundHandlerAdapter{
private static final Logger logger = Logger.getLogger(ClientHandler.class.getName()); private int byteRead; //一次读取的字节 private volatile int start = 0; //文件当前读取位置 private volatile int lastLength = 0; //单次文件传输剩余长度 private RandomAccessFile file; private FileConfig fc; public ClientHandler(FileConfig fc) {
if(fc.getFile().exists()) {
if(!fc.getFile().isFile()) {
logger.info("error:" + fc.getFile() + "is not a file!"); return; } } this.fc = fc; } / * @Description: * 连接一激活就向服务器发送文件,发送文件的函数在这里修改 * 若文件可一次传输完,则只调用本方法,否则调用一次本方法后,其余在channelRead中传输 * * @param ctx * @throws Exception * @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext) */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception{
file = new RandomAccessFile(fc.getFile(), "r"); send0(ctx, start); } / * @Description: 服务器通知已接受到的字节数,客户端收到通知并传输剩余文件 * @param ctx * @param msg * @throws Exception * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//收到服务器发来的字节数 if(msg instanceof Integer) {
start = (Integer)msg; if(start != -1) {
//文件没有传完 logger.info("服务器已收到字节数:" + start); send0(ctx, start); }else {
file.close(); logger.info("服务器已接收全部文件"); // 服务器接收完文件就与客户端断开连接 ctx.close(); } } } / * @throws Exception * @Description: 具体处理发送 */ private void send0(ChannelHandlerContext ctx, int start) throws Exception {
// 文件没有传完 if (start != -1) {
file.seek(start); // 把文件的记录指针定位到start字节的位置。也就是说本次将从start字节开始读数据 int nowLength = (int) (file.length() - start); // 文件当前总剩余长度 int transferLength = FileConfig.getTransferLength(); // 自定义的单次传输长度 lastLength = nowLength<transferLength ? nowLength:transferLength; // 选取较短一方作为单次文件传输剩余长度 transfer(ctx, lastLength); } } / * @Description: 完成单次传输 * @param ctx * @param length 单次传输长度 * @throws Exception */ private void transfer(ChannelHandlerContext ctx, int length) throws Exception {
byte[] buf = new byte[length]; if ((byteRead = file.read(buf)) != -1 && length > 0) {
fc.setEndPos(byteRead); fc.setFileBuf(buf); } else {
fc.setEndPos(-1); //结束位置-1,表示文件传输结束 fc.setFileBuf(null); logger.info("文件已上传完毕"); } ctx.writeAndFlush(fc); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); ctx.close(); } }
- 服务器
/ * @ClassName: ServerHandler * @Description: 服务器端的处理器,在server.ServerInitializer initChannel中调用 * @author Stan * @date: 2020年3月24日 */ public class ServerHandler extends ChannelInboundHandlerAdapter{
private static final Logger logger = Logger.getLogger(ServerHandler.class.getName()); private int byteRead; private int start = 0; / * @Description: 服务器接收到消息后进入这个方法,接收文件的函数在这里修改 * @param ctx * @param msg * @throws Exception * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果传递过来的是文件或文件分片 if(msg instanceof FileConfig) {
FileConfig fc = (FileConfig)msg; byte[] fileBuf = fc.getFileBuf(); //接收到的文件字节数组 byteRead = fc.getEndPos(); //记录当前文件传输结束的位置 if(byteRead == -1) {
// 约定的结束的标志 fileEnd(ctx); }else {
if(byteRead > 0) {
// TODO 文件接收路径需要指定 RandomAccessFile file = new RandomAccessFile(new File("test2.txt"), "rw"); file.seek(start); //把文件的记录指针定位到start字节的位置。也就是说程序本次将从start字节开始写数据 file.write(fileBuf);//把传输过来的数据写进文件里 start += byteRead; //确保文件下次能从当前结束的地方继续读取 ctx.writeAndFlush(start); //向客户端通知下次从第start字节开始传输 file.close(); logger.info("服务器已接收字节数:" + start + ",客户端地址:" + ctx.channel().remoteAddress()); }else {
exceptionCaught(ctx, new Throwable("可读字节小于0")); } } } } / * @Description: 文件接收完毕 * @param ctx */ private void fileEnd(ChannelHandlerContext ctx) {
ctx.writeAndFlush(-1); logger.info("服务器接收文件完毕" + "\n文件来源:"+ ctx.channel().remoteAddress() + "\n文件大小:" + start + " 字节"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close(); } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/216497.html原文链接:https://javaforall.net
