触发器
触发器由调度器调用,用于触发任务的执行。
/**
 * Trigger for scheduled tasks.
 *
 * <p>Per the accompanying description, the scheduler is the caller of
 * {@link #trigger()}; what "triggering" does is left to the implementation.
 */
public interface ScheduleTaskTrigger {
/**
 * Fires the trigger once.
 */
void trigger();
}
默认实现主要是扫描时间轮当前刻度(槽位)上的任务,然后通过执行器去执行,具体代码如下:
@Slf4j
public class DefaultScheduleTaskTrigger implements ScheduleTaskTrigger {
private final TimeWheel timeWheel;
private final ScheduleTaskService scheduleTaskService;
private final ScheduleTaskExecutor executor;
public DefaultScheduleTaskTrigger(TimeWheel timeWheel,
ScheduleTaskService scheduleTaskService,
ScheduleTaskExecutor executor) {
this.timeWheel = timeWheel;
this.scheduleTaskService = scheduleTaskService;
this.executor = executor;
}
@Override
public void trigger() {
int slot = Calendar.getInstance().get(Calendar.SECOND);
List<WheelEntity> taskList = timeWheel.take(slot);
if (CollectionUtils.isEmpty(taskList)){
return;
}
List<ScheduleTask> scheduleTasks = refreshNextTime(taskList);
List<Long> taskIds = scheduleTasks.stream().map(ScheduleTask::getTaskId).collect(Collectors.toList());
try {
executor.execute(taskIds);
}catch (Exception e){
log.error("Execute task error:{}", e.getMessage(), e);
}
}
/**
* 刷新下次执行时间
*
* @param taskList
* @return 任务列表
*/
private List<ScheduleTask> refreshNextTime(List<WheelEntity> taskList) {
List<ScheduleTask> tasks = new ArrayList<>();
for (WheelEntity entity : taskList) {
ScheduleTask task = scheduleTaskService.get(entity.getTaskId());
if (Objects.isNull(task)){
continue;
}
try {
timeWheel.put(task.getTaskId(), task.getCronExpression());
}catch (Exception e){
log.error("Refresh task error:{}", e.getMessage(), e);
}
if (!StringUtils.equals(entity.getCron(), task.getCronExpression())){
continue;
}
tasks.add(task);
}
return tasks;
}
}