下载任务执行器简单设计

下载任务类设计

/**
 * 下载任务执行器
 */
@Slf4j
public class Downloader implements Runnable {

    private String filePath;
    private Callable<String> callable;
    private DownloadTask downloadTask;
    private final DownloadTaskService downloadTaskService;

    private Downloader(DownloadTaskService downloadTaskService) {
        this.downloadTaskService = downloadTaskService;
    }

    /**
     * 创建一个任务执行器
     * @param downloadTaskService 下载任务Service
     * @return 任务执行器
     */
    public static Downloader getInstance(DownloadTaskService downloadTaskService) {
        return new Downloader(downloadTaskService);
    }

    /**
     * 创建任务
     * @param taskName 任务名称
     * @param type 任务类型
     * @return 任务执行器
     */
    public Downloader create(String taskName, DownloadTypeEnum type) {
        DownloadTask downloadTask = new DownloadTask();
        downloadTask.setUserId(SecurityUtil.getCurrentUserId());
        downloadTask.setDownloadName(taskName);
        downloadTask.setDownloadType(type.getType());
        downloadTask.setDownloadStatus(DownloadStatusEnum.STATUS_GENERATING.getType());
        downloadTask.setDownloadDescription(type.getName());
        downloadTaskService.save(downloadTask);
        this.downloadTask = downloadTask;
        return this;
    }

    /**
     * 执行任务
     * @param callable 有返回值的线程
     * @return 任务执行器
     */
    public Downloader execute(Callable<String> callable) {
        this.callable = callable;
        return this;
    }

    /**
     * 完成任务
     * @return 下载路径
     */
    public String finish() {
        this.downloadTask.setDownloadPath(this.filePath);
        this.downloadTask.setDownloadStatus(DownloadStatusEnum.STATUS_SUCCESS.getType());
        downloadTaskService.updateById(downloadTask);
        DownloaderFactory.getInstance().remove(this);
        return this.filePath;
    }

    /**
     * 任务失败
     */
    private void taskFail() {
        downloadTask.setDownloadStatus(DownloadStatusEnum.STATUS_FAIL.getType());
        downloadTaskService.updateById(downloadTask);
        DownloaderFactory.getInstance().remove(this);
    }

    @Override
    public void run() {
        try {
            this.filePath = callable.call();
            finish();
        } catch (Exception e) {
            log.error("下载任务执行中异常 Error:", e);
            taskFail();
            throw new BadRequestException("任务执行中异常");
        }
    }

    /**
     * 启动线程
     */
    public void start() {
        DownloaderFactory.getInstance().push(this);
    }

    public String getName() {
        if (null != downloadTask) {
            return downloadTask.getDownloadName();
        }
        return null;
    }
}

下载工厂类设计

/**
 * 下载工厂类
 */
@Slf4j
public class DownloaderFactory {

    /**
     * 单例
     */
    private static DownloaderFactory downloaderFactory = new DownloaderFactory();

    /**
     * 构造方法私有化
     */
    private DownloaderFactory() {}

    /**
     * 最大等待队列
     */
    private static final int MAX_WAIT_SIZE = 100;

    /**
     * 最大执行任务数
     */
    private static final int MAX_EXEC_SIZE = 10;

    /**
     * 是否正在运行
     */
    private static boolean executing = false;

    /**
     * 等待下载队列
     */
    private static ConcurrentLinkedDeque<Downloader> downloaderDeque = new ConcurrentLinkedDeque();

    /**
     * 执行中的队列
     */
    private static ConcurrentLinkedDeque<Downloader> executingDeque = new ConcurrentLinkedDeque<>();

    /**
     * 单例工厂
     * @return 生产工厂
     */
    public static DownloaderFactory getInstance() {
        return downloaderFactory;
    }

    /**
     * 推送下载任务至等待队列
     * @param downloader 任务
     */
    public void push(Downloader downloader) {
        if (MAX_WAIT_SIZE <= downloaderDeque.size()) {
            throw new BadRequestException("当前系统繁忙,请稍候再试!");
        }
        log.info("任务已进入队列--->{}", downloader.getName());
        downloaderDeque.push(downloader);
    }

    /**
     * 激活生产线
     */
    public void start() {
        if (!executing) {
            log.info("执行器正在工作--->");
            AsyncManager.instance().execute(() -> execute());
        }
        if (downloaderDeque.size() == 0) {
            log.info("一直没活来,眯一会zzZ");
            try {
                Thread.sleep(60000L);
            } catch (InterruptedException e) {
                log.info("工厂出事了,检查一下");
            }
        }
        start();
    }

    /**
     * 生产线
     */
    private void execute() {
        executing = true;
        try {
            while (true) {
                if (MAX_EXEC_SIZE <= executingDeque.size()) {
                    log.info("流水线已经爆满了,等会再来吧");
                    continue;
                }
                if (downloaderDeque.size() > 0) {
                    Downloader downloader = downloaderDeque.pop();
                    log.info("老铁,来活了--->{}", downloader.getName());
                    executingDeque.push(downloader);
                    try {
                        AsyncManager.instance().execute(downloader);
                    } catch (Exception e) {
                        log.info("生产失败咯,换下一个吧:", e);
                        remove(downloader);
                        continue;
                    }
                }
            }
        } catch (Exception e) {
            log.error("生产线着火了:", e);
            executing = false;
        }
    }

    /**
     * 生产完成或失败了,扔出队列吧
     * @param downloader 下载任务
     */
    public void remove(Downloader downloader) {
        executingDeque.remove(downloader);
    }

}

系统问题

  • 下载需要生成很多零散的文件
  • 需要对零散的文件进行分组,排序和压缩
  • 若不加以限制系统执行的任务数量,极可能造成OOM问题从而搞垮系统

分享

  • 该类有关联的下载任务业务类需要根据自己项目的实际情况进行替换
  • 最大执行队列也需要根据服务器的性能进行评估
  • 最后欢迎各位大佬指点小弟,万分感激
正文到此结束
评论插件初始化中...
Loading...