通过surging的后台托管服务编写任务调度并支持规则引擎自定义脚本

简介

     过去,如果在业务中需要处理任务调度的时候,大家都会使用第三方的任务调度组件,而第三方组件有一套自己的规则,在微服务的中显得那么格格不入,这样就会造成代码臃肿,耦合性高,如果有分布式还需要搭建新的分布式环境,如果把任务调度做成组件服务,这个就完全满足了微服务的模块化,组件化,而下面谈的是在surging 中如何支持规则引擎自定义脚本。

调度频率设置

       首先在开始之前,先看看如何通过脚本分配多种调度计划,先看下表:

方法 描述
EveryMinute() 每分钟执行一次任务
EveryFiveMinutes(); 每五分钟执行一次任务
EveryTenMinutes();  每十分钟执行一次任务
EveryThirtyMinutes() 每半小时执行一次任务
Hourly(); 每小时执行一次任务
HourlyAt(10) 每一个小时的第 10 分钟运行一次
Daily() 每到午夜执行一次任务
DailyAt("3:00") 在 3:00 执行一次任务
TwiceDaily(1, 3) 在 1:00 和 3:00 分别执行一次任务
Weekly() 每周执行一次任务
Monthly() 每月执行一次任务
MonthlyOn(4, "3:00") 在每个月的第四天的 3:00 执行一次任务
Quarterly() 每季度执行一次任务
Yearly() 每年执行一次任务
Timezone("utc") 设置utc时区

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

举个例子,在工作日每三秒在时间8:00-23:30内执行任务。脚本如下:

parser.TimeZone(""utc"")       .Weekdays()
.SecondAt(3)
.Between(""8:00"", ""23:30"")

 额外的限制条件列表如下:

方法 描述
Weekdays() 限制任务在工作日
Sundays() 限制任务在星期日
Mondays() 限制任务在星期一
Tuesdays() 限制任务在星期二
Wednesdays() 限制任务在星期三
Thursdays() 限制任务在星期四
Fridays() 限制任务在星期五
Saturdays() 限制任务在星期六
When( function(lastExecTime)) 限制任务基于一个script脚本返回为真的验证
Skip( function(lastExecTime)) 限制任务基于一个script脚本返回为假的验证

 

 

 

 

 

 

 

 

 

 

 

 

举个例子,在工作日每三秒在时间8:00-23:30内执行任务。如果设置When返回为true,skip返回false 就会执行,脚本如下:

parser.TimeZone(""utc"")        .When(function(lastExecTime){                 return true;             })        .Skip(              function(lastExecTime){                 return false;             })       .Weekdays()       .SecondAt(3)        .Between(""8:00"", ""23:30"")

然后在function 脚本中支持DateUtils对象,可以针对lastExecTime进行逻辑判断,比如是否是周末:DateUtils.IsWeekend(lastExecTime), 是否是今天DateUtils.IsToday(lastExecTime),代码如下:

 

parser.TimeZone(""utc"")        .When(function(lastExecTime){                return DateUtils.IsToday(lastExecTime);             })        .Skip(              function(lastExecTime){                 return DateUtils.IsWeekend(lastExecTime);             })       .Weekdays()       .SecondAt(3)        .Between(""8:00"", ""23:30"")

 

编写调度服务

surging微服务引擎是支持后台管理托管服务的,如果要基于BackgroundService编写任务调度,那服务就要继承BackgroundServiceBehavior,还要继承ISingleInstance以设置注入单例模式,

首先,创建接口服务,这样就可以远程添加任务,开启关闭服务了,代码如下:

   [ServiceBundle("Background/{Service}")]     public interface IWorkService : IServiceKey     {         Task<bool> AddWork(Message message);           Task StartAsync();          Task StopAsync();     }

然后创建业务领域服务,以下代码是通过规则引擎自定义脚本设置执行频率,并且可以设置execsize 以标识同时执行任务的大小,通过以下业务逻辑代码大家可以扩展支持持久化。

public class WorkService : BackgroundServiceBehavior, IWorkService, ISingleInstance     {         private readonly ILogger<WorkService> _logger;         private readonly Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>> _queue = new Queue<Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>>();         private readonly ConcurrentDictionary<string, DateTime> _keyValuePairs = new ConcurrentDictionary<string, DateTime>();         private readonly IServiceProxyProvider _serviceProxyProvider;         private AtomicLong _atomic=new AtomicLong(1);         private const int EXECSIZE = 1;         private CancellationToken _token;          public WorkService(ILogger<WorkService> logger, IServiceProxyProvider serviceProxyProvider)         {             _logger = logger;             _serviceProxyProvider = serviceProxyProvider;             /*   var script = @"parser                                .Weekdays().SecondAt(3).Between(""8:00"", ""22:00"")";*/             var script = @"parser                               .TimeZone(""utc"")                                .When(                               function(lastExecTime){                 return DateUtils.IsToday(lastExecTime);             }).Skip(              function(lastExecTime){                 return DateUtils.IsWeekend(lastExecTime);             }).Weekdays().SecondAt(3).Between(""8:00"", ""23:30"")";             var ruleWorkflow = GetSchedulerRuleWorkflow(script);             var messageId = Guid.NewGuid().ToString();             _keyValuePairs.AddOrUpdate(messageId, DateTime.Now, (key, value) => DateTime.Now);             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(new Message() { MessageId= messageId,Config=new SchedulerConfig() {  IsPersistence=true} }, GetRuleEngine(ruleWorkflow), ruleWorkflow));          }          public  Task<bool> AddWork(Message message)         {             var ruleWorkflow = GetSchedulerRuleWorkflow(message.Config.Script);             _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);             _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, GetRuleEngine(ruleWorkflow), ruleWorkflow));             return Task.FromResult(true);         }          protected override async  Task ExecuteAsync(CancellationToken stoppingToken)         {             try             {                 _token = stoppingToken;                 _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);                  _queue.TryDequeue(out Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>? message);                 if (message != null)                 {                     var parser = await GetParser(message.Item3, message.Item2);                     await PayloadSubscribe(parser, message.Item1, message.Item2, message.Item3);                     _keyValuePairs.TryGetValue(message.Item1.MessageId, out DateTime dateTime);                     parser.Build(dateTime == DateTime.MinValue ? DateTime.Now : dateTime);                 }                 if (!_token.IsCancellationRequested && (message == null || _atomic.GetAndAdd(1) == EXECSIZE))                 {                     _atomic = new AtomicLong(1);                     await Task.Delay(1000, stoppingToken);                  }             }             catch (Exception ex){                 _logger.LogError("WorkService execute error, message:{message} ,trace info:{trace} ", ex.Message, ex.StackTrace);             }         }          public async Task StartAsync()         {             if (_token.IsCancellationRequested)             {                  await base.StartAsync(_token);             }         }          public async Task StopAsync()         {             if (!_token.IsCancellationRequested)             {                await  base.StopAsync(_token);             }         }          private async Task PayloadSubscribe(RulePipePayloadParser parser, Message message, RulesEngine.RulesEngine rulesEngine, SchedulerRuleWorkflow ruleWorkflow)         {             parser.HandlePayload().Subscribe(async (temperature) =>             {                 try                 {                     if (temperature)                     {                        await  ExecuteByPlanAsyn(message);                         _logger.LogInformation("Worker exec at: {time}", DateTimeOffset.Now);                      }                 }                 catch (Exception ex) { }                 finally                 {                     if (message.Config.IsPersistence || (!temperature && !message.Config.IsPersistence))                         _queue.Enqueue(new Tuple<Message, RulesEngine.RulesEngine, SchedulerRuleWorkflow>(message, rulesEngine, ruleWorkflow));                  }             });         }          private async Task<bool> ExecuteByPlanAsyn(Message message)         {             var result = false;             var isExec = true;             try             {                 if (!string.IsNullOrEmpty(message.RoutePath))                 {                     var serviceResult = await _serviceProxyProvider.Invoke<object>(message.Parameters, message.RoutePath, message.ServiceKey);                     bool.TryParse(serviceResult?.ToString(), out result);                     isExec = true;                 }             }             catch { }             finally             {                 if (isExec && message.Config.IsPersistence)                     _keyValuePairs.AddOrUpdate(message.MessageId, DateTime.Now, (key, value) => DateTime.Now);                 else if (!message.Config.IsPersistence)                     _keyValuePairs.TryRemove(message.MessageId, out DateTime dateTime);             }             return result;         }          private async Task<RulePipePayloadParser> GetParser(SchedulerRuleWorkflow ruleWorkflow, RulesEngine.RulesEngine engine)         {             var payloadParser = new RulePipePayloadParser();             var ruleResult = await engine.ExecuteActionWorkflowAsync(ruleWorkflow.WorkflowName, ruleWorkflow.RuleName, new RuleParameter[] { new RuleParameter("parser", payloadParser) });             if (ruleResult.Exception != null && _logger.IsEnabled(LogLevel.Error))                 _logger.LogError(ruleResult.Exception, ruleResult.Exception.Message);             return payloadParser;         }          private RulesEngine.RulesEngine GetRuleEngine(SchedulerRuleWorkflow ruleWorkFlow)         {             var reSettingsWithCustomTypes = new ReSettings { CustomTypes = new Type[] { typeof(RulePipePayloadParser) } };             var result = new RulesEngine.RulesEngine(new Workflow[] { ruleWorkFlow.GetWorkflow() }, null, reSettingsWithCustomTypes);             return result;         }          private SchedulerRuleWorkflow GetSchedulerRuleWorkflow(string script)         {             var result = new SchedulerRuleWorkflow("1==1");             if (!string.IsNullOrEmpty(script))             {                 result = new SchedulerRuleWorkflow(script);             }             return result;         }     }

总结

因为工作繁忙,微服务平台暂时搁置,等公司基于surging 的物联网项目上线后,再投入时间研发,surging 一直开发中未曾放弃,也许你没看到的版本才是最强大的。之前的QQ群被封了,如果感兴趣可以加:744677125

开源地址:https://github.com/fanliang11/surging

 

发表评论

相关文章