Skip to content

定时任务

技术选型

SpringTask,@Schedule 注解,且支持 cron 表达式。缺点:单机模式。

Quartz,有集群部署方案,但是 API 有些复杂,且不支持很多高级功能。

所以使用 XXL-JOB。

关于分布式定时任务的实现原理。看看文档的 3.29 介绍。几个角色。

关于 XXL-JOB 的架构设计。看看互联网上的资料。

定时任务常见应用场景:

  • 定时推送消息
  • 定时扫表更新状态
  • 定时更新数据

简要说明

定时任务相当于实时任务的拓展。

所以定时任务相比与实时任务,实现上多了哪些东西?

定时任务-前端

如图,创建定时任务的模板时,需要指定 cron 表达式与人群文件路径。

  • 程序提供了上传人群文件的接口:MessageTemplateController.upload,上传文件后会自动填充路径。

随后,cron 表达式与人群文件路径作为 MessageTemplate 的两个参数,存储进数据库中。

  • “cron 表达式”在创建 XXL-JOB 中的定时任务时,作为参数传入。
  • “人群文件路径”在定时任务执行时被读取。

定时任务执行的最后,调用 SendServiceImpl.batchSend,进入接入层,执行后续逻辑。

业务集成

需求:

  1. 创建定时任务模板:需要存储 cron 表达式与人群文件路径。
  2. 与 xxl-job 的集成:消息推送平台需要动态增删改定时任务,在代码内集成,而非可视化配置页面。所以自己封装 XXL-JOB 调度器的 HTTP 接口。
    • 实现方式:Hutool HTTP 模块的工具类,请求调度器暴露的 HTTP 接口。

时序图:

sequenceDiagram
    participant C   as 客户端
    participant MTC as MessageTemplateController
    participant MTS as MessageTemplateService
    participant CTS as CronTaskService
    participant XXL as xxl-job-admin

alt 创建定时任务模板。

    note over C: 上传人群文件
    C->>MTC: upload
    MTC-->>C: 返回人群文件路径<br>路径存储在消息模板中<br>后续通过路径读取文件

    note over C: 创建模板<br><br>任务类型为定时任务
    C->>MTC: saveOrUpdate
    MTC->>MTS: saveOrUpdate
    note over MTS: 将模板存入数据库<br><br>此时人群文件路径已存储在模板中<br><br>定时任务在后续第一次启动时被注册

end

alt 与 xxl-job 的集成。即通过代码操纵 xxl-job。

    note over C: 启动定时任务
    C->>MTC: start
    MTC->>MTS: startCronTask
    MTS->>CTS: saveCronTask
    note over CTS: 任务是否存在的判断依据<br>MessageTemplate.cronTaskId
    alt 任务不存在,创建任务
    CTS->>XXL: {$url}/jobinfo/add
    else 任务存在,更新任务
    CTS->>XXL: {$url}/jobinfo/update
    end
    MTS->>CTS: startCronTask
    CTS->>XXL: {$url}/jobinfo/start

    note over C: 暂停定时任务
    C->>MTC: stop
    MTC->>MTS: stopCronTask
    MTS->>CTS: stopCronTask
    CTS->>XXL: {$url}/jobinfo/stop

    note over C: 删除模板<br><br>顺便删除定时任务
    C->>MTC: deleteByIds
    MTC->>MTS: deleteByIds
    MTS->>CTS: deleteCronTask
    CTS->>XXL: {$url}/jobinfo/remove

end

deleteByIds 并没有真正从数据库中删除模板,而是将模板状态更新为“已被删除”,将更改保存进数据库。

自己的疑惑在于,并未看到系统对该数据后续有何处理。

  • 难道是直接通过数据库恢复已被删除的模板吗?

其他:

XxlJobInfo 是与 xxl-job-admin 接口进行交互所需的配置信息,封装为类,方便操作。

XxlJobInfo.scheduleConf 存储任务的调度配置,通常是 cron 表达式。

注:

xxl-job 的封装不够好,很多参数(例如任务调度类型(如 cron)、任务路由策略(如轮询))都需要手动封装为 JavaBean。xxl 包下的一些类做的就是这些事情。

(看看 xxl-job 新版本有没有支持更方便的 api 调用。这玩意不应该呀,好麻烦。)

(设置 cookie 之类的东西,然后就能通过 hutool 的 HttpUtil 直接调用接口,这基本就是所有网站的本质了,脚本也基于此而成!不忘初心(自动写作业脚本))

(还有,austin 中 xxl-job 的封装就相当于 HttpClient 的最佳实践吧。“配置”与“常量”的思想。)

具体执行逻辑

定时任务的具体执行逻辑。

点击启动后,会触发定时任务 CronTaskHandler

随后是三个线程池处理消息。

为何如此设计?

  1. 逐行读文件:加入文件中有 100w 个接收者,同时处理会内存爆满,显然不可能。
  2. 异步处理:读取磁盘文件并远程调用发送接口是一件比较耗时的工作,所以通过“线程池”异步处理。
  3. 批量处理:一条条处理显然也不合适。内存队列 LazyPending,通过判断队列中消息积压的 size 或 timeout,解决接口调用过少或过多的问题。

将整个流程比做物流运输链路。

  • 第一个线程池,相当于原材料供应商,每解析一行数据,生成一个“包裹”(CrowdInfoVo),通过“传送带”(阻塞队列)送往集货中心。
  • 第二个线程池,相当于物流公司的集货中心,定量||定时将零散包裹打包成箱。
  • 第三个线程池,相当于运输车队,将整箱货物运往目的地。

时序图

sequenceDiagram
    participant XXL as xxl-job-admin
    participant CTH as CronTaskHandler
    participant TH as TaskHandler
    participant ALP as AbstractLazyPending
    participant CBTP as CrowdBatchTaskPending
    participant SS as SendService

alt 异步打包任务,批量发送
    ALP->>ALP: initConsumePending<br>启动单线程池处理消费队列中的任务
    ALP->>ALP: handle
    ALP->>CBTP: doHandle<br>合并多个任务,构建参数,批量发送
    CBTP->>SS: batchSend
end

alt 生产任务
    note over XXL: 定时任务触发
    XXL->>CTH: execute<br>将任务提交至线程池异步执行
    note over CTH: 任务获取
    CTH->>TH: handle<br>获取消息模板,逐行读取接收者文件
    note over TH: 任务推送
    TH->>ALP: pending<br>线程池参数由Pending配置<br>将任务推送至阻塞队列
end

文字描述:

  • 处理多个定时任务,CronAsyncThreadPoolConfig.getXxlCronExecutor

    • 读取 CSV 文件,逐行解析接收者信息。
    • 将每行数据封装为 CrowdInfoVo 对象,提交至阻塞队列。
  • 消费单个定时任务的阻塞队列,SupportThreadPoolConfig.getPendingSingleThreadPool

    • 轮询阻塞队列:通过 poll 方法从阻塞队列取出任务,累积到 tasks 列表。
    • 触发消费条件:检查是否满足数量阈值或时间阈值。
    • 提交任务:当满足条件时,将 tasks 列表批量提交给第二个线程池异步处理。
    • 实际发送阻塞队列中的任务,CronAsyncThreadPoolConfig.getConsumePendingThreadPool
    • 合并请求,调用批量发送接口。

问题:如果有多个定时任务怎么办?(同时自己认为代码实现有问题)

三个线程池分别只有一个,但是 CrowdBatchTaskPending 是多例的,需要为每个定时任务创建一个对应的处理类,使每个定时任务拥有自己的阻塞队列与 tasks 列表。

但是,代码中的实现,线程池也被创建了多个。

第二个线程池有正常关闭的方法,第三个线程池没有找到,或许是bug?

线程池创建的初衷不就是为了避免线程的频繁创建销毁吗?

AI 说线程池属于重量级资源,应当常驻前台,与整个应用同周期。

个人认为应当将线程池设置为单例,处理类为原型模式没问题,同时应对多个定时任务,但是线程池应当只有一个!

其他

需要额外注意 Bean 的作用域为原型的类。

作用域为原型的类

其他

(线程池要不写简历上?反正 austin 项目中涉及到了,肯定会被问到。)

JavaGuide:线程池的一些知识点,如何创建?三个核心参数,阻塞队列。

黑马讲的还是不错的,手写线程池。

austin 调用的 hutool 的工具类,创建线程池。