.Net之延迟队列

介绍

具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

  1. 订单成功后,在30分钟内没有支付,自动取消订单
  2. 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
  3. 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
  4. 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

该介绍来自其他文章

方案

下面的例子没有进行封装,所以代码仅供参考

Redis过期事件

注意:

不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大

不保证一定送达

发送即忘策略,不包含持久化

但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。

redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。

配置

需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

.Net之延迟队列

-- 取消注释 notify-keyspace-events Ex  -- 注释 #notify-keyspace-events "" 

然后重新启动服务器,比如windows

 .redis-server.exe  .redis.windows.conf 

或者linux中使用docker-compose重新部署redis

  redis:     container_name: redis     image: redis     hostname: redis     restart: always     ports:        - "6379:6379"     volumes:        - $PWD/redis/redis.conf:/etc/redis.conf       - /root/common-docker-compose/redis/data:/data     command:        /bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件 

然后使用客户端订阅事件

-- windows .redis-cli   -- linux docker exec -it 容器标识 redis-cli   psubscribe __keyevent@0__:expired 

控制台订阅

使用StackExchange.Redis组件订阅过期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection); var db = connectionMultiplexer.GetDatabase(0);  db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10)); Console.WriteLine("开始订阅");  var subscriber = connectionMultiplexer.GetSubscriber();  //订阅库0的过期通知事件 subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => {     Console.WriteLine($"key过期 channel:{channel} key:{key}"); });  Console.ReadLine(); 

输出结果:

key过期 channel:keyevent@0:expired key:orderno:123456

如果启动多个客户端监听,那么多个客户端都可以收到过期事件。

WebApi中订阅

创建RedisListenService继承自:BackgroundService

public class RedisListenService : BackgroundService {     private readonly ISubscriber _subscriber;      public RedisListenService(IServiceScopeFactory serviceScopeFactory)     {         using var scope = serviceScopeFactory.CreateScope();         var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();          var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);         var db = connectionMultiplexer.GetDatabase(0);         _subscriber = connectionMultiplexer.GetSubscriber();     }      protected override Task ExecuteAsync(CancellationToken stoppingToken)     {         //订阅库0的过期通知事件         _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>         {             Console.WriteLine($"key过期 channel:{channel} key:{key}");         });          return Task.CompletedTask;     } } 

注册该后台服务

services.AddHostedService<RedisListenService>(); 

启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。

RabbitMq延迟队列

版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的

插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine 

进入容器

docker exec -it 容器名称/标识 bash 

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 

查看是否启用

rabbitmq-plugins list 

[E]和[e]表示启用,然后重启服务

rabbitmq-server restart 

然后在管理界面添加交换机都可以看到

.Net之延迟队列

生产消息

发送的消息类型是:x-delayed-message

[HttpGet("send/delay")] public string SendDelayedMessage() {     var factory = new ConnectionFactory()     {         HostName = "localhost",//IP地址         Port = 5672,//端口号         UserName = "admin",//用户账号         Password = "123456",//用户密码         VirtualHost = "customer"     };     using var connection = factory.CreateConnection();     using var channel = connection.CreateModel();      var exchangeName = "delay-exchange";     var routingkey = "delay.delay";     var queueName = "delay_queueName";      //设置Exchange队列类型     var argMaps = new Dictionary<string, object>()     {         {"x-delayed-type", "topic"}     };     //设置当前消息为延时队列     channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);     channel.QueueDeclare(queueName, true, false, false, argMaps);     channel.QueueBind(queueName, exchangeName, routingkey);      var time = 1000 * 5;     var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";     var body = Encoding.UTF8.GetBytes(message);     var props = channel.CreateBasicProperties();     //设置消息的过期时间     props.Headers = new Dictionary<string, object>()             {                 {  "x-delay", time }             };     channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);     Console.WriteLine("成功发送消息:" + message);      return "success"; } 

消费消息

消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理

public class RabbitmqDelayedHostService : BackgroundService {     private readonly IModel _channel;     private readonly IConnection _connection;      public RabbitmqDelayedHostService()     {         var connFactory = new ConnectionFactory//创建连接工厂对象         {             HostName = "localhost",//IP地址             Port = 5672,//端口号             UserName = "admin",//用户账号             Password = "123456",//用户密码             VirtualHost = "customer"         };         _connection = connFactory.CreateConnection();         _channel = _connection.CreateModel();          //交换机名称         var exchangeName = "exchangeDelayed";         var queueName = "delay_queueName";         var routingkey = "delay.delay";         var argMaps = new Dictionary<string, object>()         {             {"x-delayed-type", "topic"}         };         _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);         _channel.QueueDeclare(queueName, true, false, false, argMaps);         _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);         //声明为手动确认         _channel.BasicQos(0, 1, false);     }      protected override Task ExecuteAsync(CancellationToken stoppingToken)     {         var queueName = "delay_queueName";          var consumer = new EventingBasicConsumer(_channel);         consumer.Received += (model, ea) =>         {             var message = Encoding.UTF8.GetString(ea.Body.ToArray());             var routingKey = ea.RoutingKey;             Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");              //手动确认             _channel.BasicAck(ea.DeliveryTag, true);         };         _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);          return Task.CompletedTask;     }      public override void Dispose()     {         _connection.Dispose();         _channel.Dispose();         base.Dispose();     } } 

注册该后台任务

services.AddHostedService<RabbitmqDelayedHostService>(); 

输出结果

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000

其他方案

  • Hangfire延迟队列
BackgroundJob.Schedule(   () => Console.WriteLine("Delayed!"),    TimeSpan.FromDays(7)); 
  • 时间轮
  • Redisson DelayQueue
  • 计时管理器
发表评论

相关文章