Java实现多线程多节点下载

  • Post author:
  • Post category:java


对于一个资源文件,如何使用多线程下载以提高下载效率?一个方法是把要下载的文件分成几块利用多个线程同时下载,保存在本地的时候则需要知道每个线程所下载的部分对应于整个文件的位置,可以使用RandAccessFile支持随机存取。

package com.idc.downloader;

import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class DownloaderTask implements Callable<Integer>{

    public static final int ERROR=-1;
    public static final int OK=1;
    //待下载资源的URL
    private URL url;
    //待下载资源链接
    private String urlStr;
    //本地保存路径
    private String localSavPath;
    //连接超时
    private int connectTimeOut=5000;
    //要下载的文件大小
    private int fileSize;
    //每个任务最多由多少个线程同时下载(貌似5个线程效果比较好)
    private int maxPiece=5;
    //每个线程最小应该下载多少
    private int minSizePerPiece=10*1024*1024;
    //任务实际由多少个线程下载
    private int pieces=0;
    private int sizePerPiece=0;
    private int lastPieceSize=0;

    //所有下载子任务共用一个线程池
    private static ExecutorService downloaderPool=Executors.newCachedThreadPool();

    public DownloaderTask(String url,String localSavePath){
        this.urlStr=url;
        this.localSavPath=localSavePath;
    }
    public static void shutdown(){
        downloaderPool.shutdown();  
    }
    //将待下载文件进行划分
    private void spilt(){
        if(fileSize<=minSizePerPiece){ //只需一个线程
            pieces=1;
            lastPieceSize=fileSize;
        }else{
            pieces=fileSize/minSizePerPiece+1;
            if(pieces>maxPiece) //超过了最大线程数量
                pieces=maxPiece;
            sizePerPiece=fileSize/pieces;
            lastPieceSize=fileSize-sizePerPiece*(pieces-1);         
        }
    }
    private int prepare(){
        URLConnection urlConnection=null;
        try {
            url=new URL(urlStr);
        } catch (MalformedURLException e) {
            e.printStackTrace();
            return ERROR;
        }
        try{
            urlConnection=url.openConnection();
            urlConnection.setConnectTimeout(connectTimeOut);
            urlConnection.connect();                
        }catch(Exception e){
            e.printStackTrace();
            return ERROR;
        }
        fileSize=urlConnection.getContentLength();  
        spilt();
        System.out.println("待下载文件大小为:"+fileSize+" 字节");
        return OK;
    }
    @Override
    public Integer call(){
        if(prepare()!=1)
            return ERROR;
        RandomAccessFile localFile;
        try {
            localFile = new RandomAccessFile(localSavPath, "rw");
            localFile.close();
        } catch (Exception e) {
            e.printStackTrace();
            return ERROR;
        }   
        Future<Integer>[] resultFutures=new Future[pieces];
        for(int i=0;i<pieces-1;i++){
            resultFutures[i]=downloaderPool.submit(new SubTask(i+1, sizePerPiece, sizePerPiece));
        }
        resultFutures[pieces-1]=downloaderPool.submit(new SubTask(pieces,sizePerPiece,lastPieceSize));
        for(int i=0;i<resultFutures.length;i++){
            int result=-1;
            try {
                result = resultFutures[i].get(); //获取每个子任务的执行结果
            } catch (InterruptedException e) {
                e.printStackTrace();
                return ERROR;
            } catch (ExecutionException e) {
                e.printStackTrace();
                return ERROR;
            }
            if(result!=1)
                return ERROR;
        }
        return OK;
    }
    //分片下载子任务
    private class SubTask implements Callable<Integer>{
        //标着是第几块
        private int mOrder=-1;
        private int mSize=0;
        private int mSizePerThread=0;
        public SubTask(int order,int sizePerThread,int size){
            mOrder=order;
            mSize=size;
            mSizePerThread=sizePerThread;
        }
        @Override
        public Integer call(){
            URLConnection connection;
            try {
                connection = url.openConnection();
                connection.setRequestProperty("RANGE","bytes="+mSizePerThread*(mOrder-1)+"-"+(mSizePerThread*(mOrder-1)+mSize));
                connection.setRequestProperty("Connection", "Keep-Alive");
                connection.connect();           
                InputStream inputStream=connection.getInputStream();
                byte[] buffer=new byte[2048];
                RandomAccessFile accessFile=new RandomAccessFile(localSavPath, "rw");
                accessFile.seek(mSizePerThread*(mOrder-1));
                int read=-1;
                while((read=inputStream.read(buffer, 0,buffer.length))>0){
                    accessFile.write(buffer,0,Math.min(mSize, read));
                    mSize-=read;
                    if(mSize<=0)
                        break;
                }
                accessFile.close();
                inputStream.close();
            }catch(Exception e){
                e.printStackTrace();
                return ERROR;
            }
            return OK;
        }   
    }
}
package com.idc.downloader;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

//下载管理
public class DownloaderManager {
    //下载线程池
    private ExecutorService servicePool;
    public DownloaderManager(){
        servicePool=Executors.newCachedThreadPool();
    }
    public void addTask(String urlString){
        servicePool.submit(new DownloaderTask(urlString, "d:\\abc.apk"));
    }
    public void close(){        
        servicePool.shutdown();
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        DownloaderTask.shutdown();
    }
    public static void main(String[] args){
        DownloaderManager manager=new DownloaderManager();
        manager.addTask("http://shouji.360tpcdn.com/151229/d8f144f0dbb63b62a99b848c02c3134e/com.qihoo360.mobilesafe_241.apk");
        manager.close();
    }
}

这里需要注意几点:

1. 下载文件分配的线程要适中,当文件比较小的时候,单线程就可以了,若文件很大,一般5个线程(网上说法)就差不多了。

2. 最后一个线程下载的文件块的大小需要单独计算

3. 可以在每个线程请求服务器数据的时候设置

range

,如代码中的

connection.setRequestProperty("RANGE","bytes="+mSizePerThread*(mOrder-1)+"-"+(mSizePerThread*(mOrder-1)+mSize));

,这样可以向服务器请求一定范围内的数据,注意其中的

"-"

不要落了

4. 当我们使用

read

方法,比如代码中的

(read=inputStream.read(buffer, 0,buffer.length))>0

需要注意


read

返回的数据大小不定,即使其有

buffer.length

大小的数据,也不能保证其一定就会返回这么大小的数据



版权声明:本文为zlp1992原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。