java多线程文件下载器的实现
作者:是谢添啊
1.简介
该项目应用的知识点包括:
- RandomAccessFile 类的运用
- HttpURLConnection 类的运用
- 线程池的使用
- 原子类 LongAdder 的运用
- CountDownLatch 类的运用
- ScheduledExecutorService 类的运用
2.文件下载的核心
从互联网下载文件有点类似于我们将本地某个文件复制到另一个目录下,也会利用 IO 流进行操作。对于从互联网下载,还需要将本地和下载文件所在的服务器建立连接。
3.文件下载器的基础代码
3.1 HttpURLConnection
从互联网中下载文件的话,需要与文件所在的服务器建立连接,这里可以使用 jdk 提供的 java.net.HttpURLConnection 类来帮助我们完成这个操作。jdk11中有提供 java.net.http.HttpClient 类来替代 HttpURLConnection,由于现在使用的是 jdk8,因此先不用 jdk11 中的 HttpClient。除此之外还有一些其他第三方提供类可以执行类似的操作,这里就不赘述了。
3.2 用户标识
我们通过浏览器访问某个网站的时候,会将当前浏览器的版本,操作系统版本等信息的标识发送到网站所在的服务器中。当用程序代码去访问网站时,需要将这个标识发送过去。
Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1
4.下载信息
4.1 计划任务
文件下载的时候最好能够展示出下载的速度,已下载文件大小等信息。这里可以每隔一段时间来获取文件的下载信息,比如间隔 1 秒获取一次,然后将信息打印到控制台。文件下载是一个独立的线程,另外还需要再开启一个线程来间隔获取文件的信息。java.util.concurrent.ScheduledExecutorService 这个类可以帮助我们来实现此功能。
4.2 ScheduledExecutorService
在该类中提供了一些方法可以帮助开发者实现间隔执行的效果,下面列出一些常见的方法及其参数说明。我们可以通过下面方式来获取该类的对象,其中 1 标识核心线程的数量。
ScheduledExecutorService s = Executors.newScheduledThreadPool(1);
🍀 schedule方法
该方法是重载的,这两个重载的方法都是有 3 个形参,只是第一个形参不同。
参数 | 含义 |
---|---|
Runnable / Callable<V> | 可以传入这两个类型的任务 |
long delay | 延时的时间数量 |
TimeUnit unit | 时间单位 |
该方法的作用是让任务按照指定的时间延时执行。
🍀 scheduleAtFixedRate方法
该方法的作用是按照指定的时间延时执行,并且每隔一段时间再继续执行。
参数 | 含义 |
---|---|
Runnable command | 执行的任务 |
long initialDelay | 延时的时间数量 |
long period | 间隔的时间数量 |
TimeUnit unit | 时间单位 |
倘若在执行任务的时候,耗时超过了间隔时间,则任务执行结束之后直接再次执行,而不是再等待间隔时间执行。
🍀 scheduleWithFixedDelay方法
该方法的作用是按照指定的时间延时执行,并且每隔一段时间再继续执行。
参数 | 含义 |
---|---|
Runnable command | 执行的任务 |
long initialDelay | 延时的时间数量 |
long period | 间隔的时间数量 |
TimeUnit unit | 时间单位 |
在执行任务的时候,无论耗时多久,任务执行结束之后都会等待间隔时间之后再继续下次任务。
5.线程池简介
线程在创建,销毁的过程中会消耗一些资源,为了节省这些开销,jdk 添加了线程池。线程池节省了开销,提高了线程使用的效率。阿里巴巴开发文档中建议在编写多线程程序的时候使用线程池。
5.1 ThreadPoolExecutor 构造方法参数
在 juc 包下提供了 ThreadPoolExecutor 类,可以通过该类来创建线程池,这个类中有4个重载的构造方法,最核心的构造方法是有7个形参的,这些参数所代表的意义如下:
参数 | 含义 |
---|---|
corePoolSize | 线程池中核心线程的数量 |
maximumPoolSize | 线程池中最大线程的数量,是核心线程数量和非核心线程数量之和 |
keepAliveTime | 非核心线程空闲的生存时间 |
unit | keepAliveTime 的生存时间单位 |
workQueue | 当没有空闲的线程时,新的任务会加入到 workQueue 中排队等待 |
threadFactory | 线程工厂,用于创建线程 |
handler | 拒绝策略,当任务太多无法处理时的拒绝策略 |
5.2 线程池工作过程
5.3 线程池的状态
状态 | 说明 |
---|---|
RUNNING | 创建线程池之后的状态是 RUNNING |
SHUTDOWN | 该状态下,线程池就不会接收新任务,但会处理阻塞队列剩余任务,相对温和 |
STOP | 该状态下会中断正在执行的任务,并抛弃阻塞队列任务,相对暴力 |
TIDYING | 任务全部执行完毕,活动线程为 0 即将进入终止 |
TERMINATED | 线程池终止 |
5.4 线程池的关闭
线程池使用完毕之后需要进行关闭,提供了以下两种方法进行关闭。
方法 | 说明 |
---|---|
shutdown() | 该方法执行后,线程池状态变为 SHUTDOWN,不会接收新任务,但是会执行完已提交的任务,此方法不会阻塞调用线程的执行。 |
shutdownNow() | 该方法执行后,线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,并用 interrupt 的方式中断正在执行的任务。 |
5.5 工作队列
jdk 中提供的一些工作队列 workQueue。
队列 | 说明 |
---|---|
SynchronousQueue | 直接提交队列 |
ArrayBlockingQueue | 有界队列,可以指定容量 |
LinkedBlockingDeque | 无界队列 |
PriorityBlockingQueue | 优先任务队列,可以根据任务优先级顺序执行任务 |
6.代码实现
6.1 环境搭建
🍀 基本信息
- 开发工具:IDEA
- JDK 版本:8
- 项目编码:utf-8
🍀 创建项目
在开发工具中创建一个 javase 项目即可,无需导入第三方 jar 依赖。
6.2 实现逻辑
- 先判断是否已存在重复文件,该步骤其实可忽略,因为最终下载合并的文件名已采用时间戳进行了唯一标识;
- 启动一个线程每隔一秒打印下载情况;
- 切分任务,多线程分快下载;
- 全部块文件下载完毕,合并分块文件;
- 合并分块文件完毕,清理分块文件;
- 释放资源,关闭线程池和连接对象。
6.3 项目结构
包名 | 作用 |
---|---|
constant | 存放常量类的包 |
core | 存放了下载器核心类的包 |
util | 存放工具类的包 |
Main | 主类 |
6.4 类代码
🍀 constant 包
📌 Constant
/** * Description: 存放项目常量 * * @Author 狐狸半面添 * @Create 2023/11/6 1:22 * @Version 1.0 */ public class Constant { /** * 指定下载目录的存放位置 */ public static final String PATH = "D:\\download\\"; public static final double MB = 1024d * 1024d; public static final double KB = 1024d; /** * 每次读取的字节大小 */ public static final int BYTE_SIZE = 1024 * 100; /** * 块文件(临时文件)的后缀 */ public static final String PART_FILE_SUFFIX = ".temp"; /** * 线程数量 */ public static final int THREAD_NUM = 5; // 创建存放位置的代码 // public static void main(String[] args) { // File file = new File("D:\\download"); // if (!file.exists()) { // file.mkdir(); // } // } }
🍀 util 包
📌 FileUtils
/** * Description: 文件相关工具 * * @Author 狐狸半面添 * @Create 2023/11/6 11:46 * @Version 1.0 */ public class FileUtils { /** * 获取本地文件的大小 * * @param path 文件路径 * @return 文件大小 */ public static long getFileContentLength(String path) { File file = new File(path); return file.exists() && file.isFile() ? file.length() : 0; } }
📌 HttpUtils
/** * Description: Http 相关工具类 * * @Author 狐狸半面添 * @Create 2023/11/6 1:06 * @Version 1.0 */ public class HttpUtils { private static long id = System.currentTimeMillis(); public static void change() { id = System.currentTimeMillis(); } /** * 获取下载的文件大小 * * @param url 下载文件链接 * @return 文件大小 * @throws IOException */ public static long getHttpFileContentLength(String url) throws IOException { int contentLength; HttpURLConnection httpURLConnection = null; try { httpURLConnection = getHttpURLConnection(url); contentLength = httpURLConnection.getContentLength(); } finally { if (httpURLConnection != null) { httpURLConnection.disconnect(); } } return contentLength; } /** * 分块下载 * * @param url 下载地址 * @param startPos 下载文件起始位置 * @param endPos 下载文件结束位置 * @return 连接对象 */ public static HttpURLConnection getHttpURLConnection(String url, long startPos, long endPos) throws IOException { HttpURLConnection httpURLConnection = getHttpURLConnection(url); LogUtils.info("下载的区间是:{}-{}", startPos, endPos); if (endPos != 0) { httpURLConnection.setRequestProperty("RANGE", "bytes=" + startPos + "-" + endPos); } else { httpURLConnection.setRequestProperty("RANGE", "bytes=" + startPos + "-"); } return httpURLConnection; } /** * 获取 HttpURLConnection 连接对象 * * @param url 文件的地址 * @return HttpURLConnection 连接对象 */ public static HttpURLConnection getHttpURLConnection(String url) throws IOException { URL httpUrl = new URL(url); HttpURLConnection httpURLConnection = (HttpURLConnection) httpUrl.openConnection(); // 向文件所在的服务器发送标识信息 httpURLConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"); return httpURLConnection; } /** * 获取下载文件的名字 * * @param url 下载地址 * @return 文件名 */ public static String getHttpFileName(String url) { String fileName; int startIndex = url.lastIndexOf("/"); int endIndex = url.lastIndexOf("?"); if (endIndex == -1) { fileName = url.substring(startIndex + 1); } else { fileName = url.substring(startIndex + 1, endIndex); } int pointIndex = fileName.lastIndexOf("."); return fileName.substring(0, fileName.lastIndexOf(".")) + "-" + id + fileName.substring(pointIndex); } }
📌 LogUtils
/** * Description: 日志工具类 * * @Author 狐狸半面添 * @Create 2023/11/6 1:41 * @Version 1.0 */ public class LogUtils { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("hh:mm:ss"); public static void info(String msg, Object... args) { print(msg, "-info-", args); } public static void error(String msg, Object... args) { print(msg, "-error-", args); } private static void print(String msg, String level, Object... args) { if (args != null && args.length > 0) { msg = String.format(msg.replace("{}", "%s"), args); } String threadName = Thread.currentThread().getName(); System.out.println(LocalTime.now().format(FORMATTER) + " " + threadName + level + msg); } }
🍀 core 包
📌 DownloadInfoThread
/** * Description: 展示下载信息 * * @Author 狐狸半面添 * @Create 2023/11/6 2:07 * @Version 1.0 */ @SuppressWarnings("AlibabaUndefineMagicConstant") public class DownloadInfoThread implements Runnable { /** * 下载文件总大小 */ private final long httpFileContentLength; /** * 本次累计下载的大小 */ public static volatile LongAdder downSize = new LongAdder(); /** * 前一次下载的大小 */ public double prevSize; public DownloadInfoThread(long httpFileContentLength) { this.httpFileContentLength = httpFileContentLength; } @Override public void run() { // 计算文件总大小 单位是 MB String httpFileSize = String.format("%.2f", httpFileContentLength / Constant.MB); // 计算每秒下载速度 kb int speed = (int) ((downSize.doubleValue() - prevSize) / Constant.KB); prevSize = downSize.doubleValue(); // 剩余文件的大小 double remainSize = httpFileContentLength - downSize.doubleValue(); // 计算剩余时间 String remainTime = String.format("%.1f", remainSize / Constant.KB / speed); if ("Infinity".equalsIgnoreCase(remainTime)) { remainTime = "-"; } // 已下载大小 String currentFileSize = String.format("%.1f", downSize.doubleValue() / Constant.MB); String speedInfo = String.format("已下载 %smb/%smb,速度 %skb/s,剩余时间 %ss", currentFileSize, httpFileSize, speed, remainTime); System.out.print("\r"); System.out.print(speedInfo); } }
📌 DownloaderTask
/** * Description: 分块下载任务 * * @Author 狐狸半面添 * @Create 2023/11/7 0:58 * @Version 1.0 */ public class DownloaderTask implements Callable<Boolean> { private final String url; /** * 下载起始位置 */ private final long startPos; /** * 下载结束位置 */ private final long endPos; /** * 标识当前是哪一部分 */ private final int part; private final CountDownLatch countDownLatch; public DownloaderTask(String url, long startPos, long endPos, int part, CountDownLatch countDownLatch) { this.url = url; this.startPos = startPos; this.endPos = endPos; this.part = part; this.countDownLatch = countDownLatch; } @Override public Boolean call() throws Exception { // 获取文件名 String httpFileName = HttpUtils.getHttpFileName(url); // 分块的文件名 httpFileName = httpFileName + Constant.PART_FILE_SUFFIX + part; // 下载路径 httpFileName = Constant.PATH + httpFileName; // 获取分块下载的连接 HttpURLConnection httpURLConnection = HttpUtils.getHttpURLConnection(url, startPos, endPos); try ( InputStream input = httpURLConnection.getInputStream(); BufferedInputStream bis = new BufferedInputStream(input); RandomAccessFile accessFile = new RandomAccessFile(httpFileName, "rw"); ) { byte[] buffer = new byte[Constant.BYTE_SIZE]; int len; // 循环读取数据 while ((len = bis.read(buffer)) != -1) { // 1s 内下载的数据,通过原子类下载 DownloadInfoThread.downSize.add(len); accessFile.write(buffer, 0, len); } } catch (FileNotFoundException e) { LogUtils.error("下载文件不存在 {}", url); return false; } catch (Exception e) { LogUtils.error("下载出现异常"); return false; } finally { httpURLConnection.disconnect(); countDownLatch.countDown(); } return true; } }
📌 Downloader
/** * Description: 下载器 * * @Author 狐狸半面添 * @Create 2023/11/6 1:21 * @Version 1.0 */ public class Downloader { private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); public ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(Constant.THREAD_NUM, Constant.THREAD_NUM, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)); private CountDownLatch countDownLatch = new CountDownLatch(Constant.THREAD_NUM); public void download(String url) { // 获取文件名 String httpFileName = HttpUtils.getHttpFileName(url); // 文件下载路径 httpFileName = Constant.PATH + httpFileName; // 获取本地文件的大小 long localFileLength = FileUtils.getFileContentLength(httpFileName); HttpURLConnection httpURLConnection = null; DownloadInfoThread downloadInfoThread; try { // 获取连接对象 httpURLConnection = HttpUtils.getHttpURLConnection(url); // 获取下载文件的总大小 int contentLength = httpURLConnection.getContentLength(); // 判断文件是否已下载过 if (localFileLength >= contentLength) { LogUtils.info("{} 已下载完毕,无需重新下载", httpFileName); // 关闭连接对象 httpURLConnection.disconnect(); // 关闭线程池 scheduledExecutorService.shutdownNow(); poolExecutor.shutdown(); return; } // 创建获取下载信息的任务对象 downloadInfoThread = new DownloadInfoThread(contentLength); // 将任务交给线程执行,每隔 1s 打印一次 scheduledExecutorService.scheduleAtFixedRate(downloadInfoThread, 1, 1, TimeUnit.SECONDS); // 切分任务 ArrayList<Future> list = new ArrayList<>(); split(url, list); countDownLatch.await(); System.out.print("\r"); System.out.println("分块文件下载完成"); // 合并文件 if (merge(httpFileName)) { // 清除临时文件 clearTemp(httpFileName); } } catch (IOException | InterruptedException e) { e.printStackTrace(); } finally { System.out.println("本次执行完成"); // 关闭连接对象 if (httpURLConnection != null) { httpURLConnection.disconnect(); } // 关闭线程池 scheduledExecutorService.shutdownNow(); poolExecutor.shutdown(); } } /** * 文件切分 * * @param url 文件链接 * @param futureList 任务集合 */ public void split(String url, ArrayList<Future> futureList) { try { // 获取下载文件大小 long contentLength = HttpUtils.getHttpFileContentLength(url); // 计算切分后的文件大小 long size = contentLength / Constant.THREAD_NUM; // 计算分块个数 for (int i = 0; i < Constant.THREAD_NUM; i++) { // 计算下载起始位置 long startPos = i * size; // 计算结束位置 long endPos; if (i == Constant.THREAD_NUM - 1) { // 下载最后一块 endPos = 0; } else { endPos = startPos + size - 1; } // 创建任务对象 DownloaderTask downloaderTask = new DownloaderTask(url, startPos, endPos, i, countDownLatch); // 将任务提交到线程池 Future<Boolean> future = poolExecutor.submit(downloaderTask); futureList.add(future); } } catch (IOException e) { throw new RuntimeException(e); } } /** * 文件合并 * * @param fileName 文件名 * @return 是否合并成功 */ public boolean merge(String fileName) { LogUtils.info("开始合并文件 {}", fileName); byte[] buffer = new byte[Constant.BYTE_SIZE]; int len; try ( RandomAccessFile accessFile = new RandomAccessFile(fileName, "rw") ) { for (int i = 0; i < Constant.THREAD_NUM; i++) { try ( BufferedInputStream bis = new BufferedInputStream(new FileInputStream(fileName + Constant.PART_FILE_SUFFIX + i)) ) { while ((len = bis.read(buffer)) != -1) { accessFile.write(buffer, 0, len); } } } LogUtils.info("文件合并完毕 {}", fileName); } catch (Exception e) { e.printStackTrace(); return false; } return true; } /** * 清除临时文件 * * @param fileName 文件名 */ public void clearTemp(String fileName) { LogUtils.info("清理分块文件"); for (int i = 0; i < Constant.THREAD_NUM; i++) { String name = fileName + Constant.PART_FILE_SUFFIX + i; File file = new File(name); file.delete(); } LogUtils.info("分块清除完毕"); } }
🍀 Main 主类
public class Main { public static void main(String[] args) { // 下载地址 String url = null; if (args == null || args.length == 0) { while (url == null || url.trim().isEmpty()) { System.out.print("请输入下载链接:"); Scanner scanner = new Scanner(System.in); url = scanner.next(); } } else { url = args[0]; } Downloader downloader = new Downloader(); downloader.download(url); } }
到此这篇关于java多线程文件下载器的实现的文章就介绍到这了,更多相关java多线程文件下载器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!