Giter Club home page Giter Club logo

netty-file's Introduction

Netty 文件传输

2020-1-2更新

在之前的项目中介绍了

springboot整合 netty做心跳检测

springboot 整合netty编写时间服务器

这次通过 Netty 传递文件

此项目地址:   https://github.com/haoxiaoyong1014/netty-file

项目依赖

 <dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.21.Final</version>
 </dependency>

和之前的两个例子中的依赖是一样的

项目中的重要部分代码

  • 客户端

    • FileUploadClientHandler

public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
    private int byteRead;
    private volatile int start = 0;
    private volatile int lastLength = 0;
    public RandomAccessFile randomAccessFile;
    private FileUploadFile fileUploadFile;
    private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClientHandler.class);
    
    public FileUploadClientHandler(FileUploadFile ef) {
        if (ef.getFile().exists()) {
            if (!ef.getFile().isFile()) {
                System.out.println("Not a file :" + ef.getFile());
                return;
            }
        }
        this.fileUploadFile = ef;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        super.channelInactive(ctx);
        LOGGER.info("客户端结束传递文件channelInactive()");
    }

    public void channelActive(ChannelHandlerContext ctx) {
        LOGGER.info("正在执行channelActive()方法.....");
        try {
            randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(),
                    "r");
            randomAccessFile.seek(fileUploadFile.getStarPos());
            // lastLength = (int) randomAccessFile.length() / 10;
            lastLength = 1024 * 10;
            byte[] bytes = new byte[lastLength];
            if ((byteRead = randomAccessFile.read(bytes)) != -1) {
                fileUploadFile.setEndPos(byteRead);
                fileUploadFile.setBytes(bytes);
                ctx.writeAndFlush(fileUploadFile);   //发送消息到服务端
            } else {
            }
            LOGGER.info("channelActive()文件已经读完 " + byteRead);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException i) {
            i.printStackTrace();
        }
        LOGGER.info("channelActive()方法执行结束");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof Integer) {
            start = (Integer) msg;
            if (start != -1) {
                randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
                randomAccessFile.seek(start); //将文件定位到start
                LOGGER.info("长度:" + (randomAccessFile.length() - start));
                int a = (int) (randomAccessFile.length() - start);
                int b = (int) (randomAccessFile.length() / 1024 * 2);
                if (a < lastLength) {
                    lastLength = a;
                }
                LOGGER.info("文件长度:" + (randomAccessFile.length()) + ",start:" + start + ",a:" + a + ",b:" + b + ",lastLength:" + lastLength);
                byte[] bytes = new byte[lastLength];
                LOGGER.info("bytes的长度是="+bytes.length);
                if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {
                    LOGGER.info("byteRead = "  + byteRead);
                    fileUploadFile.setEndPos(byteRead);
                    fileUploadFile.setBytes(bytes);
                    try {
                        ctx.writeAndFlush(fileUploadFile);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    randomAccessFile.close();
                    ctx.close();
                    LOGGER.info("文件已经读完channelRead()--------" + byteRead);
                }
            }
        }
    }

}

这里使用了RandomAccessFile

对RandomAccessFile 做一个简单的介绍更有助于理解代码

RandomAccessFile特点

RandomAccessFile是java Io体系中功能最丰富的文件内容访问类。即可以读取文件内容,也可以向文件中写入内容。但是和其他输入/输入流不同的是,程序可以直接跳到文件的任意位置来读写数据。   因为RandomAccessFile可以自由访问文件的任意位置,所以如果我们希望只访问文件的部分内容,那就可以使用RandomAccessFile类。   与OutputStearm,Writer等输出流不同的是,RandomAccessFile类允许自由定位文件记录指针,所以RandomAccessFile可以不从文件开始的地方进行输出,所以RandomAccessFile可以向已存在的文件后追加内容。则应该使用RandomAccessFile。

RandomAccessFile类包含了一个记录指针,用以标识当前读写处的位置,当程序新创建一个RandomAccessFile对象时,该对象的文件记录指针位于文件头(也就是0处), 当读/写了n个字节后,文件记录指针将会向后移动n个字节。除此之外,RandomAccessFile可以自由的移动记录指针,即可以向前移动,也可以向后移动。 RandomAccessFile包含了以下两个方法来操作文件的记录指针.

long getFilePointer(); 返回文件记录指针的当前位置
void seek(long pos); 将文件记录指针定位到pos位置

RandomAccessFile即可以读文件,也可以写,所以它即包含了完全类似于InputStream的3个read()方法,其用法和InputStream的3个read()方法完全一样; 也包含了完全类似于OutputStream的3个write()方法,其用法和OutputStream的3个Writer()方法完全一样。 除此之外,RandomAccessFile还包含了一系类的readXXX()和writeXXX()方法来完成输入和输出。

  • RandomAccessFile有两个构造器,其实这两个构造器基本相同,只是指定文件的形式不同而已,一个使用String参数来指定文件名,一个使用File参数来指定文件本身。 除此之外,创建RandomAccessFile对象还需要指定一个mode参数。该参数指定RandomAccessFile的访问模式,有以下4个值:

  • “r” 以只读方式来打开指定文件夹。如果试图对该RandomAccessFile执行写入方法,都将抛出IOException异常。
  • “rw” 以读,写方式打开指定文件。如果该文件尚不存在,则试图创建该文件。
  • “rws” 以读,写方式打开指定文件。相对于”rw” 模式,还要求对文件内容或元数据的每个更新都同步写入到底层设备。
  • “rwd” 以读,写方式打开指定文件。相对于”rw” 模式,还要求对文件内容每个更新都同步写入到底层设备

下面对上面的FileUploadClientHandler类中的代码进行说明:

channelActive()方法中有这么一段代码:

ctx.writeAndFlush(fileUploadFile);

这段代码的意思是向服务端发送消息,消息内容就是FileUploadFile对象, FileUploadFile对象中包含 :文件,文件名,开始位置,文件字节数组,结尾位置

服务端代码:

  • FileUploadServerHandler
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
	private int byteRead;
    private volatile int start = 0;
    private String file_dir = "/tmp";
    private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadServerHandler.class);
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	super.channelActive(ctx);
        LOGGER.info("服务端:channelActive()");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	// TODO Auto-generated method stub
    	super.channelInactive(ctx);
        LOGGER.info("服务端:channelInactive()");
    	ctx.flush();
    	ctx.close();
    }
    
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        LOGGER.info("收到客户端发来的文件,正在处理....");
        if (msg instanceof FileUploadFile) {
            FileUploadFile ef = (FileUploadFile) msg;
            byte[] bytes = ef.getBytes();
            byteRead = ef.getEndPos();
            String md5 = ef.getFile_md5();//文件名
            String path = file_dir + File.separator + md5;
            File file = new File(path);
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");//r: 只读模式 rw:读写模式
            randomAccessFile.seek(start);//移动文件记录指针的位置,
            randomAccessFile.write(bytes);//调用了seek(start)方法,是指把文件的记录指针定位到start字节的位置。也就是说程序将从start字节开始写数据
            start = start + byteRead;
            if (byteRead > 0) {
                ctx.writeAndFlush(start);//向客户端发送消息
                randomAccessFile.close();
                if(byteRead!=1024 * 10){
                	Thread.sleep(1000);
                	channelInactive(ctx);
                }
            } else {
                ctx.close();
            }
            LOGGER.info("处理完毕,文件路径:"+path+","+byteRead);
        }
    }
}

channelRead()方法即是接收客户端的代码,客户端也有这个方法,客户端channelRead()方法主要是负责读文件,服务端主要是写文件.

注意: 在服务端FileUploadServerHandler这个类中我们要将file_dir的路径改为自己电脑上的路径,mac: /tmp; windows: F:

我们可以先运行服务端的ServerFileTest测试类,然后运行客户端的 ClientFileTest测试类进行

具体代码就不贴上去了,可以在这里进行下载这个案例 netty-file   如果对你有帮助还请给个Star哦

2020-1-2更新

新增多个文件同时异步上传功能

引入线程,在开发中如果有此需求尽量使用线程池;

更新的代码有:

public class FileUploadClient implements Runnable {

    private final static Logger LOGGER = LoggerFactory.getLogger(FileUploadClient.class);

    private int port;
    private String host;
    private FileUploadFile fileUploadFile;


    public FileUploadClient(int port, String host, FileUploadFile fileUploadFile) {
        this.port = port;
        this.host = host;
        this.fileUploadFile = fileUploadFile;
    }

    @Override
    public void run() {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    //是禁用nagle算法
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<Channel>() {

                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new ObjectEncoder());
                            ch.pipeline().addLast(
                                    new ObjectDecoder(
                                            ClassResolvers
                                                    .weakCachingConcurrentResolver(null)));
                            ch.pipeline().addLast(
                                    new FileUploadClientHandler(
                                            fileUploadFile));
                        }
                    });

            ChannelFuture f = null;
            try {
                f = b.connect(host, port).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LOGGER.info("FileUploadClient connect()结束");
        } finally {
            group.shutdownGracefully();
        }
    }
  }  

FileUploadClient类实现Runnable并重写run方法;

测试类:

public static void main(String[] args) {
        final int FILE_PORT = 9991;
        try {
            List<String> fileNameList = new ArrayList();
            fileNameList.add("/test-1.zip");
            fileNameList.add("/test-2.zip");

            for (String fileName : fileNameList) {
                FileUploadFile uploadFile = new FileUploadFile();
                File file = new File(fileName);
                String fileMd5 = file.getName();// 文件名
                uploadFile.setFile(file);
                uploadFile.setFile_md5(fileMd5);
                uploadFile.setStarPos(0);// 文件开始位置
                Thread thread = new Thread(new FileUploadClient(FILE_PORT, "127.0.0.1", uploadFile));
                thread.start();
                System.out.println(fileName + "开始传输。。。。。。");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

netty-file's People

Contributors

dependabot[bot] avatar haoxiaoyong1014 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

netty-file's Issues

文件大小不一致

图片为 53kb ,服务端下载之后为60kb,图片打不开,但是看到打印的大小数据又是一致的

长时间使用会有内存泄漏

FileUploadClientHandler extends ChannelInboundHandlerAdapter
应该改为
FileUploadClientHandler extends SimpleChannelInboundHandler
因为ChannelInboundHandlerAdapter 的channelRead 不会释放msg. 内存会越积越多
Server的代码也一样.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.