微信搜索
江南一点雨

Spring 定时任务玩出花!

今天抽空撸一篇文章,和大家聊一聊上篇文章的可视化定时任务是怎么实现的。

上篇文章发了后,有不少小伙伴在评论区提出了一些问题,我觉得还挺有意思的,这篇文章应该能解决掉大家的大部分疑惑。

好啦,就不废话了,我们来看看具体实现方案。

对了,如果还没看过上篇文章的小伙伴可以先看看,不然可能不知道我在说啥,传送门:


1. 项目概览

我们首先来大概看下这个项目:

这里和定时任务相关的配置主要在 config 包里边,其他的都是业务类代码,换句话说其他的都是常规的 CURD,所以我这里主要和小伙伴们介绍 config 中的代码。

2. 整体思路

我先来说说这个项目的整体思路,这样方便大家理解下面的内容。

在这个项目中,每一个定时任务都由一个线程去处理,负责处理每一个定时任务的线程类是 SchedulingRunnable,所有的线程都跑在一个线程池中,这个线程池是 ThreadPoolTaskScheduler,这是一个专为定时任务设计的线程池(支持 Cron 表达式),它的底层其实就是大家所熟知的 ScheduledThreadPoolExecutor。当有一个新的定时任务需要执行时,创建一个 SchedulingRunnable 线程,然后连同 Cron 表达式一起扔到 ThreadPoolTaskScheduler 池子里去执行就行了。

3. 配置分析

几个配置类我们逐一来分析。

3.1 SpringContextUtils

首先我们提供了一个 SpringContextUtils 工具类,这个工具类实现了 ApplicationContextAware 接口,通过这个工具类,我们可以从 Spring 容器中查询一个 Bean 或者判断 Spring 容器中是否存在某一个 Bean,工具类的代码如下(我主要列出来了有哪些方法,具体实现大家可以参考:https://github.com/lenve/scheduling):

@Component
public class SpringContextUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    public static Object getBean(String name) {
    }

    public static <T> T getBean(Class<T> requiredType) {
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
    }

    public static boolean containsBean(String name) {
    }

    public static boolean isSingleton(String name) {
    }

    public static Class<? extends Object> getType(String name) {
    }
}

3.2 SchedulingRunnable

将来每一个定时任务执行的时候,我们都开启一个新的线程去执行这个定时任务,SchedulingRunnable 就是关于这个线程的配置,我们来看下:

public class SchedulingRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(SchedulingRunnable.class);

    private String beanName;

    private String methodName;

    private String params;

    private Object targetBean;

    private Method method;

    public SchedulingRunnable(String beanName, String methodName) {
        this(beanName, methodName, null);
    }

    public SchedulingRunnable(String beanName, String methodName, String params) {
        this.beanName = beanName;
        this.methodName = methodName;
        this.params = params;
        init();
    }

    private void init() {
        try {
            targetBean = SpringContextUtils.getBean(beanName);

            if (StringUtils.hasText(params)) {
                method = targetBean.getClass().getDeclaredMethod(methodName, String.class);
            } else {
                method = targetBean.getClass().getDeclaredMethod(methodName);
            }

            ReflectionUtils.makeAccessible(method);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        logger.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params);
        long startTime = System.currentTimeMillis();

        try {
            if (StringUtils.hasText(params)) {
                method.invoke(targetBean, params);
            } else {
                method.invoke(targetBean);
            }
        } catch (Exception ex) {
            logger.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex);
        }

        long times = System.currentTimeMillis() - startTime;
        logger.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SchedulingRunnable that = (SchedulingRunnable) o;
        if (params == null) {
            return beanName.equals(that.beanName) &&
                    methodName.equals(that.methodName) &&
                    that.params == null;
        }

        return beanName.equals(that.beanName) &&
                methodName.equals(that.methodName) &&
                params.equals(that.params);
    }

    @Override
    public int hashCode() {
        if (params == null) {
            return Objects.hash(beanName, methodName);
        }

        return Objects.hash(beanName, methodName, params);
    }
}

SchedulingRunnable 实现了 Runnable 接口,这里的实现逻辑也比较简单,我们一起来看下:

  1. 首先声明了 beanName、methodName 以及 params 分别作为定时任务执行的 Bean 的 bean 名称、方法名称以及方法参数。不知道小伙伴们是否记得我们上篇文章中介绍的该系统的用法,在添加一个定时任务时,我们需要传入相应的 beanName、methodName 以及 params 参数,传入后就来到这里了。另外还有 targetBean 和 method 分别表示 beanName 对应的对象以及 methodName 对应的对象,其中 targetBean 通过 beanName 从 Spring 容器中查找,method 则通过 methodName 从 targetBean 中查找。
  2. 在 run 方法中,通过反射去调用 method 方法,这也是定时任务执行时候的具体逻辑。
  3. 另外,这里重写了 equals 和 hashCode 方法,这两个方法主要是比较了 beanName、methodName 以及 params 三个属性,换言之,如果这三个属性相同,则认为这是同一个对象(这三个属性相同表示这是同一个定时任务)。

3.3 SchedulingConfig

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}

这里主要是配置一下 ThreadPoolTaskScheduler,这个可以很方便的对重复执行的任务进行调度管理,相比于通过 Java 自带的周期性任务线程池ScheduleThreadPoolExecutor,ThreadPoolTaskScheduler 对象支持根据 Cron 表达式创建周期性任务。

既然是线程池,必然就有线程数量等问题,它的核心线程池大小就是我们配置的 poolSize 属性,最大线程池大小是 Integer.MAX_VALUEkeepAliveTime 为 0 ,这里用到的队列是 DelayedWorkQueue ,这个队列有一个属性 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); 对这个队列的操作实际是是对这个 DelayQueue 的操作,这个队列大小是 Integer.MAX_VALUE,所以线程数量肯定是够用了。

其他配置就没啥好说的。

3.4 ScheduledTask

ScheduledTask 是 ScheduledFuture 的包装类,这个包装类中主要多了一个 future 属性,这个 future 属性表示 TaskScheduler 定时任务线程池的执行结果:

public final class ScheduledTask {
    volatile ScheduledFuture<?> future;
    public void cancel() {
        ScheduledFuture<?> future = this.future;
        if (future != null) {
            future.cancel(true);
        }
    }
}

3.5 CronTaskRegistrar

核心的方法都在这个里边。

@Component
public class CronTaskRegistrar implements DisposableBean {

    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);

    @Autowired
    private TaskScheduler taskScheduler;

    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }

    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }

    public void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }

            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }

    public void removeCronTask(Runnable task) {
        ScheduledTask scheduledTask = this.scheduledTasks.remove(task);
        if (scheduledTask != null)
            scheduledTask.cancel();
    }

    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());

        return scheduledTask;
    }


    @Override
    public void destroy() {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }

        this.scheduledTasks.clear();
    }
}

稍微说下这个类:

  1. 首先这个类实现了 DisposableBean 接口,实现这个接口就重写了 destroy 方法,以便在 Bean 销毁的时候,清除所有的定时任务。
  2. addCronTask(Runnable, String) 方法用来添加一个定时任务,传两个参数,第一个是 Runnable,也就是我们前面所说的定时任务,第二个则是一个 Cron 表达式。
  3. addCronTask(CronTask) 方法也用来添加定时任务,添加之前先判断这个定时任务是否已经存在,如果已经存在,就先移除。然后将定时任务存入 scheduledTasks 中,存储的时候,key 就是那个 Runnable 对象,value 则是一个 ScheduledTask 对象。
  4. ScheduledTask 对象从 scheduleCronTask 方法中获取,这也是整个系统最最核心的一段代码,调用 taskScheduler 对象把定时任务添加进去。
  5. removeCronTask 方法用来移除一个定时任务,移除分为两部分:1. 从 scheduledTasks 集合中找到定时任务并移除;2. 取消定时任务的执行。
  6. 最后的 destroy 方法就是一个常规方法,该移除移除,该清空清空。

3.6 InitTask

这是一个处理数据库中已有定时任务的类。当系统启动时,首先从数据库中读取需要定时执行的任务,然后挨个加入定时任务执行器中:

@Component
public class InitTask implements CommandLineRunner {
    @Autowired
    CronTaskRegistrar cronTaskRegistrar;
    @Autowired
    SysJobService sysJobService;

    @Override
    public void run(String... args) throws Exception {
        List<SysJob> list = sysJobService.getJobsByStatus(1);
        for (SysJob sysJob : list) {
            cronTaskRegistrar.addCronTask(new SchedulingRunnable(sysJob.getBeanName(), sysJob.getMethodName(), sysJob.getMethodParams()), sysJob.getCronExpression());
        }
    }
}
  1. 查询所有状态为 1 的定时任务。
  2. 遍历第一步查询出来的集合,添加定时任务。

好啦,这就是整个项目最最核心的配置了,其他的代码都是一些业务层面的代码,乏善可陈,我就不啰嗦啦。

4. 定时任务怎么配

有的小伙伴可能还不知道定时任务怎么配置,我这里稍微说两句。

项目中提供了如下一个测试类:

@Component("schedulingTaskDemo")
public class SchedulingTaskDemo {
    public void taskWithParams(String params) {
        System.out.println("执行有参示例任务:" + params);
    }

    public void taskNoParams() {
        System.out.println("执行无参示例任务");
    }
}

这是提前写好的,需要的时候我们配置的定时任务就是这里相关的参数,如下图:

Bean 名称、方法名称都和测试案例中的 Bean 一一对应。

5. 小结

好啦,是不是很 Easy?小伙伴们赶紧去尝试下吧!

项目地址:

  • GitHub:https://github.com/lenve/scheduling
  • Gitee:https://gitee.com/lenve/scheduling
赞(6)
未经允许不得转载:江南一点雨 » Spring 定时任务玩出花!
分享到: 更多 (0)
扫码关注微信公众号【江南一点雨】,回复 1024,查看松哥原创 Java 实战教程(图文+视频)。

专注 Java 一百年

关注我们国际站