0%

定时任务——xxljob

定时任务

Spring定时任务是一种可靠的任务调度框架,它能够根据时间表自动执行任务,无需人工干预。这些任务可以是周期性的或只执行一次,例如每天执行一次清理任务,每周执行一次数据备份等。

java自带解决方案

Timer

创建 java.util.TimerTask 任务,在 run 方法中实现业务逻辑。通过 java.util.Timer 进行调度,支持按照固定频率执行。所有的 TimerTask 是在同一个线程中串行执行,相互影响。也就是说,对于同一个 Timer 里的多个 TimerTask 任务,如果一个 TimerTask 任务在执行中,其它 TimerTask 即使到达执行的时间,也只能排队等待。如果有异常产生,线程将退出,整个定时任务就失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.Timer;
import java.util.TimerTask;


public class TestTimerTask {


public static void main(String[] args) {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("hell world");
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 10, 3000);
}


}

ScheduledExecutorService

Spring定时任务是基于Java的ScheduledExecutorService实现的,这是Java自带的一个任务调度器。Spring封装了这个调度器,提供了一些方便的注解和API,使得我们可以更加简单地创建和管理定时任务。

一般情况下,我们需要完成以下几个步骤来创建和配置Spring定时任务:

  1. 配置任务执行器

我们需要在Spring配置文件中配置一个线程池,用来执行定时任务。线程池的大小可以根据任务的数量和复杂度进行调整,以保证任务的及时执行。

1
2
3
4
5
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="50" />
<property name="queueCapacity" value="1000" />
</bean>

这里我们创建了一个名为”taskExecutor”的线程池,用来执行定时任务。线程池的大小为10-50个线程,队列容量为1000。

  1. 配置定时任务

我们可以使用Spring提供的@Scheduled注解或XML配置来创建定时任务。注解的方式更加简单和方便,可以直接标注在任务方法上,指定任务的执行时间和频率等参数。XML配置方式需要在Spring配置文件中定义一个task:annotation-driven元素,并在任务方法上使用task:scheduled元素来指定任务的执行时间和频率等参数。

1
2
3
4
5
6
7
8
9
@Component
public class MyTask {

@Scheduled(cron = "0 0 0 * * ?")
public void doTask() {
// TODO: 任务逻辑代码
System.out.println("执行任务");
}
}

这里我们在MyTask类上添加了@Component注解,使得它可以被Spring管理。在doTask方法上使用@Scheduled注解来指定任务的执行时间和频率,这里使用的是Cron表达式,表示每天0点执行。

  1. 编写任务逻辑

我们需要编写任务逻辑代码,用来完成任务要求的具体操作。这些操作可以是数据库访问、文件处理、网络通信等,任务逻辑的具体实现方式取决于任务的实际需求。

  1. 启动任务调度器

当我们配置好任务执行器和定时任务后,需要通过代码来启动任务调度器。Spring提供了一个ScheduledExecutorFactoryBean类,可以方便地创建和管理任务调度器。

1
2
3
4
5
@Configuration
@EnableScheduling
public class AppConfig {

}

这里我们在AppConfig类上添加@Configuration注解,表示它是一个Spring配置类。然后使用@EnableScheduling注解来启用Spring定时任务的支持。

spring自带解决方案

Spring Task

Spring Task是Spring框架提供的一种简单易用的任务调度框架,它基于Java的ScheduledExecutorService实现,与Spring Boot、Spring MVC等框架集成非常方便。和Spring定时任务类似,Spring Task可以用来执行定时任务、周期性任务、异步任务等。

相比于Spring定时任务,Spring Task更加轻量级和易用,不需要过多的配置和依赖,也不需要特殊的任务执行器。在Spring Boot中,只需要添加@EnableScheduling注解和@Scheduled注解即可使用Spring Task。

  1. 配置任务执行器

不需要显式地配置任务执行器,Spring Task会自动使用ScheduledExecutorService来执行任务。

  1. 配置定时任务

使用@Scheduled注解来创建一个定时任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
javaCopy code@Component
public class MyTask {
/**
* 每隔5秒跑一次
*/
@Scheduled(fixedRate = 5000)
/**
* 上次跑完隔3秒再跑
*/
@Scheduled(fixedDelay = 3000)
/**
* 每分钟的第30秒跑一次
*/
@Scheduled(cron = "30 * * * * ?")
public void doTask() {
// TODO: 任务逻辑代码
System.out.println("执行任务");
}
}

这里我们在MyTask类上添加了@Component注解,使得它可以被Spring管理。在doTask方法上使用@Scheduled注解来指定任务的执行时间和频率,这里使用的是fixedRate属性,表示每5秒执行一次。

  1. 启动任务调度器

在Spring Boot中,可以通过@SpringBootApplication注解自动启动任务调度器。如果需要手动启动,可以在代码中添加以下配置:

1
2
3
4
5
javaCopy code@Configuration
@EnableScheduling
public class AppConfig {

}

这里我们在AppConfig类上添加@Configuration注解,表示它是一个Spring配置类。然后使用@EnableScheduling注解来启用Spring Task的支持。

Spring Task 相对于上面提到的两种解决方案,最大的优势就是支持 cron 表达式,可以处理按照标准时间固定周期执行的业务,比如每天几点几分执行。

业务幂等解决方案

业务幂等是指对于同一个业务请求,无论接收到多少次请求,最终处理的结果都是一样的。在分布式系统中,由于网络不可靠、消息重复等问题,可能会导致同一个业务请求被重复处理,从而产生错误或不一致的结果。因此,保证业务的幂等性是分布式系统中一个重要的问题。

  1. 乐观锁

在数据库中添加一个版本号或时间戳字段,每次更新记录时将版本号加1或时间戳更新为当前时间。在处理请求时先检查版本号或时间戳,如果和当前记录一致,则说明请求是第一次处理,否则说明请求已经被处理过了,直接返回结果即可。

  1. 悲观锁

在数据库中添加一个唯一索引,并使用SELECT … FOR UPDATE语句在处理请求时对记录加锁,确保同一时刻只有一个请求能够更新记录。如果有其他请求在处理该记录,则直接返回结果即可。

  1. Token机制

在处理请求时生成一个唯一的Token,并将Token存储在缓存或数据库中,每次处理请求时先检查Token是否存在,如果存在则说明请求已经被处理过了,否则说明请求是第一次处理,将Token存储起来并执行相应的业务逻辑。Token可以通过UUID或者Snowflake算法等生成。

  1. 消息队列

将请求发送到消息队列中,保证消息队列的幂等性。使用消息队列的消费者消费消息时,先检查该消息是否已经处理过,如果已经处理过则直接忽略,否则执行相应的业务逻辑,并将处理结果存储到数据库或缓存中,保证幂等性。

  1. 去重表

在数据库中创建一个去重表,每次处理请求时先检查去重表中是否存在相同的请求,如果存在则说明请求已经被处理过了,否则将请求处理后存储到去重表中,保证幂等性。去重表可以使用缓存或者数据库来实现。

分布式锁方案

基于Zookeeper实现分布式锁和定时任务

实现方式:使用Zookeeper的节点互斥性来实现分布式锁,使用watch机制来监听节点变化,实现分布式定时任务。在定时任务执行前,先检查锁是否已经过期,如果过期则删除锁节点,其他节点就可以获取锁执行任务。

优点:由于Zookeeper的一致性特性,可以保证锁的强一致性,避免出现数据不一致的问题。同时,Zookeeper自带了锁机制,使用方便。

缺点:Zookeeper的配置和运维比较复杂,需要专门的运维人员来管理。同时,由于Zookeeper的性能问题,可能会出现性能瓶颈,需要进行优化。

img

  1. 定时时间到了,在回调方法里,先去抢锁。
  2. 抢到锁,则继续执行方法,没抢到锁直接返回。
  3. 执行完方法后,释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
public class ZookeeperTask {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperTask.class);

@Autowired
private ZooKeeperClient zooKeeperClient;

@Scheduled(fixedDelay = 5000) // 间隔5秒执行
public void task() {
String lockPath = "/locks/test"; // 锁的路径
long expireTime = 10000; // 锁的过期时间(毫秒)
long waitTime = 5000; // 等待获取锁的时间(毫秒)

ZooKeeperLock lock = new ZooKeeperLock(zooKeeperClient, lockPath);
try {
// 尝试获取锁
boolean locked = lock.tryLock(waitTime, expireTime, TimeUnit.MILLISECONDS);
if (!locked) {
logger.info("Failed to acquire lock");
return;
}

// 执行任务
logger.info("Start task");
// do something...
logger.info("End task");
} catch (InterruptedException | KeeperException e) {
logger.error("Failed to acquire lock", e);
} finally {
// 释放锁
lock.unlock();
}
}
}

使用Redis实现分布式锁和定时任务

实现方式:使用Redis的setnx命令获取锁,执行任务后再使用del命令释放锁,获取锁时可以设置过期时间。在定时任务执行前,先检查锁是否已经过期,如果过期则重新获取锁。

优点:实现简单,可以通过Redis的高可用机制来保证系统的可用性,同时由于Redis的单线程机制,保证了锁的互斥性。

缺点:由于Redis的缓存特性,锁的超时时间可能会不精确,会有一定的误差。同时,由于锁的超时时间不确定,可能会出现锁被错误地释放或者占用的情况,需要做好超时时间的设置和调整。

img

使用 redis 抢锁,其实架构上和 DB/zookeeper 差不多, redis 抢锁支持过期时间,不用主动去释放锁,并且可以充分利用这个过期时间,解决任务执行过快释放锁导致任务重复执行的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Component
public class RedisTask {
private static final Logger logger = LoggerFactory.getLogger(RedisTask.class);

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Scheduled(fixedDelay = 5000) // 间隔5秒执行
public void task() {
String lockKey = "lock:test"; // 锁的key
long expireTime = 10000; // 锁的过期时间(毫秒)
long waitTime = 5000; // 等待获取锁的时间(毫秒)

// 尝试获取锁
boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", expireTime, TimeUnit.MILLISECONDS);
if (!locked) {
logger.info("Failed to acquire lock");
return;
}

try {
// 执行任务
logger.info("Start task");
// do something...
logger.info("End task");
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
}

上述代码中,使用RedisTemplate来操作Redis,使用setIfAbsent方法获取锁,如果获取锁失败则直接返回,否则执行任务。在finally块中释放锁。可以根据实际业务情况来修改锁的key、过期时间和等待时间等参数。

需要注意的是,如果任务执行时间比锁的过期时间长,可能会出现锁被错误地释放的情况,需要根据实际业务情况来设置锁的过期时间。同时,如果集群中的多个节点使用相同的锁key,可能会出现锁竞争的情况,需要避免这种情况的发生。

使用分布式锁组件

使用分布式锁组件,如阿里巴巴的DistributedLock、美团的Zebra等,来实现分布式锁和定时任务,通过这些组件的API来获取锁,执行完任务后再释放锁。在执行任务前,先检查锁的超时时间是否已经过期,如果过期则重新获取锁。

XXLjob

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

通俗来讲:XXL-JOB是一个任务调度框架,通过引入XXL-JOB相关的依赖,按照相关格式撰写代码后,可在其可视化界面进行任务的启动执行中止以及包含了日志记录与查询任务状态监控

设计思想

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

如果将XXL-JOB形容为一个人的话,每一个引入xxl-job的微服务就相当于一个独立的人(执行器),而按照相关约定格式撰写的Handler为餐桌上的食物,可视化界面则可以决定哪个执行器(人),吃东西或者不吃某个东西(定时任务),在什么时间吃(Corn表达式控制或者执行或终止或者;立即开始);

架构图:

img

系统组成

调度模块(调度中心):负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

执行模块(执行器):负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;

接收“调度中心”的执行请求、终止请求和日志请求等。

注册中心模块:

XXL-RPC的注册中心,是可选组件,支持服务注册并动态发现,可选择不启用,直接指定服务提供方机器地址通讯,选择启用时,内置可选方案:“XXL-RPC-ADMIN 轻量级服务注册中心”(推荐)、“ZK注册中心”、“Local注册中心”等

执行器模块:

执行器方面是基于数据库的集群方案,数据库选用Mysql;集群分布式并发环境中进行定时任务调度时,会在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。

服务通讯模块:

XXL-RPC提供多中通讯方案:支持 TCP 和 HTTP 两种通讯方式进行服务调用;其中 TCP 提供可选方案 NETTY ,HTTP 提供可选方案 NETTY_HTTP (新版本移除了Mina和Jetty通讯方案,主推Netty;如果有需要可以参考旧版本)。

特点

简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;

动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;

调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;

执行器HA(分布式):任务分布式执行,任务”执行器”支持集群部署,可保证任务执行HA;

注册中心: 执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;

弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;

故障转移:任务路由策略选择”故障转移”情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;

任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;

任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;

任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;

分片广播任务:执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;

动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

事件触发:除了”Cron方式”和”任务依赖方式”触发任务执行之外,支持基于事件的触发任务方式。调度中心提供触发任务单次执行的API服务,可根据业务事件灵活触发。

任务进度监控:支持实时监控任务进度;

Rolling实时日志:支持在线查看调度结果,并且支持以Rolling方式实时查看执行器输出的完整的执行日志;

GLUE:提供Web IDE,支持在线开发任务逻辑代码,动态发布,实时编译生效,省略部署上线的过程。支持30个版本的历史版本回溯。

脚本任务:支持以GLUE模式开发和运行脚本任务,包括Shell、Python、NodeJS、PHP、PowerShell等类型脚本;

命令行任务:原生提供通用命令行任务Handler(Bean任务,”CommandJobHandler”);业务方只需要提供命令行即可;

任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔;

一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行;

自定义任务参数:支持在线配置调度任务入参,即时生效;

调度线程池:调度系统多线程触发调度运行,确保调度精确执行,不被堵塞;

数据加密:调度中心和执行器之间的通讯进行数据加密,提升调度信息安全性;

邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件;

推送maven中央仓库: 将会把最新稳定版推送到maven中央仓库, 方便用户接入和使用;

运行报表:支持实时查看运行数据,如任务数量、调度次数、执行器数量等;以及调度报表,如调度日期分布图,调度成功分布图等;

全异步:任务调度流程全异步化设计实现,如异步调度、异步运行、异步回调等,有效对密集调度进行流量削峰,理论上支持任意时长任务的运行;

跨平台:原生提供通用HTTP任务Handler(Bean任务,”HttpJobHandler”);业务方只需要提供HTTP链接即可,不限制语言、平台;

国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文;

容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用;

线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入”Slow”线程池,避免耗尽调度线程,提高系统稳定性;;

用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色;

权限控制:执行器维度进行权限控制,管理员拥有全量权限,普通用户需要分配执行器权限后才允许相关操作;

高可用:调度中心高可用,调度中心支持多节点部署,基于数据库行锁保证同时只有一个调度中心节点触发任务调度。任务调度高可用,调度中心基于路由策略路由选择一个执行器节点执行任务,XXL-JOB提供了诸多“忙碌转移策略”和“故障转移策略”的路由策略保证任务调度高可用。

分区容错:服务提供方集群注册时,某个服务节点不可用时将会自动摘除,同时消费方将会移除失效节点将流量分发到其余节点,提高系统容错能力。

工作原理

img

执行流程:

a、执行器根据配置的调度中心的地址,自动注册到调度中心

b、达到任务触发条件,调度中心下发任务

c、执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中

d、执行器的回调线程消费内存队列中的执行结果,主动上报给调度中心

e、当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

个人总结:

xxl-job是一个易上手, 支持多种通讯协议多种注册中心,可自由灵活配置,完善的日志系统,清晰简洁的可视化页面,多执行策略,支持分片和分片广播,完整的补偿机制,重试重发熔断等。组件也存在一定的不租,一定的入侵性,对数据库高度依赖,需要java环境等。总体来说xxl-job依然是一个非常不错的“分布式任务调度平台”。