分库分表—3.详细介绍二
技术分享
3周前 (12-08)
0
999+
大纲
18.基于Canal和RocketMQ的增量同步
19.增量同步任务的背景介绍
20.增量同步任务查询与线程池提交
21.RocketMQ里的binlog消息的消费逻辑分析
22.新增binlog的数据同步逻辑分析
23.binlog基于内存队列的异步转发逻辑
24.基于CAS加锁的读写队列互换机制
25.binlog基于内存的merge合并逻辑
26.对merge数据从目标库里分批查询
27.对merge数据基于目标库数据做过滤
28.将过滤后的merge数据写入目标库
29.offset提交线程的启动和逻辑分析
30.增量同步过程中binlog写入失败的恢复
31.增量同步过程中的各种失败场景的恢复机制
32.定时移除已提交的增量同步消息
33.增量与全量并行运行的场景分析
34.增量与全量并发同步一批数据的冲突
35.同步完成后的数据校验逻辑分析
36.数据迁移完成后的无损发布方案
37.分库分表线上运维与扩容方案
整个分库分表的流程:
全量同步 + 增量同步 -> 让多库多表数据和单库单表数据持平 -> 数据校验 -> 无损发布 -> 老系统下线 -> 线上多库多表DDL运维 -> 多库多表再次扩容
18.基于Canal和RocketMQ的增量同步
(1)在全量数据同步的过程中存在的问题
(2)增量同步方案概述
(1)在全量数据同步的过程中存在的问题
刚从源数据库查出一批数据写入到目标库,已经开始去同步下一批数据了,但上一批数据可能又在源数据库里进行了修改或删除操作。为了处理这种问题,就需要引入增量同步方案。
(2)增量同步方案概述
在发起全量数据迁移任务之前,需要基于Canal去监听源数据库的增删改binlog。Canal监听到binlog后需要将binlog消息发送到RocketMQ消息中间件集群中,接着数据迁移系统上会有一个RocketMQ消费者去消费这些binlog消息,这样数据迁移系统就会把源数据库的增删改操作往目标库中进行同步。
19.增量同步方案的整体介绍
(1)全量同步是一个滚动查询 + 数据插入的过程
(2)增量同步是为了解决全量同步中已同步数据出现的数据变动问题
(3)增量同步的数据高效写入和数据防止丢失方案
(1)全量同步是一个滚动查询 + 数据插入的过程
其中会涉及:一批批查 + 设置滚动ID + 数据过滤 + 去重校验 + 记录迁移明细 + 中断恢复 + 进度统计。
(2)增量同步是为了解决全量同步中已同步数据出现的数据变动问题
开启全量同步之前,都会先开启增量同步。
增量同步需要分析以下三种情况:
情况一:还没同步的一批数据发生增删改
情况二:正在同步的一批数据发生增删改
情况三:已经同步的一批数据发生增删改
(3)增量同步的数据高效写入和数据防止丢失方案
在增量同步中,首先会通过Canal监听源数据库中的binlog⽇志,然后Canal再将监听到的binlog⽇志发送放到RocketMQ中,接着数据迁移系统会消费RocketMQ中的binlog消息,把增删改操作同步到目标数据库。
问题一:数据迁移系统消费MQ消息时,如何保证从MQ获取到的binlog消息不会丢失
如果源数据库增删改操作了,但由于消费异常导致binlog消息丢失了,那么目标数据库中就没有对应的增量数据操作,这样源数据库和目标数据库的数据就会不⼀致。为了避免消费异常导致binlog消息丢失,需要设置禁止自动提交消息。
消费MQ的binlog消息时,为了提升消费速度,可以采用多线程进行消费。比如每消费一条MQ消息,就向线程池提交一个任务,任务执行完才提交消息。当这些任务的执行速度慢于消费MQ消息的速度时,线程池的阻塞队列中就会积压一些任务。如果此时机器发布重启,那么就可能会导致线程池中阻塞队列里积压的任务丢失。但是由于禁止消息自动提交,所以这些丢失任务对应的MQ消息后续还可以重新被消费,然后再次被提交到线程池中进行处理。
为了方便对binlog消息进行管理和确保binlog消息不丢失且有记录可查,这里引⼊消息拉取落库和异步消息提交机制,由两个定时任务来完成。如下所示:
⾸先源数据库中会有⼀张消费记录表,定时任务1每次从MQ拉取并消费⼀条消息时,都会先在消费记录表中新增⼀条消费记录,每条消费记录的初始状态都为未消费。然后定时任务1再将获取到的binlog消息,在目标数据库中重做对应的binlog⽇志。也就是将旧库中的增删改操作,在目标数据库中重做⼀遍。重做完成后,再来更新刚刚添加的消费记录的状态,从未消费更新为已消费状态。
此时需要注意:定时任务1消费MQ的binlog消息后,并不是自动向MQ提交消息,⽽是需要进行⼿动提交。否则如果消息都没有消费成功,就自动向MQ提交消息,则可能会出现消息丢失的情况。所以为了保证binlog消息不丢失,不会⾃动提交消息,⽽是将提交消息的任务交给定时任务2来处理。
定时任务2会专⻔从消费记录表中,查询已消费的那些记录,然后向MQ提交消息,这样下次就不会从MQ中消费到了。向MQ提交完消息后,同时会将消费记录表中的记录状态,从已消费更新为已提交。⾄此,⼀个消息的消费流程才算结束。
问题二:如何提高增量同步时的数据写入效率
为了提高数据写入目标数据库的效率,这里引入了数据合并、过滤、读写队列的机制,读写队列和数据合并流程图如下:
定时任务1添加完消费记录后,并不会⻢上把数据写入目标库,⽽是把binlog日志先放到⼀个写队列中,与写队列相对的还有⼀个读队列。读队列是专⻔用于提供给定时任务3进行处理消息写⼊操作的。
数据合并提升写入效率:如果源数据库中的数据在短时间内进⾏了多次操作,其实只需要保留最新的binlog⽇志即可。所以才使用了一个内存队列来存放binlog消息,而且会每隔15秒批量处理一次内存队列的所有binlog消息,以此减少同一条数据对应多条binlog的写入处理。
binlog日志的处理细节:从合并后的binlog⽇志中获取主键ID,根据主键ID到目标库中查询对应的数据。
如果目标库中能查到这条数据,那么需要和源数据库的binlog数据进⾏对⽐。只有当源数据库的更新时间⼤于目标库的更新时间,才允许更新数据到目标库中。如果当前的binlog⽇志的操作类型为删除操作,则可不⽤对⽐更新时间,直接在目标库中重做这条binlog⽇志,毕竟源数据库在删除⼀条数据时不会更新修改时间。
如果源数据库的⼀条binlog⽇志对应的数据在目标库中没有查到,那么继续判断。如果binlog⽇志是删除操作,那就没必要在目标库中重做这条⽇志了,直接过滤掉。目标库都没有数据了,就没必要执⾏删除操作。如果binlog⽇志的类型为修改操作,那也没必要执⾏修改操作。因为目标库没数据,直接update也不⾏,可以将binlog的操作类型修改为新增操作。毕竟在binlog⽇志中,包含了⼀条订单数据的所有字段的值,⾜以满⾜新增数据需要的所有字段。
经过以上的数据过滤操作,⼀⽅⾯避免源数据库中的旧数据覆盖了目标库的新数据,另⼀⽅⾯避免了没必要执⾏的删除和更新操作也在目标库中继续执⾏。
20.增量同步任务查询与线程池提交
CanalConsumeTask继承自ApplicationRunner,也就是系统启动的时候,它就会跑起来。
CanalConsumeTask首先会查出当前配置好的需要滚动查询全量数据的迁移任务。每个滚动查询全量数据的迁移任务就对应一个增量同步任务。然后创建一个线程池,线程数量 = 已配置好的滚动查询全量数据的迁移任务的数量。接着遍历每一个滚动查询数据的迁移业务,提交两个任务到线程池中。其中的CanalPullRunner拉取任务,会设置禁止Consumer自动提交offset,只拉取不提交。另外的CanalPullCommitRunner提交任务,此时才会提交offset。
需要注意的是:从RocketMQ里消费binlog消息时,需要避免Consumer自动去提交offset。需要精准控制offset提交,当每一条binlog都已经被应用到目标数据里后,才能对这条offset进行提交。因为RocketMQ的Consumer提交offset默认是自动提交:即会先提交到本地缓存,再提交到RocketMQ。而这就可能会导致offset提交时数据还没被应用到目标数据库。
@Component public class CanalConsumeTask implements ApplicationRunner { //RocketMQ的nameServer地址 @Value("${rocketmq.name-server:127.0.0.1:9876}") private String nameServerUrl; //可以从migrateConfigService拿到增量同步配置 //要从RocketMQ里监听到binlog变更,需要知道要关注的是哪些库哪些表的binlog变更 @Autowired private MigrateConfigService migrateConfigService; //ApplicationRunner在系统启动时就会运行run()方法 @Override public void run(ApplicationArguments args) throws Exception { //首先查出当前配置好的需要滚动查询全量数据的迁移任务,每个滚动查询全量数据的迁移任务就对应一个增量同步任务 List<ScrollDomain> scrollDomainList = migrateConfigService.queryScrollDomainList(); //这里会创建一个线程池,线程数量 = 已配置好的滚动查询全量数据的迁移任务的数量 ExecutorService executors = Executors.newFixedThreadPool(scrollDomainList.size()); for (ScrollDomain scrollDomain : scrollDomainList) { //接下来会提交两个任务 if (scrollDomain.getDataSourceType().equals(1)) { //从RocketMQ里消费binlog消息时,需要避免Consumer自动去提交offset //需要精准控制offset提交,当每一条binlog都已经被应用到目标数据里后,才能对这条offset进行提交 //因为RocketMQ的Consumer提交offset默认都是自动提交:也就是会先提交到本地缓存,再提交到RocketMQ //而这就可能会导致offset提交时数据还没被应用到目标数据库 //执行拉取任务,此时设置Consumer不自动提交offset,只拉取不提交 executors.execute(new CanalPullRunner(scrollDomain.getDomainTopic(), nameServerUrl)); //执行提交任务,此时才会提交offset executors.execute(new CanalPullCommitRunner(scrollDomain.getDomainTopic(), nameServerUrl)); } } } } //需要滚动查询全量数据的迁移任务配置表 @Data public class ScrollDomain implements Serializable { //主键ID private Long id; //所属系统(会员、订单、交易) //需要进行增量同步的表,是来源于哪个业务的,这个业务可以是会员、订单、交易等 private String domain; //当数据源为来源的时候,配置对应的消息topic //需要进行同步的表,该表的binlog会被写入到RocketMQ的哪个topic里去 private String domainTopic; //数据源类型,1需要读取数据,2需要写入数据 private Integer dataSourceType; //是否显示 ShardingSphere SQL执行日志 private Integer sqlshow; //每个逻辑库中表的数量 private int tableNum; }
21.RocketMQ里的binlog消息的消费逻辑分析
RocketMQ里的binlog消息会由线程池里的CanalPullRunner任务来处理。往线程池提交CanalPullRunner任务时,会传入RocketMQ的地址和主题。然后CanalPullRunner任务首先会设置RocketMQ Consumer禁止自动提交offset,让当前Consumer不要自动去提交offset,防止还没处理完binlog消息就提交offset。如果offset已经被自动提交,但是binlog消息却处理失败,那么RocketMQ就不会让消费者再次消费了。之后CanalPullRunner会设置Consumer订阅指定的topic和nameServer地址,然后启动这个Consumer。
接着会进入while死循环中,并不断通过Consumer从RocketMQ中一批批的Pull消息出来消费处理。对于Consumer从RocketMQ中Pull到的每一条消息,会获取对应的topic、queue、offset、body。然后判断该消息是否已被处理过,即是否已经存在于消费记录表中,如果存在则跳过执行。如果还没处理过,那么调用processNewMsg()方法进行处理,并往消费记录表中插入一条记录。
//binlog消息拉取任务(只拉不提交) public class CanalPullRunner implements Runnable { //消息主题 private final String topic; //RocketMQ的NameServer地址 private final String nameServerUrl; //binlog消息同步消费记录表Mapper private final EtlBinlogConsumeRecordMapper consumeRecordMapper; //消息拉取任务构造方法 //@param topic 消息主题 //@param nameServerUrl RocketMQ的NameServer地址 public CanalPullRunner(String topic, String nameServerUrl) { this.topic = topic; this.nameServerUrl = nameServerUrl; this.consumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class); } @Override public void run() { pullRun(); } //执行消息拉取 private void pullRun() { try { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("binlogPullConsumer"); //设置RocketMQ Consumer禁止自动提交offset,让它不要自动去提交offset,防止还没完处理完binlog消息就提交offset //如果offset已经被自动提交,但是binlog消息却处理失败,那么RocketMQ就不会让消费者再次消费了 litePullConsumer.setAutoCommit(false); litePullConsumer.setNamesrvAddr(nameServerUrl); litePullConsumer.subscribe(topic, "*"); litePullConsumer.start(); try { //进入while死循环中,通过Consumer从RocketMQ中一批批的Pull消息出来消费处理 while (true) { //拉取未消费消息 List<MessageExt> messageExts = litePullConsumer.poll(); if (CollUtil.isNotEmpty(messageExts)) { for (MessageExt messageExt : messageExts) { byte[] body = messageExt.getBody(); String msg = new String(body); //记录queueId和offset int queueId = messageExt.getQueueId(); long offset = messageExt.getQueueOffset(); String topic = messageExt.getTopic(); //topic、queue、offset、msg,四位一体,把所有的信息都拿到 //判断该消息是否已被处理过,即是否已经存在于消费记录表中,如果存在则跳过执行 EtlBinlogConsumeRecord existsRecord = consumeRecordMapper.getExistsRecord(queueId, offset, topic); if (null == existsRecord) { //如果还没处理过,那么进行处理,并往消费记录表中插入一条记录 processNewMsg(messageExt, msg); } else { //处理已经存在的消费记录 proccessExistsRecord(litePullConsumer, msg, existsRecord); } } } else { Thread.sleep(5000); } } } finally { litePullConsumer.shutdown(); } } catch (InterruptedException | MQClientException e) { try { //假设要拉取消息的主题还不存在,则会抛出异常,这种情况下休眠五秒再重试 Thread.sleep(5000); pullRun(); } catch (InterruptedException ignored) { } } } ... }
22.新增binlog的数据同步逻辑分析
对于新增的binlog数据,会调用CanalPullRunner的processNewMsg()方法进行处理。
由于首先拿到的msg是一个字符串格式的binlog,所以需要对该字符串格式的binlog做一个解析,然后把这个解析后的binlog的信息封装到自定义的BinlogData对象里去。
由于binlog的字符串格式是json格式,所以可以把binlog的字符串转成json对象。然后从json对象里提取一个一个字段出来,构建BinlogData对象。
拿到一个BinlogData对象之后,接着会封装binlog消费处理记录对象EtlBinlogConsumeRecord。此时不会把binlog的数据放到EtlBinlogConsumeRecord里,但会设置topic、queue、offset等信息。
之所以关闭自动提交offset的功能,是因为这里的每一条binlog消息都将被异步处理。也就是每一条binlog消息,都会提交到本地队列来实现异步化处理。所以如果异步化都没处理完毕,就自动提交了offset告诉MQ已处理成功,那是有问题的。
因此这里Consumer消费消息时,会把消息先放入本地队列。然后把binlog在RocketMQ里的topic、queue、offset封装成Record对象,插入到DB。
当Record对象插入DB后,binlog自己的数据才会提交到本地队列进行异步化处理。此时offset还不会提交给Broker,必须等到异步化处理完该binlog消息后,才能提交offset。
public class CanalPullRunner implements Runnable { ... //处理新的消息 //@param messageExt mq消息对象 //@param msg 消息内容 private void processNewMsg(MessageExt messageExt, String msg) { try { //由于首先拿到的msg是一个字符串格式的binlog,所以需要对该字符串格式的binlog做一个解析 //然后把这个解析后的binlog的信息封装到自定义的BinlogData对象里去 BinlogData binlogData = BinlogUtils.getBinlogDataMap(msg); Boolean targetOperateType = BinlogType.INSERT.getValue().equals(binlogData.getOperateType()) || BinlogType.DELETE.getValue().equals(binlogData.getOperateType()) || BinlogType.UPDATE.getValue().equals(binlogData.getOperateType()); if (!targetOperateType || null == binlogData || null == binlogData.getDataMap()) { return; } //拿到一个BinlogData对象之后,接着会封装一个binlog消费处理记录EtlBinlogConsumeRecord //此时不会把binlog自己的数据放到EtlBinlogConsumeRecord里,但会设置topic、queue、offset等信息 //之所以关闭RocketMQ Consumer自动提交offset的功能,是因为这里的每一条binlog消息都将被异步处理 //也就是每一条binlog消息,都会提交到本地队列里,依托本地队列来实现异步化处理 //所以对于Consumer来说,如果异步化都没处理完毕,就自动提交了offset,告诉MQ已经处理成功了,那是有问题的 //因此这里Consumer消费消息时,是先放入队列处理 //然后把binlog在RocketMQ里的topic、queue、offset封装成EtlBinlogConsumeRecord对象,插入到DB里 EtlBinlogConsumeRecord consumeRecord = new EtlBinlogConsumeRecord(); consumeRecord.setQueueId(messageExt.getQueueId()); consumeRecord.setOffset(messageExt.getQueueOffset()); consumeRecord.setTopic(messageExt.getTopic()); consumeRecord.setBrokerName(messageExt.getBrokerName()); consumeRecord.setConsumeStatus(ConsumerStatus.NOT_CONSUME.getValue()); consumeRecord.setCreateTime(new Date()); consumeRecordMapper.insert(consumeRecord); //EtlBinlogConsumeRecord对象插入DB后,binlog自己的数据会提交到本地队列进行异步化处理 //此时offset还不会提交给Broker的,必须等到异步化处理完该binlog消息后,才能提交offset给Broker LocalQueue.getInstance().submit(binlogData, consumeRecord); } catch (Exception e) { log.error("新增消费记录失败", e); } } ... }
解析binlog的json字符串为对象的逻辑如下:
//MySQL binlog解析工具类 public abstract class BinlogUtils { ... //解析binlog json字符串 //@param binlogStr binlog json字符串 //@param dataType 解析后的data的类型(实体类还是map) //@return BinlogData //@throws ClassNotFoundException 找不到实体类异常 //@throws InstantiationException 实例化实体类异常 //@throws IllegalAccessException 非法访问异常 private static BinlogData getBinlogData(String binlogStr, String dataType) throws ClassNotFoundException, InstantiationException, IllegalAccessException { //isJson方法里面会判断字符串是不是为空,所以这里不需要重复判断 //由于binlog的字符串格式是json格式,所以可以把binlog的字符串转成json对象,然后从json对象里提取一个一个字段出来 if (JSONUtil.isJson(binlogStr)) { JSONObject binlogJson = JSONUtil.parseObj(binlogStr); BinlogData binlogData = new BinlogData(); //表名 String tableName = binlogJson.getStr("table"); binlogData.setTableName(tableName); //操作类型 String operateType = binlogJson.getStr("type"); binlogData.setOperateType(operateType); //操作时间 Long operateTime = binlogJson.getLong("ts"); binlogData.setOperateTime(operateTime); //获取数据json数组 JSONArray dataArray = binlogJson.getJSONArray("data"); if (null != dataArray) { Iterable<JSONObject> dataArrayIterator = dataArray.jsonIter(); //遍历data节点并反射生成对象 if (null != dataArrayIterator) { //binlog的data数组里数据的类型为实体类 if (DATATYPE_DOMAIN.equals(dataType)) { //获取实体类名称 String domainName = DOMAIN_PATH + '.' + StrUtil.upperFirst(StrUtil.toCamelCase(tableName)); //获取表对应的实体类(这里出现异常就抛出去了,实际使用时应该捕获并记录日志,因为根据表名找不到对象,那么这个表的所有数据都无法同步,这种情况肯定要记录日志并告警的) Class<?> domainClass = Class.forName(domainName); List<Object> datas = new ArrayList<>(); while (dataArrayIterator.iterator().hasNext()) { JSONObject jsonObject = dataArrayIterator.iterator().next(); Field[] fields = domainClass.getDeclaredFields(); //通过反射创建实体类实例,这里的异常也直接外抛,实际处理时需要记录这个异常并告警 Object domain = domainClass.newInstance(); for (Field field : fields) { //根据属性名称反向取得对应的表中的字段名称,然后根据属性的类型取得字段值并通过set方法设置进去 String fieldName = field.getName(); String columnName = StrUtil.toSymbolCase(fieldName, '_'); //因为我们的属性是私有的,所以这里需要设置为可访问方便直接设值 field.setAccessible(true); Object fieldValue = getFieldValue(field.getType(), columnName, jsonObject); if (null != fieldValue) { field.set(domain, fieldValue); } } datas.add(domain); } binlogData.setDatas(datas); } else if (DATATYPE_MAP.equals(dataType)) { //binlog的data数组里数据的类型为Map List<Map<String, Object>> dataMap = new ArrayList<>(); while (dataArrayIterator.iterator().hasNext()) { JSONObject jsonObject = dataArrayIterator.iterator().next(); Map<String, Object> data = new HashMap<>(); jsonObject.keySet().forEach(key -> { data.put(key, jsonObject.get(key)); }); dataMap.add(data); } binlogData.setDataMap(dataMap); } } } return binlogData; } return null; } ... }
23.binlog基于内存队列的异步转发逻辑
CanalPullRunner在processNewMsg()方法处理一条新增的binlog时:首先会封装一条binlog消费处理记录写入到数据库中,然后再将该binlog消息提交到自定义的内存队列LocalQueue。
LocalQueue.getInstance().submit(binlogData, consumeRecord)
在往内存队列写入binlog消息时,会首先加一个轻量级的Atomic——PutBinlogLock。因为LocalQueue内存写队列writeQueue是LinkedList,LinkedList并不是线程安全的。成功加锁的线程才能把binlog消息写入到内存队列LocalQueue的内存写队列writeQueue里,然后释放锁。
这个轻量级Atomic锁会进行如下设计:假设多个线程同时来进行加锁,多个线程都会去对Atomic变量进行CAS操作。但只有一个线程可以把Atomic变量从true变为false,Atomic变量默认就是支持线程安全的。也就是只有一个线程可以完成加锁的逻辑(即在执行compareAndSet()前flag为true,执行后flag为false。而其他线程加锁都会失败并在进入自旋(因为执行compareAndSet()前flag=flag,执行后flag还是false。
//binlog消息拉取任务(只拉不提交) public class CanalPullRunner implements Runnable { ... private void processNewMsg(MessageExt messageExt, String msg) { ... consumeRecordMapper.insert(consumeRecord); LocalQueue.getInstance().submit(binlogData, consumeRecord); ... } ... } //数据缓存阻塞队列类 public class LocalQueue { private static volatile LocalQueue localQueue; //提供锁的实例对象 private final PutBinlogLock lock = new PutBinlogLock(); //数据同步的写队列 private volatile LinkedList<BinlogData> writeQueue = new LinkedList<>(); //数据同步的读队列 private volatile LinkedList<BinlogData> readQueue = new LinkedList<>(); //由于可能会多线程进行并发读和写,所以一般定义为volatile类型,来保证线程之间的可见性 //isRead始终只有一些相隔15秒的线程在写 private volatile boolean isRead = false; private LocalQueue() { } //构建一个单例模式对象 public static LocalQueue getInstance() { if (null == localQueue) { synchronized (LocalQueue.class) { if (null == localQueue) { localQueue = new LocalQueue(); } } } return localQueue; } //数据写入队列 //@param binlogData MySQL的binlog对象 //@param consumeRecord 消费记录 public void submit(BinlogData binlogData, EtlBinlogConsumeRecord consumeRecord) { //writeQueue是LinkedList,LinkedList并不是线程安全的 lock.lock(); try { binlogData.setConsumeRecord(consumeRecord); writeQueue.add(binlogData); } finally { lock.unlock(); } } ... } //锁竞争类对象 //基于Atomic的轻量级锁 public class PutBinlogLock { private final AtomicBoolean putMessageSpinLock = new AtomicBoolean(true); //加锁 public void lock() { //假设多个线程同时来进行加锁 boolean flag; do { //多个线程都会去对Atomic变量进行CAS操作,只有一个线程可以把Atomic变量从true变为false,Atomic变量默认就是支持线程安全的 //也就是只有一个线程可以完成加锁的逻辑(此时执行下面compareAndSet前flag=true,执行后flag=false) //其他线程CAS加锁都会失败并在这里进入自旋(此时执行下面compareAndSet前flag=flag,执行后flag还是false,从而自旋) flag = this.putMessageSpinLock.compareAndSet(true, false); } while (!flag); } //释放锁 public void unlock() { //只有一个线程可以成功的执行cas,把false变为true this.putMessageSpinLock.compareAndSet(false, true); } }
24.基于CAS加锁的读写队列互换机制
对binlog的处理是基于定时批处理的,不是来一条binlog就处理一条binlog。默认会每隔15秒处理内存队列里writeQueue的binlog数据,从而实现统一的批处理。
所以会有一个定时调度任务IncrementTask,负责定时对增量binlog数据进行处理。该定时调度任务IncrementTask会每隔15秒跑一次处理内存队列writeQueue的binlog数据。也就是定时调度任务IncrementTask会每隔15秒执行一次LocalQueue的doCommit()方法。
当LocalQueue的doCommit()方法对内存队列writeQueue的binlog数据进行处理时,为了提升性能,避免处理内存队列writeQueue的binlog数据时,对writeQueue持有锁耗时过长。所以设计了一个交换队列readQueue,会快速将writeQueue的数据复制到readQueue中。只有两个队列在交换数据时才会加锁,避免对writeQueue的长时间操作。
首先会设置一个标记isRead表明是否正在处理binlog,isRead始终只有一些相隔15秒的线程在写。只有isRead为false时,才能对内存队列writeQueue的binlog数据进行批处理。开始进行批处理时,会设置isRead为true。队列交换前,readQueue是空的,writeQueue是有数据的。交换进行中,writeQueue会被加锁,等待交换完成才释放锁,才能往writeQueue里写。当然往writeQueue里写binlog数据时,也会加锁的,写完之后才释放锁。交换完成后,writeQueue是空的,readQueue是有数据的。当后续慢慢处理完readQueue里的binlog数据了,设置isRead为false。
需要注意:加锁都是基于CAS轻量级的加锁,为什么不去用JDK提供的线程并发安全的队列呢?因为JDK提供的线程并发安全的队列,仅仅是队列自己内部是线程并发安全而已。而这里我们需要确保多个队列queue,在同时操作时,也都是线程并发安全的。所以这里才需要我们自己去对队列writeQueue的操作进行加锁。
//负责定时对增量数据写入落地 //对binlog的处理都是基于定时批处理的,不是来一条binlog就处理一条binlog //默认是收集15s内的数据统一做一个批处理 @Component public class IncrementTask { //负责增量数据的写入动作,每隔15秒跑一次 @Scheduled(fixedDelay = 15000) void IncrementTask() { //获取内存队列单例 LocalQueue localQueue = LocalQueue.getInstance(); //判断读队列的数据是否已被处理完毕 //刚开始localQueue的isRead默认就是false;如果上一次数据导入操作还在做(isRead会为true),则不做处理 if (!localQueue.getIsRead()) { log.info("增量数据执行写入"); //处理读队列里的数据,执行数据写入 localQueue.doCommit(); } } } //数据缓存阻塞队列类 public class LocalQueue { private static volatile LocalQueue localQueue; //提供锁的实例对象 private final PutBinlogLock lock = new PutBinlogLock(); //数据同步的写队列 private volatile LinkedList<BinlogData> writeQueue = new LinkedList<>(); //数据同步的读队列 private volatile LinkedList<BinlogData> readQueue = new LinkedList<>(); //由于可能会多线程进行并发读和写,所以一般定义为volatile类型,来保证线程之间的可见性 //isRead始终只有一些相隔15秒的线程在写 private volatile boolean isRead = false; ... //获取是否正在读取数据解析落地 public Boolean getIsRead() { return this.isRead; } //将读队列缓存的数据,进行数据合并处理,并写入存储落地 public void doCommit() { //标记目前正在读取读队列的binlog数据,进行写入存储落地 isRead = true; //读取读队列里的binlog数据,并写入完成后,交互一下读写队列 swapRequests(); if (!readQueue.isEmpty()) { ... //readQueue里的binlog数据处理 } readQueue.clear(); isRead = false; } //交换队列 private void swapRequests() { //writeQueue是LinkedList,LinkedList并不是线程安全的 lock.lock(); //注意:加锁都是基于CAS轻量级的加锁,那为什么不去用JDK提供的默认线程并发安全的队列呢? //因为JDK提供的默认线程并发安全的队列,仅仅是队列自己内部是线程并发安全而已 //而这里我们需要确保多个队列queue,在同时操作时,也都是线程并发安全的 //所以这里才需要我们自己去对队列操作进行加锁 //加锁有三种方式:synchronized、ReentrantLock和CAS //这里选择CAS,是因为它是轻量级的加锁 //况且JDK自己内部实现加锁,都是基于CAS加锁,如果加锁不成功会进入while(true)自旋 //所以从性能来说,CAS会更好一些 try { log.info("本次同步数据写入:" + writeQueue.size() + "条数"); LinkedList<BinlogData> tmp = writeQueue; writeQueue = readQueue; readQueue = tmp; } finally { lock.unlock(); } } ... }
25.binlog基于内存的merge合并逻辑
如果对同一条数据,有增删改多个binlog,比如有insert、update、update、delete4条binlog,那么这4条binlog其实都没有必要都放到目标库里跑一遍。完全可以对它们合并在一起,直接执行最后一条delete操作即可。
所以LocalQueue的doCommit()方法在处理readQueue里的binlog数据时,首先会通过MergeBinlogWrite组件来进行合并操作。通过MergeBinlogWrite组件完成binlog数据合并后,会过滤无效的binlog数据。过滤完无效的binlog数据后,再通过MergeBinlogWrite组件完成向目标库写入binlog。
MergeBinlogWrite是一个支持合并binlog的目标库写入组件,MergeBinlogWrite的mergeBinlog()方法会对数据进行合并处理。
注意:从readQueue拿到的一条数据,可能会包含一条数据的多个binlog。所以需要先对可能的多个binlog进行merge合并,也就是把每条数据的binlog放入一个map里。
//数据缓存阻塞队列类 public class LocalQueue { ... //将读队列缓存的数据,进行数据合并处理,并写入存储落地 public void doCommit() { //标记目前正在读取读队列的binlog数据,进行写入存储落地 isRead = true; //读取读队列里的binlog数据,并写入完成后,交互一下读写队列 swapRequests(); if (!readQueue.isEmpty()) { //如果对同一条数据,有增删改多个binlog,比如有insert、update、update、delete4条binlog //那么这4条binlog其实都没有必要都放到目标库里跑一遍 //完全可以对它们合并在一起,直接执行最后一条delete操作即可 //所以这里会在内存里进行binlog的merge操作 //MergeBinlogWrite是一个支持合并binlog的目标库写入组件 MergeBinlogWrite mergeBinlogWrite = new MergeBinlogWrite(); //遍历存储在读队列readQueue的binlog数据,然后进行数据合并,保留时间最新的操作 for (BinlogData binlogData : readQueue) { //对数据进行合并处理 mergeBinlogWrite.mergeBinlog(binlogData); } //接着对数据进行校验,过滤无效的数据,例如已经小于目标库记录时间的 //过滤掉那些:merge以后的binlog,它的操作时间比目标库里的数据时间旧 mergeBinlogWrite.filterBinlogAging(OperateType.ADD, null); //数据写入,按表分组写入 mergeBinlogWrite.write(OperateType.ADD, null); } readQueue.clear(); isRead = false; } ... } //对数据合并、过滤、写入存储 //在进行全量数据同步时,会调用MergeBinlogWrite组件的load()方法对数据进行过滤 //在进行增量数据同步时,会通过MergeBinlogWrite组件的mergeBinlog()方法对监听到的binlog进行合并操作 public class MergeBinlogWrite { //用于存储过滤最新的数据,binlogDataMap的key是由"每条数据的主键ID" + "&" + "表名"组成的 private final Map<String, BinLog> binlogDataMap = new HashMap<>(2048); //存储本次需要更新的消息对象信息 private List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = new ArrayList<>(); ... //对同步的数据进行合并,转换 //@param binlogData MySQL的binlog对象 public void mergeBinlog(BinlogData binlogData) { //此时从readQueue拿到的一条数据,可能会包含一条数据的多个binlog //所以这里会先对可能的多个binlog进行merge合并,也就是把每条数据的binlog先放在一个map里 List<Map<String, Object>> dataList = binlogData.getDataMap(); if (CollectionUtils.isEmpty(dataList)) { return; } //获取binlog对应的表名 String key = MergeConfig.getSingleKey(binlogData.getTableName()); for (Map<String, Object> dataMap : dataList) { //每次都需要先加入到集合中,用于处理完后,批量更新 etlBinlogConsumeRecordList.add(binlogData.getConsumeRecord()); //先获取这条同步记录的唯一标识字段 //RocketMQ里有一个topic -> topic里有一个table -> table里有一个标识字段值(订单编号) String mergeKey = dataMap.get(key) + SPLIT_KEY + binlogData.getTableName() + SPLIT_KEY + binlogData.getConsumeRecord().getTopic(); //验证是否在这批同步的数据当中,有相同的更新记录 BinLog getBinlogData = binlogDataMap.get(mergeKey); if (!ObjectUtils.isEmpty(getBinlogData)) { //判断历史的记录的操作时间,是否大于本次同步的操作时间 //上一次放到map里的binlog是比较新的,此时这条binlog是旧的,不要去做任何处理 //例如:insert update1 update2,那么如果update2先进来map,update1后进来,就不要去管它了 if (getBinlogData.getOperateTime().compareTo(binlogData.getOperateTime()) > 0) { continue; } } //将数据转换为单条log对象 BinLog binLog = buildBinLog(binlogData, dataMap); //在这里的merge,其实就是对一条数据的多个binlog,按照时间先后顺序,去做一个覆盖 binlogDataMap.put(mergeKey, binLog); //topic->table->数据标识,binlog,map } } //转换成单条存储的sql变更对象 //@param binlogData MySQL的binlog对象 //@param dataMap 单条sql的信息 //@return binlog对象 private BinLog buildBinLog(BinlogData binlogData, Map<String, Object> dataMap) { BinLog binLog = new BinLog(); binLog.setDataMap(dataMap); binLog.setOperateTime(binlogData.getOperateTime()); binLog.setOperateType(binlogData.getOperateType()); binLog.setTableName(binlogData.getTableName()); binLog.setTopic(binlogData.getConsumeRecord().getTopic()); return binLog; } ... }
26.对merge数据从目标库里分批查询
在LocalQueue的doCommit()方法中,调用MergeBinlogWrite.mergeBinlog()方法完成binlog数据的合并后,接着就会调用MergeBinlogWrite的filterBinlogAging()方法过滤无效的binlog数据。也就是过滤掉那些:merge以后的binlog数据,它的操作时间比目标库里的数据时间旧。
filterBinlogAging()方法首先会通过batchQuery()方法对合并的数据从目标库里进行分批查询。也就是首先对合并的数据按表进行分组,分组后再进行数据切割,保证每次最多查询200条数据。
public class MergeBinlogWrite { ... //对合并后的数据进行验证是否为过时数据 public void filterBinlogAging(OperateType operateType, String domain) { //批量查询数据是否存在于目标库,并返回匹配的数据集合;也就是根据这500条数据去分库分表的目标库中进行查询 Map<String, Map<String, Object>> respMap = batchQuery(operateType, domain); ... } ... //批量查询已存在目标库的数据 private Map<String, Map<String, Object>> batchQuery(OperateType operateType, String domain) { //先获取本次迁移的全部唯一key List<String> keyStrList = new ArrayList<>(binlogDataMap.keySet()); binlogDataList = new ArrayList<>(keyStrList.size()); //先对这批数据,按照表名做一个分组 Map<String, List<String>> keyMap = new HashMap<>(); //增量处理和全量处理 if (operateType == OperateType.ADD) { //增量的数据进行分组处理(按表粒度分组) //按表为粒度来进行分组,就是一个表的多条数据分为一组 keyMap = groupIncrMentTable(keyStrList); } else if (operateType == OperateType.ALL) { //全量数据进行分组(只是模型转换的概念) keyMap = groupAllTable(keyStrList); } Map<String, Map<String, Object>> targetMap = new HashMap<>(); MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class); List<Map<String, Object>> targetAllList = new ArrayList<>(); for (Map.Entry<String, List<String>> mapEntry : keyMap.entrySet()) { String dataBaseKey = mapEntry.getKey(); String[] split = dataBaseKey.split(SPLIT_KEY); //获取topic和对应的表 String tableName = null; String topic = null; if (operateType == OperateType.ADD) { topic = split[0]; tableName = split[1]; } else { tableName = split[0]; } List<String> keyList = mapEntry.getValue(); //数据切割,每次查询200条数据 //一个表的keyList,可能会有很多条;条数太多了以后,如果去做批量查询,一次查询的量可能会太大 //所以需要对keyList做一个切割,按200条为一个单位,切割成多个批次 //分多个批次,把表对应的所有数据从目标库表里查询出来后,还需要做一些对比,即当前binlog是不是比目标库里的数据要旧 int limit = countStep(keyList.size()); //切割成多个集合对象 //java8表达式,流处理的方式,按照指定的批次,将keyList拆分为多个批次 List<List<String>> splitList = Stream.iterate(0, n -> n + 1) .limit(limit) .parallel() .map(a -> keyList.stream().skip((long) a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList())) .collect(Collectors.toList()); //获取对应的业务域滚动查询对象 RangeScroll scrollConfig = buildRangeScroll(tableName, topic, domain); //分页查询数据 for (List<String> strings : splitList) { List<Map<String, Object>> targetList = migrateService.findByIdentifiers(scrollConfig, strings, DBChannel.CHANNEL_2.getValue()); targetAllList.addAll(targetList); } String keyValue = MergeConfig.getSingleKey(tableName); for (Map<String, Object> target : targetAllList) { String mapKey = target.get(keyValue) + ""; targetMap.put(mapKey + SPLIT_KEY + tableName, target); } } return targetMap; } ... //增量数据进行分组 private Map<String, List<String>> groupIncrMentTable(List<String> keyStrList) { Map<String, List<String>> keyMap = new HashMap<>(); //筛选按表为维度的集合 for (String keyStr : keyStrList) { String[] split = keyStr.split(SPLIT_KEY); List<String> keyList; String key = split[0]; String tableName = split[1]; String topic = split[2]; if (keyMap.containsKey(topic + SPLIT_KEY + tableName)) { keyList = keyMap.get(topic + SPLIT_KEY + tableName); keyList.add(key); } else { keyList = new ArrayList<>(); keyList.add(key); } //每一个表对应的多条数据的标识,订单表 //order_topic + order_info,list<订单编号, 订单编号, 订单编号> keyMap.put(topic + SPLIT_KEY + tableName, keyList); } return keyMap; } ... }
27.对merge数据基于目标库数据做过滤
MergeBinlogWrite的filterBinlogAging()方法的过滤逻辑需要筛选出如下情况的binlog:
情况一:binlog比目标库的数据要新的,操作时间是新的,则进行update更新操作
情况二:binlog是delete删除操作,目标库也有数据,那么要进行delete删除操作
情况三:binlog在目标库中不存在且不是delete操作,则需要执行insert插入操作
public class MergeBinlogWrite { ... //对合并后的数据进行验证是否为过时数据 public void filterBinlogAging(OperateType operateType, String domain) { //批量查询数据是否存在于目标库,并返回匹配的数据集合;也就是根据这500条数据去分库分表的目标库中进行查询 Map<String, Map<String, Object>> respMap = batchQuery(operateType, domain); //开始核对数据是否已经存在库中,并验证谁的时间最新过滤失效数据 for (Map.Entry<String, BinLog> entry : binlogDataMap.entrySet()) { BinLog binLog = entry.getValue(); //当前同步要处理的表名称 String tableName = binLog.getTableName(); //判断同步的数据库中,是否在目标库中已存在 //本次拿到的binlog,如果在目标库里已经存在一条数据了 if (!CollectionUtils.isEmpty(respMap) && respMap.containsKey(entry.getKey())) { //当前同步的这条记录 Map<String, Object> binLogMap = binLog.getDataMap(); //目标库中查询到的记录 Map<String, Object> targetMap = respMap.get(entry.getKey()); //处理同步的记录是否需要执行,如果同步的时间大于目标库的时间,则代表需要更新,但删除的数据不比对时间 if (BinlogType.DELETE.getValue().equals(binLog.getOperateType())) { //第二种情况,如果当前这条binlog是delete删除操作,那么这条binlog是一定要处理的 binLog.setOperateType(BinlogType.DELETE.getValue()); } else if (MigrateCheckUtil.comparison(binLogMap, targetMap, tableName)) { //第一种情况,binlog比目标库的数据是要新的,即操作时间是更加新的,那么进行update更新操作 binLog.setOperateType(BinlogType.UPDATE.getValue()); } else { continue; } } else { //第三种情况,binlog在目标库中不存在,则说明这条binlog需要执行插入insert操作 //目标库里数据不存在,那么设置operateType=insert;如果是update,则需要手工调整为insert //如果是delete,直接返回就即可,目标库中不存在这条数据,此时删除操作就不用去做了 //数据在目标库不存在,对多条数据的最后一条结果集的类型为update,需要更正为insert,如果是delete则略过 if (BinlogType.UPDATE.getValue().equals(binLog.getOperateType())) { binLog.setOperateType(BinlogType.INSERT.getValue()); } if (BinlogType.DELETE.getValue().equals(binLog.getOperateType())) { continue; } } //将需要写入的数据添加到集合中 binlogDataList.add(binLog); } } ... }
28.将过滤后的merge数据写入目标库
在LocalQueue的doCommit()方法中,首先调用MergeBinlogWrite的mergeBinlog()方法去完成binlog数据的合并,然后调用MergeBinlogWrite的filterBinlogAging()方法进行无效binlog数据的过滤,最后调用MergeBinlogWrite的write()方法将过滤后的合并数据进行分组写入到目标库。
需要注意的是:在执行一批SQL语句的时候,update更新操作需要一条一条执行。但insert和delete操作,可以进行批量执行插入和删除。最后完成写入后,就会更新binlog消费处理记录为已完成未提交,然后清空读队列。
public class MergeBinlogWrite { ... //对数据进行写入 public void write(OperateType operateType, RangeScroll rangeScroll) { //先按表,将数据进行分组 Map<String, List<BinLog>> binLogMap = binlogDataList.stream().collect(Collectors.groupingBy(BinLog::getTableName)); boolean isWrite = true; //遍历不同写入表的集合对象 for (Map.Entry<String, List<BinLog>> mapEntry : binLogMap.entrySet()) { String tableName = mapEntry.getKey(); List<BinLog> binLogList = mapEntry.getValue(); String topic = binLogList.get(0).getTopic(); //全量的是从外部带入的参数,增量的通过表名和topic构建 if (Objects.isNull(rangeScroll)) { rangeScroll = buildRangeScroll(tableName, topic, null); } MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class); //批量写入 boolean isFlag = migrateService.migrateBat(rangeScroll, binLogList); //有一次更新失败,本批次的offset都不更新状态 if (!isFlag) { isWrite = false; } } //这里的逻辑用于处理增量同步的情形,operateType == OperateType.ALL才是全量同步 //批量更新offset的标志,如果更新过程中有一个批次是失败的,都不能更新掉本地同步的offset,待下次拉取的时候更新 if (isWrite) { if (OperateType.ADD == operateType) { updateConsumeRecordStatus(); } } else { //如果有更新失败todo 抛出异常,暂停任务,等排查出问题后继续进行 throw new BusinessException("全量数据写入失败"); } } //更新消息队列的offset标志为已完成 private void updateConsumeRecordStatus() { EtlBinlogConsumeRecordMapper etlBinlogConsumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class); Map<Integer, List<EtlBinlogConsumeRecord>> integerListMap = etlBinlogConsumeRecordList.stream().collect(Collectors.groupingBy(EtlBinlogConsumeRecord::getQueueId)); for (Map.Entry<Integer, List<EtlBinlogConsumeRecord>> mapEntry : integerListMap.entrySet()) { Integer queueId = mapEntry.getKey(); List<EtlBinlogConsumeRecord> etlBinlogConsumeRecordList = mapEntry.getValue(); //批量更新 etlBinlogConsumeRecordMapper.batchUpdateConsumeRecordStatus(queueId, ConsumerStatus.CONSUME_SUCCESS.getValue(), etlBinlogConsumeRecordList); } } ... } @Service public class MigrateServiceImpl implements MigrateService { ... @Override public boolean migrateBat(RangeScroll scroll, List<BinLog> binLogs) { log.info("开始执行migrateBat方法,tableName=" + scroll.getTableName() + ",本次操作" + binLogs.size() + "条记录"); if (!Objects.isNull(scroll) && CollUtil.isNotEmpty(binLogs)) { try { //在执行这批sql语句的时候,update更新操作是一条一条来执行 //insert和delete操作,则可以进行批量插入和删除 //insert into values()(),delete from table where id in(xx, xx) List<Map<String, Object>> insertMaps = new ArrayList<>(); List<Map<String, Object>> deleteMaps = new ArrayList<>(); for (BinLog binLog : binLogs) { if (BinlogType.INSERT.getValue().equals(binLog.getOperateType())) { //新增操作单独拎出来做批量新增,不然执行效率太低 insertMaps.add(binLog.getDataMap()); } else if (BinlogType.UPDATE.getValue().equals(binLog.getOperateType())) { //处理一下更新的null异常对象 binLog.setDataMap(MigrateUtil.updateNullValue(binLog.getDataMap())); update(binLog.getDataMap(), scroll); } else if (BinlogType.DELETE.getValue().equals(binLog.getOperateType())) { deleteMaps.add(binLog.getDataMap()); } } //批量新增 if (CollUtil.isNotEmpty(insertMaps)) { MigrateUtil.removeNullValue(insertMaps); insertBat(insertMaps, scroll); } if (CollectionUtils.isNotEmpty(deleteMaps)) { delete(deleteMaps, scroll); } } catch (Exception e) { log.error("migrateBat () tableName=" + scroll.getTableName(), e); return false; } return true; } return false; } ... }
29.offset提交线程的启动和逻辑分析
系统启动时,会针对每个增量同步任务都提交一个CanalPullCommitRunner任务到线程池。接着当offset提交线程CanalPullCommitRunner一旦启动,就会创建一个RocketMQ的Consumer,监听同样的topic,同时也关闭自动提交offset。CanalPullCommitRunner和CanalPullRunner里的Consumer,虽然topic一样但group不一样。
之后offset提交线程,会首先把topic里的queue拉取过来,这样就知道有多少queue。然后把topic里所有的queue,都分配给当前的这个Consumer,以便可以拿到所有queue,之后offset提交线程便会进入while(true)循环。
在循环中,会通过consumeRecordMapper从数据库获取所有已消费但未提交的记录。因为每一条binlog都会对应一个consumeRecord记录,所以可以把没有提交的record都查出来,接着遍历每一个consumeRecord记录。
由于每个binlog都是RocketMQ里的一条消息,它里面会包含topic、queue、offset、message。基于consumer的seek()定位,可以直接定位到Broker的那个topic -> queue -> offset的位置去。
定位到指定的位置后,做一个poll操作,从指定的位置poll拉取出来一批数据。这一步是必须的,不然手动提交的东西就不对了。
当拉取出一批消息后,就需要对这批消息,执行Commit操作。也就是对这批已经处理成功的消息,进行手动提交offset。
最后更新这个consumeRecord记录的消费记录状态为已提交,也就是把这条消息消费记录的状态修改为committed,认为它对应的binlog消息已提交成功。
@Component public class CanalConsumeTask implements ApplicationRunner { //RocketMQ的nameServer地址 @Value("${rocketmq.name-server:127.0.0.1:9876}") private String nameServerUrl; //可以从migrateConfigService拿到增量同步配置 @Autowired private MigrateConfigService migrateConfigService; //ApplicationRunner在系统启动时就会运行run()方法 @Override public void run(ApplicationArguments args) throws Exception { //首先查出当前配置好的需要滚动查询全量数据的迁移任务,每个滚动查询全量数据的迁移任务就对应一个增量同步任务 List<ScrollDomain> scrollDomainList = migrateConfigService.queryScrollDomainList(); //这里会创建一个线程池,线程数量 = 已配置好的滚动查询全量数据的迁移任务的数量 ExecutorService executors = Executors.newFixedThreadPool(scrollDomainList.size()); for (ScrollDomain scrollDomain : scrollDomainList) { //接下来会提交两个任务 if (scrollDomain.getDataSourceType().equals(1)) { //执行拉取任务,此时设置Consumer不自动提交offset,只拉取不提交 executors.execute(new CanalPullRunner(scrollDomain.getDomainTopic(), nameServerUrl)); //执行提交任务,此时才会提交offset executors.execute(new CanalPullCommitRunner(scrollDomain.getDomainTopic(), nameServerUrl)); } } } } //binlog消息拉取提交任务 public class CanalPullCommitRunner implements Runnable { //消息主题 private final String topic; //RocketMQ的NameServer地址 private final String nameServerUrl; //binlog消息同步消费记录表Mapper private final EtlBinlogConsumeRecordMapper consumeRecordMapper; //消息拉取提交任务构造方法 public CanalPullCommitRunner(String topic, String nameServerUrl) { this.topic = topic; this.nameServerUrl = nameServerUrl; this.consumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class); } @Override public void run() { try { //注意,这里Consumer的consumerGroup和CanalPullRunner是不一样的 DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("binlogCommitConsumer"); //也是关闭自动提交 litePullConsumer.setAutoCommit(false); litePullConsumer.setNamesrvAddr(nameServerUrl); litePullConsumer.start(); commitRun(litePullConsumer); } catch (MQClientException e) { log.error("消息提交失败", e); } } //执行消息提交 private void commitRun(DefaultLitePullConsumer consumer) { try { //线程一旦启动,这里首先会把topic里的queue拉取过来,这样就知道有多少queue Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues(topic); //然后把topic里所有的queue,都分配给当前的这个consumer,这样当前的consumer是可以拿到所有的queue consumer.assign(messageQueues); try { //这里进入while(true)循环,负责重试 while (true) { //通过consumeRecordMapper从数据库中获取所有已消费未提交的记录 //因为每一条binlog都会对应一个consumeRecord记录,所以这里可以把没有提交的record都查出来 List<EtlBinlogConsumeRecord> consumedRecords = consumeRecordMapper.getNotCommittedConsumedRecords(topic); if (CollUtil.isNotEmpty(consumedRecords)) { //接着遍历每一个consumeRecord记录 for (EtlBinlogConsumeRecord consumedRecord : consumedRecords) { //而每个binlog都是RocketMQ里的一条消息,它里面会包含topic、queue、offset、message //基于consumer的seek()定位,就可以直接定位到Broker的那个topic、那个queue、那个offset的位置去 consumer.seek(new MessageQueue(consumedRecord.getTopic(), consumedRecord.getBrokerName(), consumedRecord.getQueueId()), consumedRecord.getOffset()); //定位到指定的位置后,做一个poll操作,从指定的位置poll拉取出一批消息 //这一步是必须的,不然手动提交的东西就不对了 List<MessageExt> messageExts = consumer.poll(); //当拉取出一批消息后,就需要对这批消息,执行commit操作 //也就是对这批已经处理成功的消息,进行手动提交offset consumer.commitSync(); //最后更新这个consumeRecord记录的消费记录状态为已提交 //也就是把这条消息消费记录的状态,修改为committed,认为它对应的binlog消息已经提交成功了 consumedRecord.setConsumeStatus(ConsumerStatus.COMMITTED.getValue()); consumeRecordMapper.updateConsumeRecordStatus(consumedRecord); } } else { Thread.sleep(5000); } } } finally { consumer.shutdown(); } } catch (MQClientException | InterruptedException e) { try { //假设要拉取消息的主题还不存在,则会抛出异常,这种情况下休眠五秒再重试 Thread.sleep(5000); commitRun(consumer); } catch (InterruptedException interruptedException) { log.error("消息拉取服务启动失败!", e); } } } }
30.增量同步过程中binlog写入失败的恢复
(1)binlog在增量同步写入失败时无法更新消息消费记录的状态为已消费未提交
(2)增量同步线程CanalPullRunner重新消费binlog消息时的处理
在IncrementTask任务对增量同步的对binlog进行批处理的写入过程中:LocalQueue的doCommit()方法会调用MergeBinlogWrite的write()方法去分组写入合并的binlog。此时只要有一次binlog更新失败,那么这一批binlog对应的消费记录状态都不会更新。
由于这一批binlog对应的消费记录状态不会更新为已完成未提交,自然就不能在offset提交线程中获取出这一批binlog对应的消费记录出来,进行提交处理。
这样由于增量同步线程CanalPullRunner一开始就关闭了自动提交offset,所以RocketMQ会一直收不到这一批binlog消息的任何反馈和通知(也就是Commit)。在这种情况下,RocketMQ会自动把这一批消息又重新投递给增量同步线程的消费者去消费。
CanalPullRunner的消费者重新消费到这一批binlog时会发现已存在对应消息消费记录,于是就会通过processExistsRecord()方法调用LocalQueue的commit()方法进行重新写入处理。
(1)binlog在增量同步写入失败时无法更新消息消费记录的状态为已消费未提交
public class MergeBinlogWrite { ... //对数据进行写入 public void write(OperateType operateType, RangeScroll rangeScroll) { //先按表,将数据进行分组 Map<String, List<BinLog>> binLogMap = binlogDataList.stream().collect(Collectors.groupingBy(BinLog::getTableName)); boolean isWrite = true; //遍历不同写入表的集合对象 for (Map.Entry<String, List<BinLog>> mapEntry : binLogMap.entrySet()) { String tableName = mapEntry.getKey(); List<BinLog> binLogList = mapEntry.getValue(); String topic = binLogList.get(0).getTopic(); //全量的是从外部带入的参数,增量的通过表名和topic构建 if (Objects.isNull(rangeScroll)) { rangeScroll = buildRangeScroll(tableName, topic, null); } MigrateService migrateService = ApplicationContextUtil.getBean(MigrateService.class); //批量写入 boolean isFlag = migrateService.migrateBat(rangeScroll, binLogList); //有一次更新失败,本批次的offset都不更新状态 if (!isFlag) { isWrite = false; } } //这里的逻辑用于处理增量同步的情形,operateType == OperateType.ALL才是全量同步 //批量更新offset的标志,如果更新过程中有一个批次是失败的,都不能更新掉本地同步的offset,待下次拉取的时候更新 if (isWrite) { if (OperateType.ADD == operateType) { updateConsumeRecordStatus(); } } else { //如果有更新失败todo 抛出异常,暂停任务,等排查出问题后继续进行 throw new BusinessException("全量数据写入失败"); } } ... }
(2)增量同步线程CanalPullRunner重新消费binlog消息时的处理
public class CanalPullRunner implements Runnable { ... //执行消息拉取 private void pullRun() { try { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("binlogPullConsumer"); //设置RocketMQ Consumer禁止自动提交offset,让它不要自动去提交offset,防止还没完处理完binlog消息就提交offset //如果offset已经被自动提交,但是binlog消息却处理失败,那么RocketMQ就不会让消费者再次消费了 litePullConsumer.setAutoCommit(false); litePullConsumer.setNamesrvAddr(nameServerUrl); litePullConsumer.subscribe(topic, "*"); litePullConsumer.start(); try { //进入while死循环中,通过Consumer从RocketMQ中一批批的Pull消息出来消费处理 while (true) { //拉取未消费消息 List<MessageExt> messageExts = litePullConsumer.poll(); if (CollUtil.isNotEmpty(messageExts)) { for (MessageExt messageExt : messageExts) { byte[] body = messageExt.getBody(); String msg = new String(body); //记录queueId和offset int queueId = messageExt.getQueueId(); long offset = messageExt.getQueueOffset(); String topic = messageExt.getTopic(); //topic、queue、offset、msg,四位一体,把所有的信息都拿到 //判断该消息是否已被处理过,即是否已经存在于消费记录表中,如果存在则跳过执行 EtlBinlogConsumeRecord existsRecord = consumeRecordMapper.getExistsRecord(queueId, offset, topic); if (null == existsRecord) { //如果还没处理过,那么进行处理,并往消费记录表中插入一条记录 processNewMsg(messageExt, msg); } else { //处理已经存在的消费记录 processExistsRecord(litePullConsumer, msg, existsRecord); } } } else { Thread.sleep(5000); } } } finally { litePullConsumer.shutdown(); } } catch (InterruptedException | MQClientException e) { try { //假设要拉取消息的主题还不存在,则会抛出异常,这种情况下休眠五秒再重试 Thread.sleep(5000); pullRun(); } catch (InterruptedException ignored) { } } } ... //处理已经存在的消费记录 private void processExistsRecord(DefaultLitePullConsumer litePullConsumer, String msg, EtlBinlogConsumeRecord existsRecord) { //已经存在的消费记录状态为已提交,说明mq里的对应消息修改提交状态失败了 //RocketMQ源码里手动提交消息时,如果失败了只会记录日志不会抛出异常,因此这里必须再次尝试提交消息防止mq中未处理的消息和实际情况不符 try { if (ConsumerStatus.COMMITTED.getValue().equals(existsRecord.getConsumeStatus())) { litePullConsumer.seek(new MessageQueue(existsRecord.getTopic(), existsRecord.getBrokerName(), existsRecord.getQueueId()), existsRecord.getOffset()); //这一步必须,不然手动提交的东西不对 List<MessageExt> committedFaildmessageExts = litePullConsumer.poll(); //再次提交已消费的消息 litePullConsumer.commitSync(); } else { BinlogData binlogData = BinlogUtils.getBinlogDataMap(msg); if (null == binlogData) { return; } LocalQueue.getInstance().submit(binlogData, existsRecord); } } catch (Exception e) { log.error("消息重新消费失败", e); } } }
31.增量同步过程中的各种失败场景的恢复机制
场景一:增量同步线程CanalPullRunner在拉取到消息后,新增消费记录时,系统重启或宕机。此时会导致新增消费记录失败,但这并不会有什么影响,等待这批binlog消息重新消费即可。
场景二:增量同步线程CanalPullRunner在拉取到消息后,新增消费记录成功了。但对binlog进行批处理的定时任务刚开始跑时,系统重启或宕机,那么也不影响。等待这批binlog消息重新消费后,内存读队列最终还是会出现这批binlog消息。
场景三:对binlog进行批处理的定时任务在进行读写队列交换、数据合并及过滤时,系统重启或宕机。那么也不影响,等待这批binlog消息重新消费后,内存写队列会重新出现这批binlog消息,这些对binlog进行批处理的定时任务继续运行即可。
场景四:对binlog进行批处理的定时任务在对binlog数据写入目标库时,系统重启或宕机。这时这批binlog消息对应的消费记录不会被更新为已消费未提交,后续增量同步线程CanalPullRunner重新消费这批binlog消息时会进行进行重新提交处理。
场景五:offset提交线程查出一批已消费未提交的消息,在还没来得及Commit时,系统重启或宕机。那也不影响,由于提交offset后才会更新消息消费记录的状态,重新执行offset提交线程重新查即可。
场景六:offset提交线程已经提交offset,但没来得及更新消费记录状态,系统重启或宕机。那也不影响,重新执行offset提交线程重新更新消费记录状态即可。
32.定时移除已提交的增量同步消息
每隔5分钟把已经提交的消费记录进行删除和清理,避免消息消费记录表数据过多。
//定时任务移除掉已提交的增量同步消息 @Component public class RemoveBinlogConsumeTask { @Autowired private EtlBinlogConsumeRecordMapper consumeRecordMapper; @Scheduled(cron = "0 0 0 1/1 * ? ") public void removeBinlogConsumeRecordTask() { //每次删除超过当前时间5分钟的历史数据 Date updateTime = DateUtils.addMinute(new Date(), -5); consumeRecordMapper.deleteCommittedConsumedRecords(updateTime); //每隔5分钟把已经提交offset的消费记录进行删除和清理 //因为每条binlog数据都会在数据库里都有一条消费记录,这样可能会导致这个数据太多 } }
33.增量与全量并行运行的场景分析
运行时,首先会运行增量同步的任务,然后才启动全量同步任务。全量同步任务,会一批一批地查,一批一批地写。增量同步任务,会对启动后的所有增删改进行同步操作。
下面分析全量和增量一起运行时的场景和情况:此时增量里都是对全量还没同步到的数据进行增删改,而全量则是从历史第一条数据开始进行查询和同步的。
场景一:增量出现insert插入操作,全量还没同步到,此时属于对最新的数据进行插入,但增量同步已经把最新的数据insert操作写入到目标库里了,这时是没有问题的。后续全量数据同步到这条数据时,从源数据库把这条数据查询出来了,准备进行插入操作,但会发现这条数据已经在目标库中存在,此时全量同步会进行过滤,不会进行重复插入。
场景二:增量出现update更新操作,全量还没同步到这条数据,增量已拿到更新的binlog。目标库里此时还没有这条数据,增量同步的写入逻辑里,会把update操作转换为insert操作。直接提前插入这条数据,等全量要同步源数据库这条数据时,再从目标库查出来对比过滤。
场景三:增量出现delete删除操作,全量还没同步到这条数据,增量已拿到删除的binlog。增量同步对这条数据处理时,发现目标库里没有这条数据,于是会直接返回,不做处理。而当后续全量同步处理到这一条数据时,便会发现这条数据已经被删除了,于是也不同步到目标库。
场景四:全量同步已经同步过一批数据,但是这些数据又发生了删除和修改。当增量同步拿到这些改的binlog,会发现其操作时间比目标库里的数据更加新,于是会去更新。当增量同步拿到这些删的binlog,会发现目标库里有这些数据,于是会去进行删除。
场景五:全量在同步的数据,和增量发生的变更,几乎是并发同时发生。比如,全量刚查出来一批数据还没来得及写入,此时这批数据同时发生修改和删除,也就是增量和全量并发同步一批数据产生了冲突。
34.增量与全量并发同步一批数据的冲突
(1)增量同步拿到更新binlog的场景
(2)增量同步拿到删除binlog的场景
全量查询出来一批数据还没来得及落库,增量也收到了这批数据里最新的修改和删除的binlog。这个最新的修改和删除,是全量同步在全量查询出来后,才对源数据库的数据发起的。
全量同步查出来一批数据还没落库,增量同步接着拿到最新的更新和删除,产生了冲突。下面分析是增量同步先落库,还是全量同步先落库。
(1)增量同步拿到更新binlog的场景
场景一:如果增量同步先落库,那么增量同步会把更新转插入
当后面全量同步再来尝试插入时,全量同步去目标库进行查询查到有数据,会发现全量同步准备插入的数据比较旧,于是就不会进行插入了。
当后面全量同步再来尝试插入时,全量同步去目标库进行查询发现还没有数据,但查完后准备落库插入数据时,增量同步已经把更新转插入写入目标库里先落库了。这其实没有关系的,因为此时全量同步是发起插入的,这会导致唯一键冲突而插入失败。
场景二:如果全量同步先落库,增量同步再来的更新正常进行更新
那么此时会正常运行更新,增量同步会把数据更新为最新状态。
场景三:如果全量同步先落库,增量同步再来的更新转换为插入
这发生于全量同步正准备落库时,增量同步拿到更新binlog,查询发现目标库没有数据。于是增量同步就把更新转插入,然后准备落库。但是全量同步此时先落库进行插入了,于是导致增量同步的更新转插入操作发生唯一键冲突,从而导致这次增量同步失败。但即便这样也不会有问题,因为这次增量同步失败,那么就不会更新消息消费记录的状态为已消费未提交。后续这条更新的binlog会重新被消费到,从而走回正常的增量同步下的更新操作。
(2)增量同步拿到删除binlog的场景
场景一:如果全量同步先落库,增量同步再来进行删除,此时是不影响的。
场景二:如果增量同步先落库进行删除,发现目标库本来就没有这条数据,跑空了。然后接着全量同步再来落库,把数据插入进目标库,则后续再也不会对这条数据删除了。从而导致目标库的数据比源数据库的数据多了。
综上所述,其实只有一个增量同步空删除的问题,解决方案如下:在增量同步对删除的binlog进行落库时,如果发现目标库是空的,此时可能是全量同步还没把数据插入进来,而增量同步先执行删除了。为了避免增量同步先删除的问题,可以把这条删除的binlog重新投递到topic里,并设置为延迟消息进行延迟一定的时间后再消费,如延迟10分钟、30分钟。等这条重新投递的binlog消息被消费到的时候,全量同步已经把它对应的数据插入了。这时再让增量同步执行删除操作,就不会发生空删除的问题了。
35.全量同步完成后的数据校验逻辑分析
(1)校验的整体思路
(2)校验的定时任务
(1)校验的整体思路
方法一:按检查类型分,可以分为数据顺序检查、随机数据检查。
方法二:直接源数据库和目标库分别检查数据总量和最大主键值,如select max(id)看看能否对上。
(2)校验的定时任务
会有一个定时任务CheckDataTask每隔120秒跑一次,负责校验数据。并且同一时刻只能有一个这样的定时任务在跑,所以任务一开始运行就会加锁。
首先会查询出已经全量同步完成的滚动查询的迁移明细记录,并且一次最多查100条这种记录。然后根据遍历这些记录封装成一个一个的滚动查询任务,再一批一批发起对数据的校验。
数据校验的逻辑如下:
步骤一:先从源数据库按照分页来获取一批数据
步骤二:再从目标数据库也获取一批数据
步骤三:接着对数据进行核对校验,然后找出核对不一致的数据
步骤四:然后去目标库分别处理那些需要更新和新增的不一致的数据
步骤五:最后更新迁移记录的类型为核对类型以及完成核对的数据量、更新迁移明细记录
一般来说,不能让新版本和老版本两个系统并行来跑,需要停机发布。而且新版本系统启动时,不能去RPC服务的注册中心注册,避免流量直接分发到新版本系统。一般会先让老系统下线,再手动上线新系统,在很短的时间内把这两个动作做完。
如果新版本系统发布时自动注册到注册中心,就会有一部分流量去读写多库多表。此时由于老版本系统还没完全下线,还会有一部分流量去读写单库单表。虽然读写单库单表的binlog最终会同步到多库多表(最终一致),但是反过来就不是了。即增删改操作刚在多库多表完成,然后去单库单表进行读取读不到,出现数据不一致的问题。
如果一定要不停机发布,可以发布一个带有开关的新系统,开关默认是关闭的,而且新系统带有两种DAO模型。当开关关闭时,会通过DAO模型一往单库单表进行读写数据。当开关打开时,会通过DAO模型二往多库多表进行读写数据。所以,开关控制的是应用要操作那种类型的DAO模型。
如果要在线上执行一些多库多表下的DDL操作:一般都会基于ShardingSphere开发一个数据库运维管理工作台,然后这个工作台会基于ShardingSphere把DDL命令路由到各个库和表里去。
需要注意:源数据库的地址此时使用ShardingSphere配置为8库8表,目标库的地址此时使用ShardingSphere配置为16库16表。增量同步前,Canal需要监听8台数据库服务器,把8个库的数据增删改binlog写到MQ里。