NIO实现并发拆分

  • Post author:
  • Post category:其他




实践者




说明

本章节描述以NIO的方式实现并发拆分,而为什么使用NIO的方式,实际上是为了使读写效率更高以及性能更好,至于NIO与IO的区别,可以自己查阅书籍或资料,本文不概述,重点是NIO方式实现并发拆分。



前提

实际上之前我们使用的文件拆分都是基于单线程的方式执行,久而久之会出现一个问题,拆分需要一个一个来,并且上一个拆分好之后,下一个才可以接着拆分,如果要拆分10个要就意味着要等9次一个一个的拆分,以此类推,如果更多的话,效率想都不敢想,所以决定以并发的方式来实现该拆分。

实际上在明确需求之后,我们可以分为几个步骤:分析、设计、编码、测试(性能测试),以下图片就是我分析需求后,想出来的解决方案,其实只要有思路,编码什么的都不是问题,因为你会跟着思路将编码过程中遇到的问题全部解决。

在这里插入图片描述



Main方法方式

话不多说,直接上码。

  private final static String FILE_PATH = "D:\\ideaFile\\dubboAll\\splitFile"; //文件目录
  private final static String MERGE_FILE_PATH = "D:\\ideaFile\\dubboAll\\filesOld"; //整合目录
    /**
     * 创建目录
     */
    private void createFile() {
        File file = new File(FILE_PATH);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    /**
     * 删除目录
     */
    private void delFile() {
        File file = new File(FILE_PATH);
        if (file.exists()) {
            File[] files = file.listFiles();
            for (int i = 0; i < files.length; i++) {
                files[i].delete();
            }
            file.delete();
        }
    }
    
	 /**
     * 计算文件大小以及计算出文件分割次数
     * <p>
     * size=拆分大小(字节)
     */
    public synchronized void fileSize(String fileName, int size) throws Exception {
        try {
            createFile(); //创建文件
            int thread = 4; //线程数
            File file = new File(fileName); //拆分文件
            long length = file.length(); //总文件大小--字节形式
            long count = length % size == 0 ? (length / size) : (length / size) + 1; //确保文件万无一失,获取循环总次数
            long threadCount = count % thread == 0 ? count / thread : count / thread + 1; //获取每个线程的循环次数
            System.out.println(length);
            System.out.println(count);
            System.out.println(threadCount);
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(thread,
                    10, 01, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>((int) count * 2)); //线程池
            //4个线程进行文件分割
            for (int i = 0; i < thread; i++) {
                threadPool.execute(new SplitCurrent(file, size, (int) threadCount * i, (int) ((i + 1) * threadCount)));
            }
            //等线程执行完成
            while (true) {
                Thread.sleep(500);
                //判断任务线程是否都执行完成
                if (threadPool.getCompletedTaskCount() >= 4) {
                    return;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        try {
            long l = System.currentTimeMillis();
            DemoFile demo = new DemoFile();
            String fileName = "WeChatSetup.exe";
            demo.fileSize("D:\\ideaFile\\dubboAll\\" + fileName, 10072025);//10072025
            demo.mergeFile(new File(FILE_PATH), fileName);
            System.out.println("消耗时间======" + (System.currentTimeMillis() - l));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //并发拆分
    class SplitCurrent implements Runnable {
        File file;      //文件对象
        long splitSize;  //分割大小
        long count;      //单个线程循环起始位置
        long local;      //单个线程循环结束位置

        public SplitCurrent(File file, long splitSize, long count, long local) {
            this.file = file;
            this.splitSize = splitSize;
            this.count = count;
            this.local = local;
        }

        public SplitCurrent() {
        }

        /**
         * 并发拆分
         */
        public void run() {
            int read = 0;
            long startPosition = 0;
            long endPosition = 0;
            try {
                FileInputStream fis = new FileInputStream(file);     //获取文件
                System.out.println(fis);
                System.out.println(file);
                FileChannel inputChannel = fis.getChannel();        //开启nio管道
                ByteBuffer byteBuffer = ByteBuffer.allocate((int) splitSize); // 申请一个缓存区
                startPosition = splitSize * count;                    //字节起始位置
                endPosition = splitSize * count == 0 ? splitSize : splitSize * (count + 1);        //字节结束位置--默认值
                //循环次数
                for (long i = count; i < local; i++) {
                    read = inputChannel.read(byteBuffer, endPosition);// 读取数据
                    //管道中数据不足时
                    if (read < 0) {
                        splitSize = endPosition - inputChannel.size();
                        endPosition = inputChannel.size();
                        //读取剩余的数据作为最后一个写入文件
                        read = inputChannel.read(byteBuffer, startPosition);// 读取数据
                    }
                    if (startPosition <= endPosition && (endPosition - startPosition) > 0) {
                        String partFileName = FILE_PATH + File.separator + file.getName() + (i + 1) + ".part";
                        byteBuffer.flip();//切换读
                        FileOutputStream fos = new FileOutputStream(partFileName);
                        FileChannel outputChannel = fos.getChannel();
                        //起始位置一直在变,读取长度也要一直改变
                        inputChannel.transferTo(startPosition, (endPosition - startPosition), outputChannel);//通道传输文件数据
                        outputChannel.close();
                        byteBuffer.clear(); //清空缓冲区
                        fos.close();
                        startPosition = endPosition; //下次读取的位置要迭代加
                        endPosition = splitSize * (i + 2);     //结束位置也要迭代加
                    }
                }
                inputChannel.close();
                fis.close();
            } catch (
                    Exception e) {
                System.out.println(Thread.currentThread().getName() + "\t" + endPosition + "\t" + startPosition + "\t" + read + "\t" + (endPosition - startPosition));
                // e.printStackTrace();
            }
        }
    }

    /**
     * 文件汇总
     *
     * @param dir
     * @param fileName
     * @throws Exception
     */
    private void mergeFile(File dir, String fileName) throws Exception {
        try {
            // 获取该目录下所有的碎片文件
            File[] partFiles = dir.listFiles(new FilenameFilter() {
                public boolean accept(File dir, String name) {
                    return name.endsWith(".part");
                }
            });
            // 将碎片文件存入到集合中
            List<FileInputStream> alLFileInput = new ArrayList<FileInputStream>();
            for (int i = 0; i < partFiles.length; i++) {
                try {
                    alLFileInput.add(new FileInputStream(dir.getPath() + File.separator + fileName + (i + 1) + ".part"));
                } catch (Exception e) {
                    // 异常
                    e.printStackTrace();
                }
            }
            // 构建文件流集合
            Enumeration<FileInputStream> en = Collections.enumeration(alLFileInput);
            // 将多个流合成序列流
            SequenceInputStream sis = new SequenceInputStream(en);
            FileOutputStream fos = new FileOutputStream(new File(MERGE_FILE_PATH, fileName));
            byte[] b = new byte[1024];
            int len = 0;
            while ((len = sis.read(b)) != -1) {
                fos.write(b, 0, len);
            }
            fos.close();
            sis.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //delFile();  //删除spliFile文件夹
        }
    }

在这里插入图片描述

在这里插入图片描述



Multipart方式

其实掌握Main方式的形式之后,改成流方式很简单。


html页面

<div>
    <div>
        <form action="/cloud2/upFile" name="upload" id="submitForm" enctype="multipart/form-data" method="post">
            <input type="file" value="" name="multipartFile" id="multipartFile">
            <input type="submit" value="提交" name="上传">
        </form>
        <span>当前支持格式为:jpg,png,css,js,mp4(30G以下随意玩)</span>
    </div>
</div>


UMConfig.java

(配置Multipart接收的文件大小)

@Configuration
public class UMConfig {
    @Bean
    public MultipartConfigElement multipartConfigElement() {
        MultipartConfigFactory factory = new MultipartConfigFactory();
        // 单个数据大小 30G
        factory.setMaxFileSize(DataSize.ofGigabytes(30));
        /// 总上传数据大小 30G
        factory.setMaxRequestSize(DataSize.ofGigabytes(30));
        return factory.createMultipartConfig();
    }
}


Controller.java

(控制类)

@RequestMapping(value = "/upFile")
    @ResponseBody
    public String addFile(@RequestParam(value = "multipartFile") MultipartFile multipartFile) {
        try {
            long l = System.currentTimeMillis();
            fileSize(8072025, multipartFile.getOriginalFilename(), multipartFile);
            mergeFile(new File(FILE_PATH), multipartFile.getOriginalFilename()); //合并文件
            System.out.println("全部流程需要花费=========" + (System.currentTimeMillis() - l));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //delFile(); //删除文件
        }
        return "OnFile";
    }
    /**
     * 创建目录
     */
    private void createFile() {
        File file = new File(FILE_PATH);
        if (!file.exists()) {
            file.mkdir();
        }
    }

    /**
     * 删除目录
     */
    private void delFile() {
        File file = new File(FILE_PATH);
        if (file.exists()) {
            File[] files = file.listFiles();
            for (int i = 0; i < files.length; i++) {
                files[i].delete();
            }
            file.delete();
        }
    }

    /**
     * 计算文件大小以及计算出文件分割次数
     * <p>
     * size=拆分大小(字节)
     */
    public void fileSize(int size, String fileName, MultipartFile multipartFile) throws Exception {
        try {
            createFile(); //创建文件
            int thread = 4; //线程数
            FileInputStream fileInputStream1 = (FileInputStream) multipartFile.getInputStream(); //临时
            long length = fileInputStream1.getChannel().size(); //总文件大小--字节形式
            long count = length % size == 0 ? (length / size) : (length / size) + 1; //确保文件万无一失,获取循环总次数
            long threadCount = count % thread == 0 ? count / thread : count / thread + 1; //获取每个线程的循环次数
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(thread,
                    10, 01, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>((int) count * 2)); //线程池
            //4个线程进行文件分割
            for (int i = 0; i < thread; i++) {
                threadPool.execute(new SplitCurrent((FileInputStream) multipartFile.getInputStream(), size, (int) threadCount * i, (int) ((i + 1) * threadCount), fileName));
            }
            //等线程执行完成
            while (true) {
                Thread.sleep(500);
                //判断任务线程是否都执行完成
                if (threadPool.getCompletedTaskCount() >= 4) {
                    return;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    //并发拆分
    class SplitCurrent implements Runnable {
        FileInputStream fileInputStream;      //文件流对象
        long splitSize;  //分割大小
        long count;      //单个线程循环起始位置
        long local;      //单个线程循环结束位置
        String fileName; //文件名称

        public SplitCurrent(FileInputStream fileInputStream, long splitSize, long count, long local, String fileName) {
            this.fileInputStream = fileInputStream;
            this.splitSize = splitSize;
            this.count = count;
            this.local = local;
            this.fileName = fileName;
        }

        public SplitCurrent() {
        }

        /**
         * 并发拆分
         */
        public void run() {
            int read = 0;
            long startPosition = 0;
            long endPosition = 0;
            try {
                System.out.println("线程里的对象====" + fileInputStream);
                FileChannel inputChannel = fileInputStream.getChannel();        //开启nio管道
                ByteBuffer byteBuffer = ByteBuffer.allocate((int) splitSize); // 申请一个缓存区
                startPosition = splitSize * count;                    //字节起始位置
                endPosition = splitSize * count == 0 ? splitSize : splitSize * (count + 1);        //字节结束位置--默认值
                //循环次数
                for (long i = count; i < local; i++) {
                    read = inputChannel.read(byteBuffer, endPosition);// 读取数据
                    //管道中数据不足时
                    if (read < 0) {
                        splitSize = endPosition - inputChannel.size();
                        endPosition = inputChannel.size();
                        //读取剩余的数据作为最后一个写入文件
                        read = inputChannel.read(byteBuffer, startPosition);// 读取数据
                    }
                    if (startPosition <= endPosition && (endPosition - startPosition) > 0) {
                        String partFileName = FILE_PATH + File.separator + fileName + (i + 1) + ".part";
                        byteBuffer.flip();//切换读
                        FileOutputStream fos = new FileOutputStream(partFileName);
                        FileChannel outputChannel = fos.getChannel();
                        //起始位置一直在变,读取长度也要一直改变
                        inputChannel.transferTo(startPosition, (endPosition - startPosition), outputChannel);//通道传输文件数据
                        byteBuffer.clear(); //清空缓冲区
                        outputChannel.close();
                        fos.close();
                        startPosition = endPosition; //下次读取的位置要迭代加
                        endPosition = splitSize * (i + 2);     //结束位置也要迭代加
                    }
                }
                inputChannel.close();
                fileInputStream.close();
            } catch (
                    Exception e) {
                System.out.println(Thread.currentThread().getName() + "\t" + endPosition + "\t" + startPosition + "\t" + read + "\t" + (endPosition - startPosition));
                //e.printStackTrace();
            }
        }
    }

    /**
     * 文件汇总
     *
     * @param dir
     * @param fileName
     * @throws Exception
     */
    private void mergeFile(File dir, String fileName) throws Exception {
        try {
            // 获取该目录下所有的碎片文件
            File[] partFiles = dir.listFiles(new FilenameFilter() {
                public boolean accept(File dir, String name) {
                    return name.endsWith(".part");
                }
            });
            // 将碎片文件存入到集合中
            List<FileInputStream> alLFileInput = new ArrayList<FileInputStream>();
            for (int i = 0; i < partFiles.length; i++) {
                try {
                    alLFileInput.add(new FileInputStream(dir.getPath() + File.separator + fileName + (i + 1) + ".part"));
                } catch (Exception e) {
                    // 异常
                    e.printStackTrace();
                }
            }
            // 构建文件流集合
            Enumeration<FileInputStream> en = Collections.enumeration(alLFileInput);
            // 将多个流合成序列流
            SequenceInputStream sis = new SequenceInputStream(en);
            FileOutputStream fos = new FileOutputStream(new File(MERGE_FILE_PATH, fileName));
            byte[] b = new byte[1024];
            int len = 0;
            while ((len = sis.read(b)) != -1) {
                fos.write(b, 0, len);
            }
            fos.close();
            sis.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            delFile();  //删除spliFile文件夹
        }
    }



总结

实际上拆分的重要几点:①起始位置②截止位置③合并文件的顺序,这三点理解之后,也就意味着把该实例拿下了。



版权声明:本文为qq_42618394原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。