实践者
Success
说明
本章节描述以NIO的方式实现并发拆分,而为什么使用NIO的方式,实际上是为了使读写效率更高以及性能更好,至于NIO与IO的区别,可以自己查阅书籍或资料,本文不概述,重点是NIO方式实现并发拆分。
前提
实际上之前我们使用的文件拆分都是基于单线程的方式执行,久而久之会出现一个问题,拆分需要一个一个来,并且上一个拆分好之后,下一个才可以接着拆分,如果要拆分10个要就意味着要等9次一个一个的拆分,以此类推,如果更多的话,效率想都不敢想,所以决定以并发的方式来实现该拆分。
实际上在明确需求之后,我们可以分为几个步骤:分析、设计、编码、测试(性能测试),以下图片就是我分析需求后,想出来的解决方案,其实只要有思路,编码什么的都不是问题,因为你会跟着思路将编码过程中遇到的问题全部解决。
Main方法方式
话不多说,直接上码。
private final static String FILE_PATH = "D:\\ideaFile\\dubboAll\\splitFile"; //文件目录
private final static String MERGE_FILE_PATH = "D:\\ideaFile\\dubboAll\\filesOld"; //整合目录
/**
* 创建目录
*/
private void createFile() {
File file = new File(FILE_PATH);
if (!file.exists()) {
file.mkdir();
}
}
/**
* 删除目录
*/
private void delFile() {
File file = new File(FILE_PATH);
if (file.exists()) {
File[] files = file.listFiles();
for (int i = 0; i < files.length; i++) {
files[i].delete();
}
file.delete();
}
}
/**
* 计算文件大小以及计算出文件分割次数
* <p>
* size=拆分大小(字节)
*/
public synchronized void fileSize(String fileName, int size) throws Exception {
try {
createFile(); //创建文件
int thread = 4; //线程数
File file = new File(fileName); //拆分文件
long length = file.length(); //总文件大小--字节形式
long count = length % size == 0 ? (length / size) : (length / size) + 1; //确保文件万无一失,获取循环总次数
long threadCount = count % thread == 0 ? count / thread : count / thread + 1; //获取每个线程的循环次数
System.out.println(length);
System.out.println(count);
System.out.println(threadCount);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(thread,
10, 01, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>((int) count * 2)); //线程池
//4个线程进行文件分割
for (int i = 0; i < thread; i++) {
threadPool.execute(new SplitCurrent(file, size, (int) threadCount * i, (int) ((i + 1) * threadCount)));
}
//等线程执行完成
while (true) {
Thread.sleep(500);
//判断任务线程是否都执行完成
if (threadPool.getCompletedTaskCount() >= 4) {
return;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
long l = System.currentTimeMillis();
DemoFile demo = new DemoFile();
String fileName = "WeChatSetup.exe";
demo.fileSize("D:\\ideaFile\\dubboAll\\" + fileName, 10072025);//10072025
demo.mergeFile(new File(FILE_PATH), fileName);
System.out.println("消耗时间======" + (System.currentTimeMillis() - l));
} catch (Exception e) {
e.printStackTrace();
}
}
//并发拆分
class SplitCurrent implements Runnable {
File file; //文件对象
long splitSize; //分割大小
long count; //单个线程循环起始位置
long local; //单个线程循环结束位置
public SplitCurrent(File file, long splitSize, long count, long local) {
this.file = file;
this.splitSize = splitSize;
this.count = count;
this.local = local;
}
public SplitCurrent() {
}
/**
* 并发拆分
*/
public void run() {
int read = 0;
long startPosition = 0;
long endPosition = 0;
try {
FileInputStream fis = new FileInputStream(file); //获取文件
System.out.println(fis);
System.out.println(file);
FileChannel inputChannel = fis.getChannel(); //开启nio管道
ByteBuffer byteBuffer = ByteBuffer.allocate((int) splitSize); // 申请一个缓存区
startPosition = splitSize * count; //字节起始位置
endPosition = splitSize * count == 0 ? splitSize : splitSize * (count + 1); //字节结束位置--默认值
//循环次数
for (long i = count; i < local; i++) {
read = inputChannel.read(byteBuffer, endPosition);// 读取数据
//管道中数据不足时
if (read < 0) {
splitSize = endPosition - inputChannel.size();
endPosition = inputChannel.size();
//读取剩余的数据作为最后一个写入文件
read = inputChannel.read(byteBuffer, startPosition);// 读取数据
}
if (startPosition <= endPosition && (endPosition - startPosition) > 0) {
String partFileName = FILE_PATH + File.separator + file.getName() + (i + 1) + ".part";
byteBuffer.flip();//切换读
FileOutputStream fos = new FileOutputStream(partFileName);
FileChannel outputChannel = fos.getChannel();
//起始位置一直在变,读取长度也要一直改变
inputChannel.transferTo(startPosition, (endPosition - startPosition), outputChannel);//通道传输文件数据
outputChannel.close();
byteBuffer.clear(); //清空缓冲区
fos.close();
startPosition = endPosition; //下次读取的位置要迭代加
endPosition = splitSize * (i + 2); //结束位置也要迭代加
}
}
inputChannel.close();
fis.close();
} catch (
Exception e) {
System.out.println(Thread.currentThread().getName() + "\t" + endPosition + "\t" + startPosition + "\t" + read + "\t" + (endPosition - startPosition));
// e.printStackTrace();
}
}
}
/**
* 文件汇总
*
* @param dir
* @param fileName
* @throws Exception
*/
private void mergeFile(File dir, String fileName) throws Exception {
try {
// 获取该目录下所有的碎片文件
File[] partFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.endsWith(".part");
}
});
// 将碎片文件存入到集合中
List<FileInputStream> alLFileInput = new ArrayList<FileInputStream>();
for (int i = 0; i < partFiles.length; i++) {
try {
alLFileInput.add(new FileInputStream(dir.getPath() + File.separator + fileName + (i + 1) + ".part"));
} catch (Exception e) {
// 异常
e.printStackTrace();
}
}
// 构建文件流集合
Enumeration<FileInputStream> en = Collections.enumeration(alLFileInput);
// 将多个流合成序列流
SequenceInputStream sis = new SequenceInputStream(en);
FileOutputStream fos = new FileOutputStream(new File(MERGE_FILE_PATH, fileName));
byte[] b = new byte[1024];
int len = 0;
while ((len = sis.read(b)) != -1) {
fos.write(b, 0, len);
}
fos.close();
sis.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
//delFile(); //删除spliFile文件夹
}
}
Multipart方式
其实掌握Main方式的形式之后,改成流方式很简单。
html页面
<div>
<div>
<form action="/cloud2/upFile" name="upload" id="submitForm" enctype="multipart/form-data" method="post">
<input type="file" value="" name="multipartFile" id="multipartFile">
<input type="submit" value="提交" name="上传">
</form>
<span>当前支持格式为:jpg,png,css,js,mp4(30G以下随意玩)</span>
</div>
</div>
UMConfig.java
(配置Multipart接收的文件大小)
@Configuration
public class UMConfig {
@Bean
public MultipartConfigElement multipartConfigElement() {
MultipartConfigFactory factory = new MultipartConfigFactory();
// 单个数据大小 30G
factory.setMaxFileSize(DataSize.ofGigabytes(30));
/// 总上传数据大小 30G
factory.setMaxRequestSize(DataSize.ofGigabytes(30));
return factory.createMultipartConfig();
}
}
Controller.java
(控制类)
@RequestMapping(value = "/upFile")
@ResponseBody
public String addFile(@RequestParam(value = "multipartFile") MultipartFile multipartFile) {
try {
long l = System.currentTimeMillis();
fileSize(8072025, multipartFile.getOriginalFilename(), multipartFile);
mergeFile(new File(FILE_PATH), multipartFile.getOriginalFilename()); //合并文件
System.out.println("全部流程需要花费=========" + (System.currentTimeMillis() - l));
} catch (Exception e) {
e.printStackTrace();
} finally {
//delFile(); //删除文件
}
return "OnFile";
}
/**
* 创建目录
*/
private void createFile() {
File file = new File(FILE_PATH);
if (!file.exists()) {
file.mkdir();
}
}
/**
* 删除目录
*/
private void delFile() {
File file = new File(FILE_PATH);
if (file.exists()) {
File[] files = file.listFiles();
for (int i = 0; i < files.length; i++) {
files[i].delete();
}
file.delete();
}
}
/**
* 计算文件大小以及计算出文件分割次数
* <p>
* size=拆分大小(字节)
*/
public void fileSize(int size, String fileName, MultipartFile multipartFile) throws Exception {
try {
createFile(); //创建文件
int thread = 4; //线程数
FileInputStream fileInputStream1 = (FileInputStream) multipartFile.getInputStream(); //临时
long length = fileInputStream1.getChannel().size(); //总文件大小--字节形式
long count = length % size == 0 ? (length / size) : (length / size) + 1; //确保文件万无一失,获取循环总次数
long threadCount = count % thread == 0 ? count / thread : count / thread + 1; //获取每个线程的循环次数
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(thread,
10, 01, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>((int) count * 2)); //线程池
//4个线程进行文件分割
for (int i = 0; i < thread; i++) {
threadPool.execute(new SplitCurrent((FileInputStream) multipartFile.getInputStream(), size, (int) threadCount * i, (int) ((i + 1) * threadCount), fileName));
}
//等线程执行完成
while (true) {
Thread.sleep(500);
//判断任务线程是否都执行完成
if (threadPool.getCompletedTaskCount() >= 4) {
return;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//并发拆分
class SplitCurrent implements Runnable {
FileInputStream fileInputStream; //文件流对象
long splitSize; //分割大小
long count; //单个线程循环起始位置
long local; //单个线程循环结束位置
String fileName; //文件名称
public SplitCurrent(FileInputStream fileInputStream, long splitSize, long count, long local, String fileName) {
this.fileInputStream = fileInputStream;
this.splitSize = splitSize;
this.count = count;
this.local = local;
this.fileName = fileName;
}
public SplitCurrent() {
}
/**
* 并发拆分
*/
public void run() {
int read = 0;
long startPosition = 0;
long endPosition = 0;
try {
System.out.println("线程里的对象====" + fileInputStream);
FileChannel inputChannel = fileInputStream.getChannel(); //开启nio管道
ByteBuffer byteBuffer = ByteBuffer.allocate((int) splitSize); // 申请一个缓存区
startPosition = splitSize * count; //字节起始位置
endPosition = splitSize * count == 0 ? splitSize : splitSize * (count + 1); //字节结束位置--默认值
//循环次数
for (long i = count; i < local; i++) {
read = inputChannel.read(byteBuffer, endPosition);// 读取数据
//管道中数据不足时
if (read < 0) {
splitSize = endPosition - inputChannel.size();
endPosition = inputChannel.size();
//读取剩余的数据作为最后一个写入文件
read = inputChannel.read(byteBuffer, startPosition);// 读取数据
}
if (startPosition <= endPosition && (endPosition - startPosition) > 0) {
String partFileName = FILE_PATH + File.separator + fileName + (i + 1) + ".part";
byteBuffer.flip();//切换读
FileOutputStream fos = new FileOutputStream(partFileName);
FileChannel outputChannel = fos.getChannel();
//起始位置一直在变,读取长度也要一直改变
inputChannel.transferTo(startPosition, (endPosition - startPosition), outputChannel);//通道传输文件数据
byteBuffer.clear(); //清空缓冲区
outputChannel.close();
fos.close();
startPosition = endPosition; //下次读取的位置要迭代加
endPosition = splitSize * (i + 2); //结束位置也要迭代加
}
}
inputChannel.close();
fileInputStream.close();
} catch (
Exception e) {
System.out.println(Thread.currentThread().getName() + "\t" + endPosition + "\t" + startPosition + "\t" + read + "\t" + (endPosition - startPosition));
//e.printStackTrace();
}
}
}
/**
* 文件汇总
*
* @param dir
* @param fileName
* @throws Exception
*/
private void mergeFile(File dir, String fileName) throws Exception {
try {
// 获取该目录下所有的碎片文件
File[] partFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.endsWith(".part");
}
});
// 将碎片文件存入到集合中
List<FileInputStream> alLFileInput = new ArrayList<FileInputStream>();
for (int i = 0; i < partFiles.length; i++) {
try {
alLFileInput.add(new FileInputStream(dir.getPath() + File.separator + fileName + (i + 1) + ".part"));
} catch (Exception e) {
// 异常
e.printStackTrace();
}
}
// 构建文件流集合
Enumeration<FileInputStream> en = Collections.enumeration(alLFileInput);
// 将多个流合成序列流
SequenceInputStream sis = new SequenceInputStream(en);
FileOutputStream fos = new FileOutputStream(new File(MERGE_FILE_PATH, fileName));
byte[] b = new byte[1024];
int len = 0;
while ((len = sis.read(b)) != -1) {
fos.write(b, 0, len);
}
fos.close();
sis.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
delFile(); //删除spliFile文件夹
}
}
总结
实际上拆分的重要几点:①起始位置②截止位置③合并文件的顺序,这三点理解之后,也就意味着把该实例拿下了。
版权声明:本文为qq_42618394原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。