首先我们需要了解到分布式事件总线是什么;
分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。
例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。
然后了解一下RabbitMQ
RabbitMQ
是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ
通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。
- RabbitMQ的主要功能包括:
- 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
- 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
- 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
- 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
- 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。
本文将讲解使用RabbitMQ实现分布式事件
实现我们创建一个EventsBus.Contract
的类库项目,用于提供基本接口,以支持其他实现
在项目中添加以下依赖引用,并且记得添加EventsBus.Contract
项目引用
<ItemGroup> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" /> <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" /> <PackageReference Include="RabbitMQ.Client" Version="6.4.0" /> </ItemGroup>
创建项目完成以后分别创建EventsBusOptions.cs
,IEventsBusHandle.cs
,RabbitMQEventsManage.cs
,ILoadEventBus.cs
,提供我们的分布式事件基本接口定义
EventsBusOptions.cs
:
namespace EventsBus.Contract; public class EventsBusOptions { /// <summary> /// 接收时异常事件 /// </summary> public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent; }
IEventsBusHandle.cs
:
namespace EventsBus.Contract; public interface IEventsBusHandle<in TEto> where TEto : class { Task HandleAsync(TEto eventData); }
ILoadEventBus.cs
:
namespace EventsBus.Contract; public interface ILoadEventBus { /// <summary> /// 发布事件 /// </summary> /// <param name="eto"></param> /// <typeparam name="TEto"></typeparam> /// <returns></returns> Task PushAsync<TEto>(TEto eto) where TEto : class; }
EventsBusAttribute.cs
:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ
的通道
namespace EventsBus.RabbitMQ; [AttributeUsage(AttributeTargets.Class)] public class EventsBusAttribute : Attribute { public readonly string Name; public EventsBusAttribute(string name) { Name = name; } }
然后可以创建我们的RabbitMQ
实现了,创建EventsBus.RabbitMQ
类库项目,用于编写EventsBus.Contract
的RabbitMQ
实现
创建项目完成以后分别创建ExtensionsEventsBusRabbitMQExtensions.cs
,OptionsRabbitMQOptions.cs
,EventsBusAttribute.cs
,,RabbitMQFactory.cs
,RabbitMQLoadEventBus.cs
ExtensionsEventsBusRabbitMQExtensions.cs
:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection
,这样就在注入的时候减少过度使用命名空间了
using EventsBus.Contract; using EventsBus.RabbitMQ; using EventsBus.RabbitMQ.Options; using Microsoft.Extensions.Configuration; namespace Microsoft.Extensions.DependencyInjection; public static class EventsBusRabbitMQExtensions { public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services, IConfiguration configuration) { services.AddSingleton<RabbitMQFactory>(); services.AddSingleton(typeof(RabbitMQEventsManage<>)); services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions))); services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>(); return services; } }
OptionsRabbitMQOptions.cs
:提供基本的Options
读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
的方法是读取IConfiguration
的名称为RabbitMQOptions
的配置东西,映射到Options中,具体使用往下看。
using RabbitMQ.Client; namespace EventsBus.RabbitMQ.Options; public class RabbitMQOptions { /// <summary> /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/> /// 指示应使用的协议的缺省值。 /// </summary> public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort; /// <summary> /// 地址 /// </summary> public string HostName { get; set; } /// <summary> /// 账号 /// </summary> public string UserName { get; set; } /// <summary> /// 密码 /// </summary> public string Password { get; set; } }
RabbitMQEventsManage.cs
:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序
using System.Reflection; using System.Text.Json; using EventsBus.Contract; using Microsoft.Extensions.DependencyInjection; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace EventsBus.RabbitMQ; public class RabbitMQEventsManage<TEto> where TEto : class { private readonly IServiceProvider _serviceProvider; private readonly RabbitMQFactory _rabbitMqFactory; public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory) { _serviceProvider = serviceProvider; _rabbitMqFactory = rabbitMqFactory; _ = Task.Run(Start); } private void Start() { var channel = _rabbitMqFactory.CreateRabbitMQ(); var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>(); var name = eventBus?.Name ?? typeof(TEto).Name; channel.QueueDeclare(name, false, false, false, null); var consumer = new EventingBasicConsumer(channel); //消费者 channel.BasicConsume(name, true, consumer); //消费消息 consumer.Received += async (model, ea) => { var bytes = ea.Body.ToArray(); try { // 这样就可以实现多个订阅 var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>(); foreach (var handle in events) { await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes)); } } catch (Exception e) { EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes); } }; } }
RabbitMQFactory.cs
:提供RabbitMQ
链接工厂,在这里你可以自己去定义和管理RabbitMQ
工厂
using EventsBus.RabbitMQ.Options; using Microsoft.Extensions.Options; using RabbitMQ.Client; namespace EventsBus.RabbitMQ; public class RabbitMQFactory : IDisposable { private readonly RabbitMQOptions _options; private readonly ConnectionFactory _factory; private IConnection? _connection; public RabbitMQFactory(IOptions<RabbitMQOptions> options) { _options = options?.Value; // 将Options中的参数添加到ConnectionFactory _factory = new ConnectionFactory { HostName = _options.HostName, UserName = _options.UserName, Password = _options.Password, Port = _options.Port }; } public IModel CreateRabbitMQ() { // 当第一次创建RabbitMQ的时候进行链接 _connection ??= _factory.CreateConnection(); return _connection.CreateModel(); } public void Dispose() { _connection?.Dispose(); } }
RabbitMQLoadEventBus.cs
:用于实现ILoadEventBus.cs
通过ILoadEventBus
发布事件RabbitMQLoadEventBus.cs
是RabbitMQ的实现
using System.Reflection; using System.Text.Json; using EventsBus.Contract; using Microsoft.Extensions.DependencyInjection; namespace EventsBus.RabbitMQ; public class RabbitMQLoadEventBus : ILoadEventBus { private readonly IServiceProvider _serviceProvider; private readonly RabbitMQFactory _rabbitMqFactory; public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory) { _serviceProvider = serviceProvider; _rabbitMqFactory = rabbitMqFactory; } public async Task PushAsync<TEto>(TEto eto) where TEto : class { //创建一个通道 //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue using var channel = _rabbitMqFactory.CreateRabbitMQ(); // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称 var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>(); var name = eventBus?.Name ?? typeof(TEto).Name; // 使用获取的名称创建一个通道 channel.QueueDeclare(name, false, false, false, null); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; // 将数据序列号,然后发布 channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息 // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效, var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>(); await Task.CompletedTask; } }
在这里我们的RabbitMQ
分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;
然后我们需要使用RabbitMQ分布式事件总线工具包
使用RabbitMQ分布式事件总线的示例
首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose
启动一个RabbitMQ
,下面提供一个compose文件
version: '3.1' services: rabbitmq: restart: always # 开机自启 image: rabbitmq:3.11-management # RabbitMQ使用的镜像 container_name: rabbitmq # docker名称 hostname: rabbit ports: - 5672:5672 # 只是RabbitMQ SDK使用的端口 - 15672:15672 # 这是RabbitMQ管理界面使用的端口 environment: TZ: Asia/Shanghai # 设置RabbitMQ时区 RABBITMQ_DEFAULT_USER: token # rabbitMQ账号 RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码 volumes: - ./data:/var/lib/rabbitmq
启动以后我们创建一个WebApi
项目,项目名称Demo
,创建完成打开项目文件添加引用
<Project Sdk="Microsoft.NET.Sdk.Web"> <PropertyGroup> <TargetFramework>net7.0</TargetFramework> <Nullable>enable</Nullable> <ImplicitUsings>enable</ImplicitUsings> </PropertyGroup> <ItemGroup> <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" /> <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" /> </ItemGroup> <ItemGroup> <!-- 引用RabbitMQ事件总线项目--> <ProjectReference Include="..EventsBus.RabbitMQEventsBus.RabbitMQ.csproj" /> </ItemGroup> </Project>
修改appsettings.json
配置文件:将RabbitMQ的配置写上,RabbitMQOptions
名称对应在EventsBus.RabbitMQ
中的RabbitMQOptions
文件![image-20230211022801105]
在这里注入的时候将配置注入好了
{ "Logging": { "LogLevel": { "Default": "Information", "Microsoft.AspNetCore": "Warning" } }, "AllowedHosts": "*", "RabbitMQOptions": { "HostName": "127.0.0.1", "UserName": "token", "Password": "dd666666" } }
创建DemoEto.cs
文件:
using EventsBus.RabbitMQ; namespace Demo; [EventsBus("Demo")] public class DemoEto { public int Size { get; set; } public string Value { get; set; } }
创建DemoEventsBusHandle.cs
文件:这里是订阅DemoEto
事件,相当于是DemoEto
的处理程序
using System.Text.Json; using EventsBus.Contract; namespace Demo; /// <summary> /// 事件处理服务,相当于订阅事件 /// </summary> public class DemoEventsBusHandle : IEventsBusHandle<DemoEto> { public async Task HandleAsync(DemoEto eventData) { Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}"); await Task.CompletedTask; } }
打开Program.cs
修改代码: 在这里注入了事件总线服务,和我们的事件处理服务
using Demo; using EventsBus.Contract; var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); // 注入事件处理服务 builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle)); // 注入RabbitMQ服务 builder.Services.AddEventsBusRabbitMQ(builder.Configuration); var app = builder.Build(); // 只有在Development显示Swagger if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } // 强制Https app.UseHttpsRedirection(); app.UseAuthorization(); app.MapControllers(); app.Run();
创建ControllersEventBusController.cs
控制器:我们在控制器中注入了ILoadEventBus
,通过调用接口实现发布事件;
using EventsBus.Contract; using Microsoft.AspNetCore.Mvc; namespace Demo.Controllers; [ApiController] [Route("[controller]")] public class EventBusController : ControllerBase { private readonly ILoadEventBus _loadEventBus; public EventBusController(ILoadEventBus loadEventBus) { _loadEventBus = loadEventBus; } /// <summary> /// 发送信息 /// </summary> /// <param name="eto"></param> [HttpPost] public async Task Send(DemoEto eto) { await _loadEventBus.PushAsync(eto); } }
然后我们启动程序会打开Swagger
调试界面:
然后我们发送一下事件:
我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,
来自Token的分享
技术交流群:737776595