简介
用xxl-job做后台任务管理, 主要是快速解决定时任务的HA问题, 项目代码量不大, 功能精简, 没有特殊依赖. 因为产品中用到了这个项目, 上午花了点时间研究了一下运行机制. 把看到的记一下.
- 项目地址
- 文档 https://www.xuxueli.com/xxl-job/
环境
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>${最新稳定版本}</version> </dependency>
运行需要 JDK1.8, MySQL5.7
数据库结构
- 库编码 utf8mb4_unicode_ci
- Table: xxl_job_group
任务分组, 组名, 只支持一级分组, address_list 字段支持多个执行端地址, 逗号分隔 - Table: xxl_job_info
任务主表, 记录了任务明细, 调度明细以及预警设置 - Table: xxl_job_log
任务每次执行的日志 - Table: xxl_job_log_report
按日对执行日志进行统计的结果 - Table: xxl_job_logglue
- Table: xxl_job_registry
用于登记任务的执行者, 记录group:分组, key:名称, value:接口地址. 名称是可以重复的, 接口地址会添加到任务分组表中的注册字段 - Table: xxl_job_user
简单的登录控制, 与其它表没有关联 - Table: xxl_job_lock
单字段表, 用于并发时加锁避免冲突
代码结构
- 项目用到的都是常见组件, MyBatis, FreeMarker, Bootstrap, 当前版本基于SpringBoot 2.6.7
- 线上运行的是 xxl-job-admin 模块, 提供执行端注册, 任务发起和日志记录等服务
- 项目中需要实现 xxl-job-executor, 项目中提供了例子
项目文件结构如下
├───doc │ ├───db # 初始化的sql │ └───images ├───xxl-job-admin # 运行的服务端模块, 提供界面和调度 │ └───src │ ├───main │ │ ├───java │ │ │ └───com │ │ │ └───xxl │ │ │ └───job │ │ │ └───admin │ │ │ ├───controller │ │ │ │ ├───annotation │ │ │ │ ├───interceptor │ │ │ │ └───resolver │ │ │ ├───core │ │ │ ├───dao │ │ │ └───service │ │ │ └───impl │ │ └───resources │ │ ├───i18n # 多国化, 简繁英 │ │ ├───mybatis-mapper # xml形式的mapper │ │ ├───static # 前端静态文件 │ │ └───templates # Freemarker模板 │ └───test │ └───java │ ├───xxl-job-core # 公用jar包, 模块内部依赖 │ └───src │ └───main │ └───java │ └───xxl-job-executor-samples ├───xxl-job-executor-sample-frameless # 任务执行层示例 │ └───src │ ├───main │ │ ├───java │ │ └───resources │ └───test │ └───java └───xxl-job-executor-sample-springboot # 使用SpringBoot的执行层示例 └───src ├───main │ ├───java │ └───resources └───test
运行机制
执行端需要准备以下信息
-
adminAddresses 服务端地址, 例如 http://127.0.0.1:8080/xxl-job-admin
-
accessToken 貌似是服务端的token, 在调用服务端 api/registry, api/registryRemove 等操作时需要验证
-
appname 执行端名称
-
address 执行端地址, 和 ip:port 二选一, 存在则覆盖 ip:port
-
ip 执行端IP
-
port 执行端服务端口
-
执行端启动后将自己注册到服务端, 等待回调
-
任务执行通过 XxlJobTrigger.processTrigger() 发起, 准备参数, 并在分组中选择一个地址
-
根据这个地址取得 ExecutorBiz, 调用 executorBiz.run() 执行任务
-
服务端: 通过 ExecutorBizClient,
- 调用
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
- 其中
accessToken
是服务端的accessToken
- 调用
-
执行端: 通过
ExecutorBizImpl.run()
- 调用
XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
得到XxlJob方法 - 通过
XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason)
执行
- 调用
非 Spring 的场景
通过调用 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 这个方法将 XxlJobSimpleExecutor 实例化, 并注册到xxl_job服务端
Spring 场景
- 在
@Configuration
中, 将 XxlJobSpringExecutor 作为一个@Bean
添加到 Spring context - XxlJobSpringExecutor 是 XxlJobExecutor 的子类并实现了
SmartInitializingSingleton
接口的afterSingletonsInstantiated()
方法 - 在
afterSingletonsInstantiated()
方法中- 调用 initJobHandlerMethodRepository(), 在这个方法中, 找到所有
@XxlJob
注解的方法 - 通过
registJobHandler()
, 将@XxlJob
方法添加到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository
- 调用
XxlJobExecutor.start()
, 将自己注册到 xxl_job 服务端
- 调用 initJobHandlerMethodRepository(), 在这个方法中, 找到所有
远程调用服务
xxl_job 并未使用Spring的服务机制, 而是内部实现了一个侦听指定IP+端口的服务. 这个实现对应的类是 EmbedServer, 服务基于 Netty, 核心代码是
// start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true);
这行代码注册了内部的XxlJob方法
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)
处理远程请求时, 在下面的代码中, 通过executorBiz.run(triggerParam)
调用XxlJob方法
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { //... // services mapping try { switch (uri) { case "/beat": return executorBiz.beat(); case "/idleBeat": IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); case "/run": TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); case "/kill": KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); case "/log": LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); default: return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found."); } } catch (Exception e) { //... }
锁机制
通过select ... for update
实现的, 这个表并没有放到 MyBatis, 在 JobScheduleHelper 中, 通过
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute();
得到锁, 在方法末尾释放
// close PreparedStatement if (null != preparedStatement) { try { preparedStatement.close(); } catch (SQLException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } }