场景
目前一个项目中数据持久化采用EF Core + MySQL,使用CodeFirst模式开发,并且对数据进行了分库,按照目前颗粒度分完之后,大概有一两百个库,每个库的数据都是相互隔离的。
借鉴了Github上一个开源的仓库 arch/UnitOfWork 实现UnitOfWork,核心操作就是每个api请求的时候带上库名,在执行CRUD之前先将DbContext切换到目标数据库,我们在切换数据库的时候加了一些操作,如检查数据库是否已创建、检查连接是否可用、判断是否需要 表结构迁移 等
/// <summary> /// 切换数据库 这要求数据库在同一台机器上 注意:这只适用于MySQL。 /// </summary> /// <param name="database">目标数据库</param> public void ChangeDatabase(string database) { // 检查连接 ...... // 检查数据库是否创建 ...... var connection = _context.Database.GetDbConnection(); if (connection.State.HasFlag(ConnectionState.Open)) { connection.ChangeDatabase(database); } else { var connectionString = Regex.Replace(connection.ConnectionString.Replace(" ", ""), @"(?<=[Dd]atabase=)w+(?=;)", database, RegexOptions.Singleline); connection.ConnectionString = connectionString; } // 判断是否需要执行表结构迁移 if(_context..Database.GetPendingMigrations().Any()) { //自定义的迁移的一些逻辑 _context.Database.Migrate(_context); } }
但是当多个操作同时对一个库进行Migrate的时候,就会出现问题,比如“新增一张表”的操作已经被第一个迁移执行过了,第二个执行的迁移并不知道已经执行过了Migrate,就会报错表已存在。
于是考虑在执行Migrate的时候,加入一个锁的机制,对当前数据库执行Migrate之前先获取锁,然后再来决定接下来的操作。由于这边有的服务无法访问Redis,这里使用数据库来实现锁的机制,当然用Redis来实现更好,加入锁的机制只是一种解决问题的思路。
利用数据库实现迁移锁
1. 新增 MigrationLocks
表来实现迁移锁
- 锁的操作不依赖DbContext实例
- 在执行Migrate之前,尝试获取一个锁,在获取锁之前,如果表不存在则创建
CREATE TABLE IF NOT EXISTS MigrationLocks ( LockName VARCHAR(255) PRIMARY KEY, LockedAt DATETIME NOT NULL );
- 成功往表中插入一条记录,视为获取锁成功,主键为需要迁移的库的名称
INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());
- 迁移完成后,删除这条记录,视为释放锁成功;
DELETE FROM MigrationLocks WHERE LockName = @database;
- 为防止 “死锁” 发生,每次尝试获取锁之前,会对锁的状态进行检查,释放超过5分钟的锁(正常来说,上一个迁移的执行时间不会超过5分钟)。
SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;
2. 封装一下MigrateLock的实现
/// <summary> /// 迁移锁 /// </summary> public interface IMigrateLock { /// <summary> /// 尝试获取锁 /// </summary> /// <param name="connection"></param> /// <returns></returns> bool TryAcquireLock(IDbConnection connection); /// <summary> /// 尝试获取锁 /// </summary> /// <param name="connection"></param> /// <returns></returns> Task<bool> TryAcquireLockAsync(IDbConnection connection); /// <summary> /// 释放锁 /// </summary> void ReleaseLock(IDbConnection connection); /// <summary> /// 释放锁 /// </summary> /// <returns></returns> Task ReleaseLockAsync(IDbConnection connection); } /// <summary> /// 迁移锁 /// </summary> public class MigrateLock : IMigrateLock { private readonly ILogger<MigrateLock> _logger; public MigrateLock(ILogger<MigrateLock> logger) { _logger = logger; } private const string CreateTableSql = @" CREATE TABLE IF NOT EXISTS MigrationLocks ( LockName VARCHAR(255) PRIMARY KEY, LockedAt DATETIME NOT NULL );"; private const string CheckLockedSql = "SELECT COUNT(*) FROM MigrationLocks WHERE LockName = @database AND LockedAt > NOW() - INTERVAL 5 MINUTE;"; private const string AcquireLockSql = "INSERT INTO MigrationLocks (LockName, LockedAt) VALUES (@database, NOW());"; private const string ReleaseLockSql = "DELETE FROM MigrationLocks WHERE LockName = @database;"; /// <summary> /// 尝试获取锁 /// </summary> /// <param name="connection"></param> /// <returns></returns> public bool TryAcquireLock(IDbConnection connection) { try { CheckLocked(connection); var result = connection.Execute(AcquireLockSql, new { database = connection.Database }); if (result == 1) { _logger.LogInformation("Lock acquired: {LockName}", connection.Database); return true; } _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database); return false; } catch (Exception ex) { if (ex.Message.StartsWith("Duplicate")) { _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database); } else { _logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database); } return false; } } /// <summary> /// 尝试获取锁 /// </summary> /// <param name="connection"></param> /// <returns></returns> public async Task<bool> TryAcquireLockAsync(IDbConnection connection) { try { await CheckLockedAsync(connection); var result = await connection.ExecuteAsync(AcquireLockSql, new { database = connection.Database }); if (result == 1) { _logger.LogInformation("Lock acquired: {LockName}", connection.Database); return true; } _logger.LogWarning("Failed to acquire lock: {LockName}", connection.Database); return false; } catch (Exception ex) { if (ex.Message.StartsWith("Duplicate")) { _logger.LogWarning("Failed acquiring lock due to duplicate entry: {LockName}", connection.Database); } else { _logger.LogError(ex, "Error acquiring lock: {LockName}", connection.Database); } return false; } } /// <summary> /// 释放锁 /// </summary> public void ReleaseLock(IDbConnection connection) { try { connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database }); _logger.LogInformation("Lock released: {LockName}", connection.Database); } catch (Exception ex) { _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database); } } /// <summary> /// 释放锁 /// </summary> public async Task ReleaseLockAsync(IDbConnection connection) { try { await connection.ExecuteAsync(ReleaseLockSql, new { database = connection.Database }); _logger.LogInformation("Lock released: {LockName}", connection.Database); } catch (Exception ex) { _logger.LogError(ex, "Error releasing lock: {LockName}", connection.Database); } } /// <summary> /// 检查锁 /// </summary> private void CheckLocked(IDbConnection connection) { connection.Execute(CreateTableSql); var databaseParam = new { database = connection.Database }; var lockExists = connection.QueryFirstOrDefault<int>(CheckLockedSql, databaseParam); if (lockExists <= 0) { return; } _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock."); connection.Execute(ReleaseLockSql, databaseParam); } /// <summary> /// 检查锁 /// </summary> private async Task CheckLockedAsync(IDbConnection connection) { await connection.ExecuteAsync(CreateTableSql); var databaseParam = new { database = connection.Database }; var lockExists = await connection.QueryFirstOrDefaultAsync<int>(CheckLockedSql, databaseParam); if (lockExists <= 0) { return; } _logger.LogWarning("Lock exists and is older than 5 minutes. Releasing old lock."); await connection.ExecuteAsync(ReleaseLockSql, databaseParam); } }
3. 封装一下MigrateExecutor的实现
/// <summary> /// 数据库迁移执行器 /// </summary> public interface IMigrateExcutor { /// <summary> /// 执行迁移 /// </summary> /// <param name="dbContext"></param> void Migrate(DbContext dbContext); /// <summary> /// 执行迁移 /// </summary> /// <param name="dbContext"></param> /// <returns></returns> Task MigrateAsync(DbContext dbContext); /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> void ConcurrentMigrate(DbContext dbContext, bool wait = true); /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> /// <returns></returns> Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true); /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="connection"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true); /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="connection"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true); } /// <summary> /// 数据库迁移执行器 /// </summary> public class MigrateExcutor : IMigrateExcutor { private readonly IMigrateLock _migrateLock; private readonly ILogger<MigrateExcutor> _logger; public MigrateExcutor( IMigrateLock migrateLock, ILogger<MigrateExcutor> logger) { _migrateLock = migrateLock; _logger = logger; } /// <summary> /// 执行迁移 /// </summary> /// <param name="dbContext"></param> /// <returns></returns> public void Migrate(DbContext dbContext) { try { if (dbContext.Database.GetPendingMigrations().Any()) { dbContext.Database.Migrate(); } } catch (Exception e) { _logger.LogError(e, "Migration failed"); HandleError(dbContext, e); } } /// <summary> /// 执行迁移 /// </summary> /// <param name="dbContext"></param> /// <returns></returns> public async Task MigrateAsync(DbContext dbContext) { try { if ((await dbContext.Database.GetPendingMigrationsAsync()).Any()) { await dbContext.Database.MigrateAsync(); } } catch (Exception e) { _logger.LogError(e, "Migration failed"); await HandleErrorAsync(dbContext, e); } } /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> /// <returns></returns> public void ConcurrentMigrate(DbContext dbContext, bool wait = true) { if (!dbContext.Database.GetPendingMigrations().Any()) { return; } using var connection = MySqlConnectionHelper.CreateConnection(dbContext.Database.GetDbConnection().Database); ConcurrentMigrate(dbContext, connection, wait); } /// <summary> /// 并发场景执行迁移 /// </summary> /// <param name="dbContext"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> /// <returns></returns> public async Task ConcurrentMigrateAsync(DbContext dbContext, bool wait = true) { if ((await dbContext.Database.GetPendingMigrationsAsync()).Any()) { return; } await using var connection = await MySqlConnectionHelper.CreateConnectionAsync(dbContext.Database.GetDbConnection().Database); await ConcurrentMigrateAsync(dbContext, connection, wait); } /// <summary> /// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成) /// </summary> /// <param name="dbContext"></param> /// <param name="connection"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> public void ConcurrentMigrate(DbContext dbContext, IDbConnection connection, bool wait = true) { if (!dbContext.Database.GetPendingMigrations().Any()) { return; } while (true) { if (_migrateLock.TryAcquireLock(connection)) { try { Migrate(dbContext); break; } finally { _migrateLock.ReleaseLock(connection); } } if (wait) { _logger.LogWarning("Migration is locked, wait for 2 seconds"); Thread.Sleep(20000); continue; } _logger.LogInformation("Migration is locked, skip"); } } /// <summary> /// 并发场景执行迁移(供数据同步相关服务使用,”迁移锁“ 使用传入的 <see cref="IDbConnection"/> 对象来完成) /// </summary> /// <param name="dbContext"></param> /// <param name="connection"></param> /// <param name="wait">是否等待至正在进行中的迁移完成</param> public async Task ConcurrentMigrateAsync(DbContext dbContext, IDbConnection connection, bool wait = true) { if ((await dbContext.Database.GetPendingMigrationsAsync()).Any()) { return; } while (true) { if (await _migrateLock.TryAcquireLockAsync(connection)) { try { await MigrateAsync(dbContext); break; } finally { await _migrateLock.ReleaseLockAsync(connection); } } if (wait) { _logger.LogWarning("Migration is locked, wait for 2 seconds"); Thread.Sleep(20000); continue; } _logger.LogInformation("Migration is locked, skip"); break; } } private void HandleError(DbContext dbContext, Exception e) { var needChangeList = dbContext.Database.GetPendingMigrations().ToList(); var allChangeList = dbContext.Database.GetMigrations().ToList(); var hasChangeList = dbContext.Database.GetAppliedMigrations().ToList(); if (needChangeList.Count + hasChangeList.Count > allChangeList.Count) { int errIndex = allChangeList.Count - needChangeList.Count; if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0]) { int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal); string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index); if (hasChangeList[errIndex].EndsWith(errSuffix)) { dbContext.Database.ExecuteSqlRaw($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'"); dbContext.Database.Migrate(); } else { throw e; } } else { throw e; } } else { throw e; } _logger.LogInformation("Migration failed, but success on second try."); } private async Task HandleErrorAsync(DbContext dbContext, Exception e) { var needChangeList = (await dbContext.Database.GetPendingMigrationsAsync()).ToList(); var allChangeList = dbContext.Database.GetMigrations().ToList(); var hasChangeList = (await dbContext.Database.GetAppliedMigrationsAsync()).ToList(); if (needChangeList.Count + hasChangeList.Count > allChangeList.Count) { int errIndex = allChangeList.Count - needChangeList.Count; if (hasChangeList.Count - 1 == errIndex && hasChangeList[errIndex] != needChangeList[0]) { int index = needChangeList[0].IndexOf("_", StringComparison.Ordinal); string errSuffix = needChangeList[0].Substring(index, needChangeList[0].Length - index); if (hasChangeList[errIndex].EndsWith(errSuffix)) { await dbContext.Database.ExecuteSqlRawAsync($"Update __EFMigrationsHistory set MigrationId = '{needChangeList[0]}' where MigrationId = '{hasChangeList[errIndex]}'"); await dbContext.Database.MigrateAsync(); } else { throw e; } } else { throw e; } } else { throw e; } _logger.LogInformation("Migration failed, but success on second try."); } }