?【作者】科技、互联网行业优质创作者
【专注领域】.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造
点击右上方“关注”,里面有很多高价值技术文章,是你刻苦努力也积累不到的经验,能助你快速成长。升职+涨薪!!
前言
在很多.NET 开发体系中开发者在面对调度作业需求的时候一般会选择三方开源成熟的作业调度框架来满足业务需求,比如Hangfire、Quartz.NET这样的框架。
但是有些时候可能我们只是需要一个简易的延迟任务,这个时候引入这些框架就费力不讨好了。
最简单的粗暴的办法当然是:
Task.Run(async?()?=>
{
??//延迟xx毫秒
??await?Task.Delay(time);
??//业务执行
});
当时作为一个开发者,有时候还是希望使用更优雅的、可复用的一体化方案,比如可以实现一个简易的时间轮来完成基于内存的非核心重要业务的延迟调度。什么是时间轮呢,其实就是一个环形数组,每一个数组有一个插槽代表对应时刻的任务,数组的值是一个任务队列,假设我们有一个基于60秒的延迟时间轮,也就是说我们的任务会在不超过60秒(超过的情况增加分钟插槽,下面会讲)的情况下执行,那么如何实现?
正文
下面我们将定义一段代码来实现这个简单的需求。
话不多说,撸代码,首先我们需要定义一个时间轮的Model类用于承载我们的延迟任务和任务处理器。简单定义如下:
public?class?WheelTask
{
??public?T?Data?{?get;?set;?}
??public?Func?Handle?{?get;?set;?}
}
定义很简单,就是一个入参T代表要执行的任务所需要的入参,然后就是任务的具体处理器Handle。
接着我们来定义时间轮本轮的核心代码:
可以看到时间轮其实核心就两个东西,一个是毫秒计时器,一个是数组插槽,这里数组插槽我们使用了字典来实现,key值分别对应0到59秒。每一个插槽的value对应一个任务队列。
当添加一个新任务的时候,输入需要延迟的秒数,就会将任务插入到延迟多少秒对应的插槽内,当计时器启动的时候,每一跳刚好1秒,那么就会对插槽计数+1,然后去寻找当前插槽是否有任务,有的话就会调用ExecuteTask执行该插槽下的所有任务。
public?class?TimeWheel
{
??int?secondSlot?=?0;
??DateTime?wheelTime?{?get?{?return?new?DateTime(1,?1,?1,?0,?0,?secondSlot);?}?}
??Dictionary>>?secondTaskQueue;
??public?void?Start()
??{
????new?Timer(Callback,?null,?0,?1000);
????secondTaskQueue?=?new?Dictionary>>();
????Enumerable.Range(0,?60).ToList().ForEach(x?=>
????{
??????secondTaskQueue.Add(x,?new?ConcurrentQueue>());
????});
??}
??public?async?Task?AddTaskAsync(int?second,?T?data,?Func?handler)
??{
????var?handTime?=?wheelTime.AddSeconds(second);
????if?(handTime.Second?!=?wheelTime.Second)
??????secondTaskQueue[handTime.Second].Enqueue(new?WheelTask(data,?handler));
????else
??????await?handler(data);
??}
??async?void?Callback(object?o)
??{
????if?(secondSlot?!=?59)
??????secondSlot++;
????else
????{
??????secondSlot?=?0;
????}
????if?(secondTaskQueue[secondSlot].Any())
??????await?ExecuteTask();
??}
??async?Task?ExecuteTask()
??{
????if?(secondTaskQueue[secondSlot].Any())
??????while?(secondTaskQueue[secondSlot].Any())
????????if?(secondTaskQueue[secondSlot].TryDequeue(out?WheelTask?task))
??????????await?task.Handle(task.Data);
??}
}
接下来就是如果我需要大于60秒的情况如何处理呢。其实就是增加分钟插槽数组,举个例子我有一个任务需要2分40秒后执行,那么当我插入到时间轮的时候我先插入到分钟插槽,当计时器每过去60秒,分钟插槽值+1,当分钟插槽对应有任务的时候就将这些任务从分钟插槽里弹出再入队到秒插槽中,这样一个任务会先进入插槽值=2(假设从0开始计算)的分钟插槽,计时器运行120秒后分钟值从0累加到2,2插槽的任务弹出到插槽值=40的秒插槽里,当计时器再运行40秒,刚好就可以执行这个延迟2分40秒的任务。
话不多说,上代码:
首先我们将任务WheelTask增加一个Second属性,用于当任务从分钟插槽弹出来时需要知道自己入队哪个秒插槽
public?class?WheelTask
{
??...
??public?int?Second?{?get;?set;}
??...
}
接着我们再重新定义时间轮的逻辑增加分钟插槽值以及插槽队列的部分
public?class?TimeWheel
{
??int?minuteSlot,?secondSlot?=?0;
??DateTime?wheelTime?{?get?{?return?new?DateTime(1,?1,?1,?0,?minuteSlot,?secondSlot);?}?}
??Dictionary>>?minuteTaskQueue,?secondTaskQueue;
??public?void?Start()
??{
????new?Timer(Callback,?null,?0,?1000);、
????minuteTaskQueue?=?new?Dictionary>>();
????secondTaskQueue?=?new?Dictionary>>();
????Enumerable.Range(0,?60).ToList().ForEach(x?=>
????{
??????minuteTaskQueue.Add(x,?new?ConcurrentQueue>());
??????secondTaskQueue.Add(x,?new?ConcurrentQueue>());
????});
??}
??...
}
同样的在添加任务的AddTaskAsync函数中我们需要增加分钟,代码改为这样,当大于1分钟的任务会入队到分钟插槽中,小于1分钟的会按原逻辑直接入队到秒插槽中:
public?async?Task?AddTaskAsync(int?minute,?int?second,?T?data,?Func?handler)
{
??var?handTime?=?wheelTime.AddMinutes(minute).AddSeconds(second);
????if?(handTime.Minute?!=?wheelTime.Minute)
??????minuteTaskQueue[handTime.Minute].Enqueue(new?WheelTask(handTime.Second,?data,?handler));
????else
????{
??????if?(handTime.Second?!=?wheelTime.Second)
????????secondTaskQueue[handTime.Second].Enqueue(new?WheelTask(data,?handler));
??????else
????????await?handler(data);
????}
}
最后的部分就是计时器的callback以及任务执行的部分:
async?void?Callback(object?o)
{
??bool?minuteExecuteTask?=?false;
??if?(secondSlot?!=?59)
????secondSlot++;
??else
??{
????secondSlot?=?0;
????minuteExecuteTask?=?true;
????if?(minuteSlot?!=?59)
??????minuteSlot++;
????else
????{
??????minuteSlot?=?0;
????}
??}
??if?(minuteExecuteTask?||?secondTaskQueue[secondSlot].Any())
????await?ExecuteTask(minuteExecuteTask);
}
async?Task?ExecuteTask(bool?minuteExecuteTask)
{
??if?(minuteExecuteTask)
????while?(minuteTaskQueue[minuteSlot].Any())
??????if?(minuteTaskQueue[minuteSlot].TryDequeue(out?WheelTask?task))
????????secondTaskQueue[task.Second].Enqueue(task);
??if?(secondTaskQueue[secondSlot].Any())
????while?(secondTaskQueue[secondSlot].Any())
??????if?(secondTaskQueue[secondSlot].TryDequeue(out?WheelTask?task))
????????await?task.Handle(task.Data);
}
总结
基本上基于分钟+秒的时间轮延迟任务核心功能就这些了,聪明的你一定知道如何扩展增加小时,天,月份甚至年份的时间轮了。
虽然从代码逻辑上可以实现,但是大部分情况下我们使用时间轮仅仅是完成一些内存易失性的非核心的任务延迟调度,实现天,周,月年意义不是很大。所以基本上到小时就差不多了。再多就上作业系统来调度吧。
写在最后
- ?请点击上方“关注”我,里面有很多高价值技术文章,是你刻苦努力也积累不到的经验,能助你快速成长。升职+涨薪!!
- 关注【数字智慧化基地】(微信?扫描?下方?二维码?),免费领取如下?15个?视频?教程?!
?回复'面试',获取C#/.NET/.NET Core面试宝典
回复'C#',领取零基础学习C#编程
回复'NET',领取.NET零基础入门到实战
回复'Linux',领取Linux从入门到精通
回复'WPF',领取高薪热门【WPF上位机+工业互联网】从零手写实战
回复'Modbus',领取初识C#+上位机Modbus通信
回复'PLC',领取C#语言与西门子PLC的通信实操
回复'blazor',领取blazor从入门到实战
回复'TypeScript',领取前端热门TypeScript系统教程
回复'vue',领取vue前端从入门到精通
回复'23P',领取C#实现23种常见设计模式
回复'MongoDB',领取MongoDB实战
回复'Trans',领取分布式事务
回复'Lock',领取分布式锁实践
回复'Docker',领取微服务+Docker综合实战
回复'K8s',领取K8s部署微服务
回复'加群',进.NET技术社区交流群