什么是状态机
状态机作为一种程序开发范例,在实际的应用开发中有很多的应用场景,其中.NET 中的async/await 的核心底层实现就是基于状态机机制。状态机分为两种:有限状态机和无限状态机,本文介绍的就是有限状态机,有限状态机在任何时候都可以准确地处于有限状态中的一种,其可以根据一些输入从一个状态转换到另一个状态。一个有限状态机是由其状态列表、初始状态和触发每个转换的输入来定义的。如下图展示的就是一个闸机的状态机示意图:
从上图可以看出,状态机主要有以下核心概念:
- State:状态,闸机有已开启(opened)和已关闭(closed)状态。
- Transition:转移,即闸机从一个状态转移到另一个状态的过程。
- Transition Condition:转移条件,也可理解为事件,即闸机在某一状态下只有触发了某个转移条件,才会执行状态转移。比如,闸机处于已关闭状态时,只有接收到开启事件才会执行转移动作,进而转移到开启状态。
- Action:动作,即完成状态转移要执行的动作。比如要从关闭状态转移到开启状态,则需要执行开闸动作。
在.NET中,dotnet-state-machine/stateless
和MassTransit
都提供了开箱即用的状态机实现。本文将重点介绍MassTransit
中的状态机在Saga 模式中的应用。
MassTransit StateMachine
在MassTransit 中MassTransitStateMachine
就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转,也可以用来实现Saga模式的分布式事务。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库。是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示。
基于状态机实现编排式Saga事务
那具体如何使用MassTransitStateMachine
来应用编排式Saga 模式呢,接下来就来创建解决方案来实现以上下单流程示例。依次创建以下项目,除共享类库项目外,均安装MassTransit
和MassTransit.RabbitMQ
NuGet包。
项目 | 项目名 | 项目类型 |
---|---|---|
订单服务 | MassTransit.SmDemo.OrderService | ASP.NET Core Web API |
库存服务 | MassTransit.SmDemo.InventoryService | Worker Service |
支付服务 | MassTransit.SmDemo.PaymentService | Worker Service |
共享类库 | MassTransit.SmDemo.Shared | Class Library |
三个服务都添加扩展类MassTransitServiceExtensions
,并在Program.cs
类中调用services.AddMassTransitWithRabbitMq();
注册服务。
using System.Reflection; using MassTransit.CourierDemo.Shared.Models; namespace MassTransit.CourierDemo.InventoryService; public static class MassTransitServiceExtensions { public static IServiceCollection AddMassTransitWithRabbitMq(this IServiceCollection services) { return services.AddMassTransit(x => { x.SetKebabCaseEndpointNameFormatter(); // By default, sagas are in-memory, but should be changed to a durable // saga repository. x.SetInMemorySagaRepositoryProvider(); var entryAssembly = Assembly.GetEntryAssembly(); x.AddConsumers(entryAssembly); x.AddSagaStateMachines(entryAssembly); x.AddSagas(entryAssembly); x.AddActivities(entryAssembly); x.UsingRabbitMq((context, busConfig) => { busConfig.Host( host: "localhost", port: 5672, virtualHost: "masstransit", configure: hostConfig => { hostConfig.Username("guest"); hostConfig.Password("guest"); }); busConfig.ConfigureEndpoints(context); }); }); } }
订单服务
订单服务作为下单流程中的核心服务,主要职责包含接收创建订单请求和订单状态机的实现。先来定义OrderController
如下:
namespace MassTransit.SmDemo.OrderService.Controllers; [ApiController] [Route("[controller]")] public class OrderController : ControllerBase { private readonly IBus _bus; public OrderController(IBus bus) { _bus = bus; } [HttpPost] public async Task<IActionResult> CreateOrder(CreateOrderDto createOrderDto) { await _bus.Publish<ICreateOrderCommand>(new { createOrderDto.CustomerId, createOrderDto.ShoppingCartItems }); return Ok(); } }
紧接着,订阅ICreateOrderCommand
,执行订单创建逻辑,订单创建完毕后会发布ICreateOrderSucceed
事件。
public class CreateOrderConsumer : IConsumer<ICreateOrderCommand> { private readonly ILogger<CreateOrderConsumer> _logger; public CreateOrderConsumer(ILogger<CreateOrderConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<ICreateOrderCommand> context) { var shoppingItems = context.Message.ShoppingCartItems.Select(item => new ShoppingCartItem(item.SkuId, item.Price, item.Qty)); var order = new Order(context.Message.CustomerId).NewOrder(shoppingItems.ToArray()); await OrderRepository.Insert(order); _logger.LogInformation($"Order {order.OrderId} created successfully"); await context.Publish<ICreateOrderSucceed>(new { order.OrderId, order.OrderItems }); } }
最后来实现订单状态机,主要包含以下几步:
- 定义状态机状态: 一个状态机从启动到结束可能会经历各种异常,包括程序异常或物理故障,为确保状态机能从异常中恢复,因此必须保存状态机的状态。本例中,定义
OrderState
以保存状态机实例状态数据:
using MassTransit.SmDemo.OrderService.Domains; namespace MassTransit.SmDemo.OrderService; public class OrderState : SagaStateMachineInstance { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public Guid OrderId { get; set; } public decimal Amount { get; set; } public List<OrderItem> OrderItems { get; set; } }
- 定义状态机:直接继承自
MassTransitStateMachine
并同时指定状态实例即可:
namespace MassTransit.SmDemo.OrderService; public class OrderStateMachine : MassTransitStateMachine<OrderState> { }
- 注册状态机:这里指定内存持久化方式来持久化状态,也可指定诸如MongoDb、MySQL等数据库进行状态持久化:
return services.AddMassTransit(x => { //... x.AddSagaStateMachine<OrderStateMachine, OrderState>() .InMemoryRepository(); }
- 定义状态列表:即状态机涉及到的系列状态,并通过
State
类型定义,本例中为:- 已创建:
public State Created { get; private set; }
- 库存已扣减:
public State InventoryDeducted { get; private set; }
- 已支付:
public State Paid { get; private set; }
- 已取消:
public State Canceled { get; private set; }
- 已创建:
- 定义转移条件:即推动状态流转的事件,通过
Event<T>
类型定义,本例涉及有:- 订单成功创建事件:
public Event<ICreateOrderSucceed> OrderCreated {get; private set;}
- 库存扣减成功事件:
public Event<IDeduceInventorySucceed> DeduceInventorySucceed {get; private set;}
- 库存扣减失败事件:
public Event<IDeduceInventoryFailed> DeduceInventoryFailed {get; private set;}
- 订单支付成功事件:
public Event<IPayOrderSucceed> PayOrderSucceed {get; private set;}
- 订单支付失败事件:
public Event<IPayOrderFailed> PayOrderFailed {get; private set;}
- 库存已返还事件:
public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; }
- 订单取消事件:
public Event<ICancelOrderSucceed> OrderCanceled { get; private set; }
- 订单成功创建事件:
- 定义关联关系:由于每个事件都是孤立的,但相关联的事件终会作用到某个具体的状态机实例上,如何关联事件以推动状态机的转移呢?配置
关联Id
。以下就是将事件消息中的传递的OrderId
作为关联ID。Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId));
- 定义状态转移:即状态在什么条件下做怎样的动作完成状态的转移,本例中涉及的正向状态转移有:
(1) 初始状态->已创建:触发条件为OrderCreated
事件,同时要发送IDeduceInventoryCommand
推动库存服务执行库存扣减。
Initially( When(OrderCreated) .Then(context => { context.Saga.OrderId = context.Message.OrderId; context.Saga.OrderItems = context.Message.OrderItems; context.Saga.Amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty); }) .PublishAsync(context => context.Init<IDeduceInventoryCommand>(new { context.Saga.OrderId, DeduceInventoryItems = context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList() })) .TransitionTo(Created));
(2) 已创建-> 库存已扣减:触发条件为DeduceInventorySucceed
事件,同时要发送IPayOrderCommand
推动支付服务执行订单支付。
During(Created, When(DeduceInventorySucceed) .Then(context => { context.Publish<IPayOrderCommand>(new { context.Saga.OrderId, context.Saga.Amount }); }).TransitionTo(InventoryDeducted), When(DeduceInventoryFailed).Then(context => { context.Publish<ICancelOrderCommand>(new { context.Saga.OrderId }); }) );
(3) 库存已扣减->已支付:触发条件为PayOrderSucceed
事件,转移到已支付后,流程结束。
During(InventoryDeducted, When(PayOrderFailed).Then(context => { context.Publish<IReturnInventoryCommand>(new { context.Message.OrderId, ReturnInventoryItems = context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList() }); }), When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()));
最终完整版的OrderStateMachine
如下所示:
using MassTransit.SmDemo.OrderService.Events; using MassTransit.SmDemo.Shared.Contracts; namespace MassTransit.SmDemo.OrderService; public class OrderStateMachine : MassTransitStateMachine<OrderState> { public State Created { get; private set; } public State InventoryDeducted { get; private set; } public State Paid { get; private set; } public State Canceled { get; private set; } public Event<ICreateOrderSucceed> OrderCreated { get; private set; } public Event<IDeduceInventorySucceed> DeduceInventorySucceed { get; private set; } public Event<IDeduceInventoryFailed> DeduceInventoryFailed { get; private set; } public Event<ICancelOrderSucceed> OrderCanceled { get; private set; } public Event<IPayOrderSucceed> PayOrderSucceed { get; private set; } public Event<IPayOrderFailed> PayOrderFailed { get; private set; } public Event<IReturnInventorySucceed> ReturnInventorySucceed { get; private set; } public Event<IOrderStateRequest> OrderStateRequested { get; private set; } public OrderStateMachine() { Event(() => OrderCreated, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => DeduceInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => DeduceInventoryFailed, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => ReturnInventorySucceed, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => PayOrderSucceed, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => PayOrderFailed, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => OrderCanceled, x => x.CorrelateById(m => m.Message.OrderId)); Event(() => OrderStateRequested, x => { x.CorrelateById(m => m.Message.OrderId); x.OnMissingInstance(m => { return m.ExecuteAsync(x => x.RespondAsync<IOrderNotFoundOrCompleted>(new { x.Message.OrderId })); }); }); InstanceState(x => x.CurrentState); Initially( When(OrderCreated) .Then(context => { context.Saga.OrderId = context.Message.OrderId; context.Saga.OrderItems = context.Message.OrderItems; var amount = context.Message.OrderItems.Sum(x => x.Price * x.Qty); context.Saga.Amount = amount; }) .PublishAsync(context => context.Init<IDeduceInventoryCommand>(new { context.Saga.OrderId, DeduceInventoryItems = context.Saga.OrderItems.Select(x => new DeduceInventoryItem(x.SkuId, x.Qty)).ToList() })) .TransitionTo(Created)); During(Created, When(DeduceInventorySucceed) .Then(context => { context.Publish<IPayOrderCommand>(new { context.Saga.OrderId, context.Saga.Amount }); }).TransitionTo(InventoryDeducted), When(DeduceInventoryFailed).Then(context => { context.Publish<ICancelOrderCommand>(new { context.Saga.OrderId }); }) ); During(InventoryDeducted, When(PayOrderFailed).Then(context => { context.Publish<IReturnInventoryCommand>(new { context.Message.OrderId, ReturnInventoryItems = context.Saga.OrderItems.Select(x => new ReturnInventoryItem(x.SkuId, x.Qty)).ToList() }); }), When(PayOrderSucceed).TransitionTo(Paid).Then(context => context.SetCompleted()), When(ReturnInventorySucceed) .ThenAsync(context => context.Publish<ICancelOrderCommand>(new { context.Saga.OrderId })).TransitionTo(Created)); DuringAny(When(OrderCanceled).TransitionTo(Canceled).ThenAsync(async context => { await Task.Delay(TimeSpan.FromSeconds(10)); await context.SetCompleted(); })); DuringAny( When(OrderStateRequested) .RespondAsync(x => x.Init<IOrderStateResponse>(new { x.Saga.OrderId, State = x.Saga.CurrentState })) ); } }
库存服务
库存服务在整个下单流程的职责主要是库存的扣减和返还,其仅需要订阅IDeduceInventoryCommand
和IReturnInventoryCommand
两个命令并实现即可。代码如下所示:
using MassTransit.SmDemo.InventoryService.Repositories; using MassTransit.SmDemo.Shared.Contracts; namespace MassTransit.SmDemo.InventoryService.Consumers; public class DeduceInventoryConsumer : IConsumer<IDeduceInventoryCommand> { private readonly ILogger<DeduceInventoryConsumer> _logger; public DeduceInventoryConsumer(ILogger<DeduceInventoryConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<IDeduceInventoryCommand> context) { if (!CheckStock(context.Message.DeduceInventoryItems)) { _logger.LogWarning($"Insufficient stock for order [{context.Message.OrderId}]!"); await context.Publish<IDeduceInventoryFailed>( new { context.Message.OrderId, Reason = "insufficient stock" }); } else { _logger.LogInformation($"Inventory has been deducted for order [{context.Message.OrderId}]!"); DeduceStocks(context.Message.DeduceInventoryItems); await context.Publish<IDeduceInventorySucceed>(new { context.Message.OrderId }); } } private bool CheckStock(List<DeduceInventoryItem> deduceItems) { foreach (var stockItem in deduceItems) { if (InventoryRepository.GetStock(stockItem.SkuId) < stockItem.Qty) return false; } return true; } private void DeduceStocks(List<DeduceInventoryItem> deduceItems) { foreach (var stockItem in deduceItems) { InventoryRepository.TryDeduceStock(stockItem.SkuId, stockItem.Qty); } } }
namespace MassTransit.SmDemo.InventoryService.Consumers; public class ReturnInventoryConsumer : IConsumer<IReturnInventoryCommand> { private readonly ILogger<ReturnInventoryConsumer> _logger; public ReturnInventoryConsumer(ILogger<ReturnInventoryConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<IReturnInventoryCommand> context) { foreach (var returnInventoryItem in context.Message.ReturnInventoryItems) { InventoryRepository.ReturnStock(returnInventoryItem.SkuId, returnInventoryItem.Qty); } _logger.LogInformation($"Inventory has been returned for order [{context.Message.OrderId}]!"); await context.Publish<IReturnInventorySucceed>(new { context.Message.OrderId }); } }
支付服务
对于下单流程的支付用例来说,要么成功要么失败,因此仅需要订阅IPayOrderCommand
命令即可,具体PayOrderConsumer
实现如下:
using MassTransit.SmDemo.Shared.Contracts; namespace MassTransit.SmDemo.PaymentService.Consumers; public class PayOrderConsumer : IConsumer<IPayOrderCommand> { private readonly ILogger<PayOrderConsumer> _logger; public PayOrderConsumer(ILogger<PayOrderConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<IPayOrderCommand> context) { await Task.Delay(TimeSpan.FromSeconds(10)); if (context.Message.Amount % 2 == 0) {_logger.LogInformation($"Order [{context.Message.OrderId}] paid successfully!"); await context.Publish<IPayOrderSucceed>(new { context.Message.OrderId }); } else { _logger.LogWarning($"Order [{context.Message.OrderId}] payment failed!"); await context.Publish<IPayOrderFailed>(new { context.Message.OrderId, Reason = "Insufficient account balance" }); } } }
运行结果
启动三个项目,并在Swagger中发起订单创建请求,如下图所示:
由于订单总额为奇数,因此支付会失败,最终控制台输出如下图所示:
打开RabbitMQ后台,可以看见MassTransit按照约定创建了以下队列用于服务间的消息传递:
其中order-state
队列绑定到类型为fanout
的同名order-state
Exchange,其绑定关系如下图所示,该Exchange负责从其他同名事件的Exchange转发事件。
总结
通过以上示例的讲解,相信了解到MassTransit StateMachine的强大之处。StateMachine充当着事务编排器的角色,通过集中定义状态、转移条件和状态转移的执行顺序,实现高内聚的事务流转控制,也确保了其他伴生服务仅需关注自己的业务逻辑,而无需关心事务的流转,真正实现了关注点分离。