时间轮

钟表想必大家都很熟悉,而其中带刻度表盘和表针就是我们这个组件的核心,它们分别代表时间轮和定时器

时间轮可以想象为下面这张图

我只需要把任务放到它需要被执行的时刻,然后等着时针转到这个时刻时,取出该时刻放置的任务,执行就可以了,这就是时间轮算法最核心的思想了。

这个定时器可以理解为 JavaScript 中的 setInterval 函数,一个实现定时调用的函数,定时器和时间轮结合实现了任务的高效、精准调度。

时间轮核心方法

这里时间轮的核心方法只有两个,一个是将任务放入到时间轮,另一个就是从时间轮指定槽位获取任务

public interface TimeWheel {

    /**
     * 任务放入到时间轮
     *
     * @param taskId
     * @param cron
     */
    void put(Long taskId, String cron);

    /**
     * 获取时间轮指定槽位的任务
     *
     * @return
     */
    List<WheelEntity> take(int slot);
}

时间轮的数据结构

首先,理解时间轮必须要理解它的数据结构,这个非常重要,我们这里使用 Map 来表示,Map 的 key 对应时钟上的刻度,每个刻度就是一个槽,槽位本身也指代时间精度,比如一秒扫一个槽,那么这个时间轮的最高精度就是 1 秒,

槽用来存放该刻度需要执行的任务,如果有多个任务需要执行呢?每个槽里面放一个链表就可以了,所以这里 Map 的 value 就是一个列表。

同一时刻存在多个任务时,只要把该刻度对应的链表全部遍历一遍,执行其中的任务即可,注意这里的其中,而不是全部,下面会细说。

public class HashedTimeWheel implements TimeWheel {

    private final long tickDuration; // 槽数
    private static final Map<Integer, List<WheelEntity>> timeWheel = new ConcurrentHashMap<>();

    public HashedTimeWheel(ScheduleProperties scheduleProperties){
        long tickDuration = scheduleProperties.getTickDuration();
        if (tickDuration <= 0){
            throw new ScheduleException("The tickDuration must more than zero.");
        }
        this.tickDuration = tickDuration;
    }
}

这里有个问题,那就是我们时间轮的槽数如何确定,如果任务不只限定在一天之内呢?比如我有个任务,需要每周一上午九点执行,我还有另一个任务,需要每周三的上午九点执行。一种很容易想到的解决办法是:

1.增大时间轮的刻度

一天 24 个小时,一周 168 个小时,为了解决上面的问题,我可以把时间轮的刻度(槽)从 12 个增加到 168 个,比如现在是星期二上午 10 点钟,那么下周一上午九点就是时间轮的第 9 个刻度,这周三上午九点就是时间轮的第 57 个刻度

仔细思考一下,会发现这中方式存在几个缺陷:

  • 时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就 2 个任务,我得移动 720 次,其中 718 次是无用功。

  • 时间刻度太多会导致存储空间变大,利用率变低,比如一个月就 2 个任务,我得需要大小是 720 的数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。

于是乎,聪明的你脑袋一转,想到另一个办法:

2.列表中的任务中添加 round 属性

这次我不增加时间轮的刻度了,刻度是 24 个,现在有三个任务需要执行,

  1. 任务一: 每周六上午九点。

  2. 任务二: 每周四上午九点。

  3. 任务三: 每个月 12 号上午九点。

比如现在是 9 月 9 号星期六上午 10 点,时间轮转一圈是 24 小时,到任务一下次执行(下周六上午九点),需要时间轮转过 6 圈后,到第 7 圈的第 9 个刻度开始执行。

任务二下次执行第 5 圈的第 9 个刻度,任务三是第 3 圈的第 9 个刻度。

时间轮每移动到一个刻度时,遍历任务列表,取出所有 round=0 的任务执行 然后把剩余的任务 round 值-1。

我们的时间轮采用的就是添加 round 属性解决,具体代码如下:

public class HashedTimeWheel implements TimeWheel {

    private final long tickDuration;
    private static final Map<Integer, List<WheelEntity>> timeWheel = new ConcurrentHashMap<>();

    public HashedTimeWheel(ScheduleProperties scheduleProperties){
        long tickDuration = scheduleProperties.getTickDuration();
        if (tickDuration <= 0){
            throw new ScheduleException("The tickDuration must more than zero");
        }
        this.tickDuration = tickDuration;
    }

    @Override
    public void put(Long taskId, String cron) {
        long nextTime = CronHelper.getNextTime(cron);
        long nowTime = System.currentTimeMillis() / 1000;
        if (nextTime <= nowTime){
            throw new ScheduleException("The nextTime must more than the nowTime");
        }

        long diff = nextTime - nowTime;
        long round = diff / tickDuration;
        int tick = (int) (nextTime % tickDuration);

        List<WheelEntity> taskList = timeWheel.getOrDefault(tick, new ArrayList<>());
        WheelEntity wheelEntity = new WheelEntity(taskId, round, cron);
        taskList.add(wheelEntity);
        timeWheel.put(tick, taskList);
    }

    @Override
    public List<WheelEntity> take(int slot) {
        List<WheelEntity> entities = timeWheel.get(slot);

        if (CollectionUtils.isEmpty(entities)){
            return Collections.emptyList();
        }

        List<WheelEntity> tasks = entities.stream().filter(e -> e.getRound() == 0L).collect(Collectors.toList());
        entities.removeAll(tasks);
        updateRound(slot, entities);
        return tasks;
    }

    private void updateRound(int tick, List<WheelEntity> entities){
        if (!CollectionUtils.isEmpty(entities)){
            for (WheelEntity entity : entities) {
                entity.setRound(entity.getRound() - 1L);
            }

            timeWheel.put(tick, entities);
        }
    }
}