EntityFramework Core并发迁移解决方案

场景

目前一个项目中数据持久化采用EF Core + MySQL,使用CodeFirst模式开发,并且对数据进行了分库,按照目前颗粒度分完之后,大概有一两百个库,每个库的数据都是相互隔离的。
借鉴了 GitHub 上的一个开源仓库 arch/UnitOfWork 来实现 UnitOfWork,核心操作就是每个 API 请求的时候带上库名,在执行 CRUD 之前先将 DbContext 切换到目标数据库。我们在切换数据库的时候加了一些操作,如检查数据库是否已创建、检查连接是否可用、判断是否需要进行表结构迁移。

/// <summary>
/// Switches the context's connection to another database.
/// This requires the databases to be on the same machine. Note: this only works for MySQL.
/// </summary>
/// <param name="database">Name of the target database.</param>
public void ChangeDatabase(string database)
{
    // Check the connection
    // ...... (omitted in the article)

    // Check that the database has been created
    // ...... (omitted in the article)

    var connection = _context.Database.GetDbConnection();
    if (connection.State.HasFlag(ConnectionState.Open))
    {
        // An open connection can be re-pointed directly.
        connection.ChangeDatabase(database);
    }
    else
    {
        // A closed connection is re-pointed by rewriting the "database=" value of its
        // connection string. The pattern needs \w+ (word characters); a bare "w+" would
        // only match literal 'w' characters and leave the connection string unchanged.
        var connectionString = Regex.Replace(connection.ConnectionString.Replace(" ", ""), @"(?<=[Dd]atabase=)\w+(?=;)", database, RegexOptions.Singleline);
        connection.ConnectionString = connectionString;
    }

    // Decide whether a schema migration has to run
    if (_context.Database.GetPendingMigrations().Any())
    {
        // Custom migration logic
        _context.Database.Migrate(_context);
    }
}

但是当多个操作同时对一个库进行Migrate的时候,就会出现问题,比如“新增一张表”的操作已经被第一个迁移执行过了,第二个执行的迁移并不知道已经执行过了Migrate,就会报错表已存在。
于是考虑在执行Migrate的时候,加入一个锁的机制,对当前数据库执行Migrate之前先获取锁,然后再来决定接下来的操作。由于这边有的服务无法访问Redis,这里使用数据库来实现锁的机制,当然用Redis来实现更好,加入锁的机制只是一种解决问题的思路。

利用数据库实现迁移锁

1. 新增 MigrationLocks 表来实现迁移锁

  • 锁的操作不依赖DbContext实例
  • 在执行Migrate之前,尝试获取一个锁,在获取锁之前,如果表不存在则创建
    -- Creates the lock table only when it is missing; LockName is the primary key, so at most one row per name can exist.
    CREATE TABLE IF NOT EXISTS MigrationLocks (     LockName VARCHAR(255) PRIMARY KEY,     LockedAt DATETIME NOT NULL ); 
  • 成功往表中插入一条记录,视为获取锁成功,主键为需要迁移的库的名称
    -- Inserts the lock row for @database, stamped with the current server time.
    INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW()); 
  • 迁移完成后,删除这条记录,视为释放锁成功;
    -- Removes the lock row whose LockName equals @database.
    DELETE FROM MigrationLocks WHERE LockName = @database; 
  • 为防止 “死锁” 发生,每次尝试获取锁之前,会对锁的状态进行检查,释放超过5分钟的锁(正常来说,上一个迁移的执行时间不会超过5分钟)。
    -- Counts lock rows for @database that were taken MORE than 5 minutes ago (LockedAt earlier than now - 5 min); only those are stale.
    SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt < NOW() - INTERVAL 5 MINUTE; 

2. 封装一下MigrateLock的实现

/// <summary>
/// Migration lock: a mutual-exclusion primitive used to serialize schema migrations.
/// </summary>
public interface IMigrateLock
{
    /// <summary>
    /// Tries to acquire the lock named after <paramref name="connection"/>'s current database.
    /// </summary>
    /// <param name="connection">Connection used to run the lock statements.</param>
    /// <returns><c>true</c> when the lock was acquired; otherwise <c>false</c>.</returns>
    bool TryAcquireLock(IDbConnection connection);

    /// <summary>
    /// Tries to acquire the lock named after <paramref name="connection"/>'s current database.
    /// </summary>
    /// <param name="connection">Connection used to run the lock statements.</param>
    /// <returns><c>true</c> when the lock was acquired; otherwise <c>false</c>.</returns>
    Task<bool> TryAcquireLockAsync(IDbConnection connection);

    /// <summary>
    /// Releases the lock named after <paramref name="connection"/>'s current database.
    /// </summary>
    /// <param name="connection">Connection used to run the release statement.</param>
    void ReleaseLock(IDbConnection connection);

    /// <summary>
    /// Releases the lock named after <paramref name="connection"/>'s current database.
    /// </summary>
    /// <param name="connection">Connection used to run the release statement.</param>
    /// <returns>A task that completes when the release statement has run.</returns>
    Task ReleaseLockAsync(IDbConnection connection);
}

/// <summary>
/// Migration lock backed by a <c>MigrationLocks</c> table: inserting a row whose primary key is
/// the database name acquires the lock, deleting that row releases it.
/// </summary>
public class MigrateLock : IMigrateLock
{
    private readonly ILogger<MigrateLock> _logger;

    public MigrateLock(ILogger<MigrateLock> logger)
    {
        _logger = logger;
    }

    private const string CreateTableSql = @"
        CREATE TABLE IF NOT EXISTS MigrationLocks (
            LockName VARCHAR(255) PRIMARY KEY,
            LockedAt DATETIME NOT NULL
        );";

    // A lock is stale when it was taken MORE than 5 minutes ago, i.e. LockedAt is earlier than
    // (now - 5 minutes). The comparison must be '<'; '>' would match fresh locks instead.
    private const string CheckLockedSql = "SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt < NOW() - INTERVAL 5 MINUTE;";

    private const string AcquireLockSql = "INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());";

    private const string ReleaseLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database;";

    // Deletes the row only while it is still stale, so a lock that another process re-acquired
    // between our check and our delete is left untouched.
    private const string ReleaseStaleLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database AND LockedAt < NOW() - INTERVAL 5 MINUTE;";

    /// <summary>
    /// Tries to acquire the lock. Any failure, including a duplicate-key error raised because
    /// the row already exists, is logged and reported as <c>false</c>.
    /// </summary>
    /// <param name="connection">Connection used to run the lock statements.</param>
    /// <returns><c>true</c> when exactly one lock row was inserted; otherwise <c>false</c>.</returns>
    public bool TryAcquireLock(IDbConnection connection)
    {
        try
        {
            CheckLocked(connection);

            var result = connection.Execute(AcquireLockSql, new { database = connection.Database });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", connection.Database);

                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);

            return false;
        }
        catch (Exception ex)
        {
            LogAcquireFailure(ex, connection.Database);

            return false;
        }
    }

    /// <summary>
    /// Tries to acquire the lock. Any failure, including a duplicate-key error raised because
    /// the row already exists, is logged and reported as <c>false</c>.
    /// </summary>
    /// <param name="connection">Connection used to run the lock statements.</param>
    /// <returns><c>true</c> when exactly one lock row was inserted; otherwise <c>false</c>.</returns>
    public async Task<bool> TryAcquireLockAsync(IDbConnection connection)
    {
        try
        {
            await CheckLockedAsync(connection);

            var result = await connection.ExecuteAsync(AcquireLockSql, new { database = connection.Database });
            if (result == 1)
            {
                _logger.LogInformation("Lock acquired: {LockName}", connection.Database);

                return true;
            }

            _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database);

            return false;
        }
        catch (Exception ex)
        {
            LogAcquireFailure(ex, connection.Database);

            return false;
        }
    }

    /// <summary>
    /// Releases the lock. Errors are logged and not rethrown.
    /// </summary>
    /// <param name="connection">Connection used to run the release statement.</param>
    public void ReleaseLock(IDbConnection connection)
    {
        try
        {
            // Run synchronously: an un-awaited ExecuteAsync could return before the row is
            // deleted and would hide any failure from this try/catch.
            connection.Execute(ReleaseLockSql, new { database = connection.Database });
            _logger.LogInformation("Lock released: {LockName}", connection.Database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
        }
    }

    /// <summary>
    /// Releases the lock. Errors are logged and not rethrown.
    /// </summary>
    /// <param name="connection">Connection used to run the release statement.</param>
    public async Task ReleaseLockAsync(IDbConnection connection)
    {
        try
        {
            await connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database });
            _logger.LogInformation("Lock released: {LockName}", connection.Database);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database);
        }
    }

    /// <summary>
    /// Ensures the lock table exists and removes a lock row that is older than 5 minutes.
    /// </summary>
    private void CheckLocked(IDbConnection connection)
    {
        connection.Execute(CreateTableSql);

        var databaseParam = new
        {
            database = connection.Database
        };

        var staleLocks = connection.QueryFirstOrDefault<int>(CheckLockedSql, databaseParam);
        if (staleLocks <= 0)
        {
            return;
        }

        _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        connection.Execute(ReleaseStaleLockSql, databaseParam);
    }

    /// <summary>
    /// Ensures the lock table exists and removes a lock row that is older than 5 minutes.
    /// </summary>
    private async Task CheckLockedAsync(IDbConnection connection)
    {
        await connection.ExecuteAsync(CreateTableSql);

        var databaseParam = new
        {
            database = connection.Database
        };

        var staleLocks = await connection.QueryFirstOrDefaultAsync<int>(CheckLockedSql, databaseParam);
        if (staleLocks <= 0)
        {
            return;
        }

        _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock.");
        await connection.ExecuteAsync(ReleaseStaleLockSql, databaseParam);
    }

    /// <summary>
    /// Logs a failed acquisition: a warning for a duplicate-key error, an error for anything else.
    /// </summary>
    private void LogAcquireFailure(Exception ex, string database)
    {
        if (ex.Message.StartsWith("Duplicate", StringComparison.Ordinal))
        {
            _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", database);
        }
        else
        {
            _logger.LogError(ex, "Error acquiring lock: {LockName}", database);
        }
    }
}

3. 封装一下MigrateExecutor的实现

/// <summary>
/// Database migration executor.
/// </summary>
public interface IMigrateExcutor
{
    /// <summary>
    /// Applies pending migrations.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    void Migrate(DbContext dbContext);

    /// <summary>
    /// Applies pending migrations.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <returns>A task that completes when the migration has finished.</returns>
    Task MigrateAsync(DbContext dbContext);

    /// <summary>
    /// Applies pending migrations under the migration lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    void ConcurrentMigrate(DbContext dbContext, bool wait = true);

    /// <summary>
    /// Applies pending migrations under the migration lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    /// <returns>A task that completes when the migration has finished or was skipped.</returns>
    Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true);

    /// <summary>
    /// Applies pending migrations under the migration lock, using the supplied connection for the lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="connection">Connection used for the lock statements.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true);

    /// <summary>
    /// Applies pending migrations under the migration lock, using the supplied connection for the lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="connection">Connection used for the lock statements.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    /// <returns>A task that completes when the migration has finished or was skipped.</returns>
    Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true);
}

/// <summary>
/// Database migration executor.
/// </summary>
public class MigrateExcutor : IMigrateExcutor
{
    // Placeholders are turned into DbParameters by ExecuteSqlRaw, so the ids are never spliced into the SQL text.
    private const string RenameHistoryRowSql = "Update __EFMigrationsHistory set MigrationId = {0} where MigrationId = {1}";

    // Pause between lock attempts; matches the "wait for 2 seconds" log message.
    private static readonly TimeSpan LockRetryDelay = TimeSpan.FromSeconds(2);

    private readonly IMigrateLock _migrateLock;
    private readonly ILogger<MigrateExcutor> _logger;

    public MigrateExcutor(
        IMigrateLock migrateLock,
        ILogger<MigrateExcutor> logger)
    {
        _migrateLock = migrateLock;
        _logger = logger;
    }

    /// <summary>
    /// Applies pending migrations. On failure, attempts one repair of the migration history
    /// and retries; if no repair applies, the original exception is rethrown.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    public void Migrate(DbContext dbContext)
    {
        try
        {
            if (dbContext.Database.GetPendingMigrations().Any())
            {
                dbContext.Database.Migrate();
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Migration failed");

            if (!TryRepairAndMigrate(dbContext))
            {
                // 'throw;' keeps the original stack trace ('throw e;' would reset it).
                throw;
            }
        }
    }

    /// <summary>
    /// Applies pending migrations. On failure, attempts one repair of the migration history
    /// and retries; if no repair applies, the original exception is rethrown.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <returns>A task that completes when the migration has finished.</returns>
    public async Task MigrateAsync(DbContext dbContext)
    {
        try
        {
            if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
            {
                await dbContext.Database.MigrateAsync();
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Migration failed");

            if (!await TryRepairAndMigrateAsync(dbContext))
            {
                // 'throw;' keeps the original stack trace ('throw e;' would reset it).
                throw;
            }
        }
    }

    /// <summary>
    /// Applies pending migrations under the migration lock, using a dedicated connection for the lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    public void ConcurrentMigrate(DbContext dbContext, bool wait = true)
    {
        if (!dbContext.Database.GetPendingMigrations().Any())
        {
            return;
        }

        using var connection = MySqlConnectionHelper.CreateConnection(dbContext.Database.GetDbConnection().Database);

        ConcurrentMigrate(dbContext, connection, wait);
    }

    /// <summary>
    /// Applies pending migrations under the migration lock, using a dedicated connection for the lock.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    /// <returns>A task that completes when the migration has finished or was skipped.</returns>
    public async Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true)
    {
        // Nothing to do when there are NO pending migrations.
        if (!(await dbContext.Database.GetPendingMigrationsAsync()).Any())
        {
            return;
        }

        await using var connection = await MySqlConnectionHelper.CreateConnectionAsync(dbContext.Database.GetDbConnection().Database);

        await ConcurrentMigrateAsync(dbContext, connection, wait);
    }

    /// <summary>
    /// Applies pending migrations under the migration lock (for data-synchronization services;
    /// the lock is taken through the supplied <see cref="IDbConnection"/>).
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="connection">Connection used for the lock statements.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    public void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true)
    {
        if (!dbContext.Database.GetPendingMigrations().Any())
        {
            return;
        }

        while (true)
        {
            if (_migrateLock.TryAcquireLock(connection))
            {
                try
                {
                    Migrate(dbContext);

                    return;
                }
                finally
                {
                    _migrateLock.ReleaseLock(connection);
                }
            }

            if (!wait)
            {
                // Leave the loop; without this exit the method would spin forever.
                _logger.LogInformation("Migration is locked, skip");

                return;
            }

            _logger.LogWarning("Migration is locked, wait for 2 seconds");
            Thread.Sleep(LockRetryDelay);
        }
    }

    /// <summary>
    /// Applies pending migrations under the migration lock (for data-synchronization services;
    /// the lock is taken through the supplied <see cref="IDbConnection"/>).
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <param name="connection">Connection used for the lock statements.</param>
    /// <param name="wait">Whether to wait until a migration already in progress has finished.</param>
    /// <returns>A task that completes when the migration has finished or was skipped.</returns>
    public async Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true)
    {
        // Nothing to do when there are NO pending migrations.
        if (!(await dbContext.Database.GetPendingMigrationsAsync()).Any())
        {
            return;
        }

        while (true)
        {
            if (await _migrateLock.TryAcquireLockAsync(connection))
            {
                try
                {
                    await MigrateAsync(dbContext);

                    return;
                }
                finally
                {
                    await _migrateLock.ReleaseLockAsync(connection);
                }
            }

            if (!wait)
            {
                _logger.LogInformation("Migration is locked, skip");

                return;
            }

            _logger.LogWarning("Migration is locked, wait for 2 seconds");

            // Task.Delay yields the thread while waiting instead of blocking it.
            await Task.Delay(LockRetryDelay);
        }
    }

    /// <summary>
    /// Attempts to repair a mismatched history row and re-run the migration.
    /// </summary>
    /// <returns><c>true</c> when the repair applied and the retry succeeded; <c>false</c> when no repair applies.</returns>
    private bool TryRepairAndMigrate(DbContext dbContext)
    {
        var pending = dbContext.Database.GetPendingMigrations().ToList();
        var all = dbContext.Database.GetMigrations().ToList();
        var applied = dbContext.Database.GetAppliedMigrations().ToList();

        if (!TryFindRenamedMigration(pending, all, applied, out var recordedId, out var expectedId))
        {
            return false;
        }

        dbContext.Database.ExecuteSqlRaw(RenameHistoryRowSql, expectedId, recordedId);
        dbContext.Database.Migrate();

        _logger.LogInformation("Migration failed, but success on second try.");

        return true;
    }

    /// <summary>
    /// Attempts to repair a mismatched history row and re-run the migration.
    /// </summary>
    /// <returns><c>true</c> when the repair applied and the retry succeeded; <c>false</c> when no repair applies.</returns>
    private async Task<bool> TryRepairAndMigrateAsync(DbContext dbContext)
    {
        var pending = (await dbContext.Database.GetPendingMigrationsAsync()).ToList();
        var all = dbContext.Database.GetMigrations().ToList();
        var applied = (await dbContext.Database.GetAppliedMigrationsAsync()).ToList();

        if (!TryFindRenamedMigration(pending, all, applied, out var recordedId, out var expectedId))
        {
            return false;
        }

        await dbContext.Database.ExecuteSqlRawAsync(RenameHistoryRowSql, expectedId, recordedId);
        await dbContext.Database.MigrateAsync();

        _logger.LogInformation("Migration failed, but success on second try.");

        return true;
    }

    /// <summary>
    /// Detects the case where the last applied history row and the first pending migration
    /// differ in id but share the same suffix (the text from the first '_' onwards).
    /// </summary>
    /// <param name="pending">Pending migration ids.</param>
    /// <param name="all">All migration ids known to the assembly.</param>
    /// <param name="applied">Migration ids recorded in the history table.</param>
    /// <param name="recordedId">The id currently stored in the history table.</param>
    /// <param name="expectedId">The id the history row should carry.</param>
    /// <returns><c>true</c> when such a pair was found.</returns>
    private static bool TryFindRenamedMigration(
        IReadOnlyList<string> pending,
        IReadOnlyList<string> all,
        IReadOnlyList<string> applied,
        out string recordedId,
        out string expectedId)
    {
        recordedId = string.Empty;
        expectedId = string.Empty;

        // Only relevant when pending + applied exceeds the known total.
        if (pending.Count == 0 || pending.Count + applied.Count <= all.Count)
        {
            return false;
        }

        int errIndex = all.Count - pending.Count;
        if (errIndex < 0 || applied.Count - 1 != errIndex || applied[errIndex] == pending[0])
        {
            return false;
        }

        // Without an underscore there is no suffix to compare; report "no repair" so the
        // caller rethrows the original exception instead of a new out-of-range error.
        int separator = pending[0].IndexOf("_", StringComparison.Ordinal);
        if (separator < 0)
        {
            return false;
        }

        string suffix = pending[0].Substring(separator);
        if (!applied[errIndex].EndsWith(suffix, StringComparison.Ordinal))
        {
            return false;
        }

        recordedId = applied[errIndex];
        expectedId = pending[0];

        return true;
    }
}

发表评论

相关文章