.Net Core对于`RabbitMQ`封装分布式事件总线

首先我们需要了解到分布式事件总线是什么;

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:
    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup> 	<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />     <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />     <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />     <PackageReference Include="RabbitMQ.Client" Version="6.4.0" /> </ItemGroup> 

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;  public class EventsBusOptions {     /// <summary>     /// 接收时异常事件     /// </summary>     public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent; } 

IEventsBusHandle.cs

namespace EventsBus.Contract;  public interface IEventsBusHandle<in TEto> where TEto : class {     Task HandleAsync(TEto eventData); } 

ILoadEventBus.cs

namespace EventsBus.Contract;  public interface ILoadEventBus {     /// <summary>     /// 发布事件     /// </summary>     /// <param name="eto"></param>     /// <typeparam name="TEto"></typeparam>     /// <returns></returns>     Task PushAsync<TEto>(TEto eto) where TEto : class; } 

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;  [AttributeUsage(AttributeTargets.Class)] public class EventsBusAttribute : Attribute {     public readonly string Name;      public EventsBusAttribute(string name)     {         Name = name;     } } 

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建ExtensionsEventsBusRabbitMQExtensions.cs,OptionsRabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

ExtensionsEventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract; using EventsBus.RabbitMQ; using EventsBus.RabbitMQ.Options; using Microsoft.Extensions.Configuration;  namespace Microsoft.Extensions.DependencyInjection;  public static class EventsBusRabbitMQExtensions {     public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,         IConfiguration configuration)     {         services.AddSingleton<RabbitMQFactory>();         services.AddSingleton(typeof(RabbitMQEventsManage<>));         services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));         services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();                  return services;     } } 

OptionsRabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;  namespace EventsBus.RabbitMQ.Options;  public class RabbitMQOptions {     /// <summary>     /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>     /// 指示应使用的协议的缺省值。     /// </summary>     public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;      /// <summary>     /// 地址     /// </summary>     public string HostName { get; set; }      /// <summary>     /// 账号     /// </summary>     public string UserName { get; set; }      /// <summary>     /// 密码     /// </summary>     public string Password { get; set; } } 

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection; using System.Text.Json; using EventsBus.Contract; using Microsoft.Extensions.DependencyInjection; using RabbitMQ.Client; using RabbitMQ.Client.Events;  namespace EventsBus.RabbitMQ;  public class RabbitMQEventsManage<TEto> where TEto : class {     private readonly IServiceProvider _serviceProvider;     private readonly RabbitMQFactory _rabbitMqFactory;      public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)     {         _serviceProvider = serviceProvider;         _rabbitMqFactory = rabbitMqFactory;         _ = Task.Run(Start);     }      private void Start()     {         var channel = _rabbitMqFactory.CreateRabbitMQ();         var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();         var name = eventBus?.Name ?? typeof(TEto).Name;         channel.QueueDeclare(name, false, false, false, null);         var consumer = new EventingBasicConsumer(channel); //消费者         channel.BasicConsume(name, true, consumer); //消费消息         consumer.Received += async (model, ea) =>         {             var bytes = ea.Body.ToArray();             try             {                 // 这样就可以实现多个订阅                 var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();                 foreach (var handle in events)                 {                     await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));                 }             }             catch (Exception e)             {                 EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);             }         };     } } 

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options; using Microsoft.Extensions.Options; using RabbitMQ.Client;  namespace EventsBus.RabbitMQ;  public class RabbitMQFactory : IDisposable {     private readonly RabbitMQOptions _options;     private readonly ConnectionFactory _factory;     private IConnection? _connection;      public RabbitMQFactory(IOptions<RabbitMQOptions> options)     {         _options = options?.Value;         // 将Options中的参数添加到ConnectionFactory         _factory = new ConnectionFactory         {             HostName = _options.HostName,             UserName = _options.UserName,             Password = _options.Password,             Port = _options.Port         };     }      public IModel CreateRabbitMQ()     {         // 当第一次创建RabbitMQ的时候进行链接         _connection ??= _factory.CreateConnection();          return _connection.CreateModel();     }      public void Dispose()     {         _connection?.Dispose();     } } 

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection; using System.Text.Json; using EventsBus.Contract; using Microsoft.Extensions.DependencyInjection;  namespace EventsBus.RabbitMQ;  public class RabbitMQLoadEventBus : ILoadEventBus {     private readonly IServiceProvider _serviceProvider;     private readonly RabbitMQFactory _rabbitMqFactory;      public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)     {         _serviceProvider = serviceProvider;         _rabbitMqFactory = rabbitMqFactory;     }      public async Task PushAsync<TEto>(TEto eto) where TEto : class     {          //创建一个通道         //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue         using var channel = _rabbitMqFactory.CreateRabbitMQ();                  // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称         var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();         var name = eventBus?.Name ?? typeof(TEto).Name;                  // 使用获取的名称创建一个通道         channel.QueueDeclare(name, false, false, false, null);         var properties = channel.CreateBasicProperties();         properties.DeliveryMode = 1;         // 将数据序列号,然后发布         channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息         // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,         var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();                  await Task.CompletedTask;     } } 

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包

使用RabbitMQ分布式事件总线的示例

首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件

version: '3.1' services:   rabbitmq:     restart: always # 开机自启     image: rabbitmq:3.11-management # RabbitMQ使用的镜像     container_name: rabbitmq # docker名称     hostname: rabbit     ports:       - 5672:5672 # 只是RabbitMQ SDK使用的端口       - 15672:15672 # 这是RabbitMQ管理界面使用的端口     environment:       TZ: Asia/Shanghai # 设置RabbitMQ时区       RABBITMQ_DEFAULT_USER: token # rabbitMQ账号       RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码     volumes:       - ./data:/var/lib/rabbitmq 

启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用

<Project Sdk="Microsoft.NET.Sdk.Web">      <PropertyGroup>         <TargetFramework>net7.0</TargetFramework>         <Nullable>enable</Nullable>         <ImplicitUsings>enable</ImplicitUsings>     </PropertyGroup>      <ItemGroup>         <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />         <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />     </ItemGroup>      <ItemGroup>         <!-- 引用RabbitMQ事件总线项目-->         <ProjectReference Include="..EventsBus.RabbitMQEventsBus.RabbitMQ.csproj" />     </ItemGroup>  </Project> 

修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105].Net Core对于`RabbitMQ`封装分布式事件总线

在这里注入的时候将配置注入好了

{   "Logging": {     "LogLevel": {       "Default": "Information",       "Microsoft.AspNetCore": "Warning"     }   },   "AllowedHosts": "*",   "RabbitMQOptions": {     "HostName": "127.0.0.1",     "UserName": "token",     "Password": "dd666666"   } }  

创建DemoEto.cs文件:

using EventsBus.RabbitMQ;  namespace Demo;  [EventsBus("Demo")] public class DemoEto {     public int Size { get; set; }          public string Value { get; set; } } 

创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序

using System.Text.Json; using EventsBus.Contract;  namespace Demo;  /// <summary> /// 事件处理服务,相当于订阅事件 /// </summary> public class DemoEventsBusHandle : IEventsBusHandle<DemoEto> {     public async Task HandleAsync(DemoEto eventData)     {         Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");         await Task.CompletedTask;     } } 

打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务

using Demo; using EventsBus.Contract;  var builder = WebApplication.CreateBuilder(args);  builder.Services.AddControllers();  builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen();  // 注入事件处理服务 builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));  // 注入RabbitMQ服务 builder.Services.AddEventsBusRabbitMQ(builder.Configuration);  var app = builder.Build();  // 只有在Development显示Swagger if (app.Environment.IsDevelopment()) {     app.UseSwagger();     app.UseSwaggerUI(); }  // 强制Https app.UseHttpsRedirection();  app.UseAuthorization();  app.MapControllers();  app.Run(); 

创建ControllersEventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;

using EventsBus.Contract; using Microsoft.AspNetCore.Mvc;  namespace Demo.Controllers;  [ApiController] [Route("[controller]")] public class EventBusController : ControllerBase {     private readonly ILoadEventBus _loadEventBus;      public EventBusController(ILoadEventBus loadEventBus)     {         _loadEventBus = loadEventBus;     }      /// <summary>     /// 发送信息     /// </summary>     /// <param name="eto"></param>     [HttpPost]     public async Task Send(DemoEto eto)     {         await _loadEventBus.PushAsync(eto);     } } 

然后我们启动程序会打开Swagger调试界面:

.Net Core对于`RabbitMQ`封装分布式事件总线

然后我们发送一下事件:

.Net Core对于`RabbitMQ`封装分布式事件总线

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,

来自Token的分享

技术交流群:737776595

发表评论

相关文章