大纲
1.数据库设计
2.枚举类
3.接⼝设计
4.定时任务设计
(1)定时核对校验数据的定时任务
(2)数据量统计定时任务
(3)增量数据落地定时任务
(4)失败重试定时任务
5.技术亮点
(1)滚动拉取方案
(2)巧妙的统计滚动进度方案
(3)防止增量同步数据丢失和高效写入方案
(4)数据扩容方案
6.全量同步和增量同步整体流程图
7.功能升级
(1)数据迁移系统数据源动态化配置
(2)迁移数据库操作对应的xml动态⽣成
(3)扩容迁移数据时的问题
1.数据库设计
(1)订单表——order_info
create table order_info ( id bigint(32) auto_increment, order_no varchar(32) not null comment '订单号', order_amount decimal(8, 2) not null comment '订单⾦额', merchant_id bigint(32) not null comment '商户ID', user_id bigint(32) not null comment '⽤户ID', order_freight decimal(8, 2) default 0.00 not null comment '运费', order_status tinyint(3) default 0 not null comment '订单状态:10待付款,20待接单,30已接单,40配送中,50已完成,55部分退款,60全部退款,70取消订单', trans_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '交易时间', pay_status tinyint(3) default 2 not null comment '⽀付状态:1待⽀付、2⽀付成功、3⽀付失败', recharge_time timestamp default CURRENT_TIMESTAMP not null comment '⽀付完成时间', pay_amount decimal(8, 2) default 0.00 not null comment '实际⽀付⾦额', pay_discount_amount decimal(8, 2) default 0.00 not null comment '⽀付优惠⾦额', address_id bigint(32) not null comment '收货地址ID', delivery_type tinyint(3) default 2 not null comment '配送⽅式:1⾃提,2配送', delivery_status tinyint(3) default 0 null comment '配送状态:0 配送中,2已送达,3待收货,4已送达', delivery_expect_time timestamp null comment '配送预计送达时间', delivery_complete_time timestamp null comment '配送送达时间', delivery_amount decimal(8, 2) default 0.00 not null comment '配送运费', coupon_id bigint(32) null comment '优惠券id', cancel_time timestamp null comment '订单取消时间', confirm_time timestamp null comment '订单确认时间', remark varchar(512) null comment '订单备注留⾔', create_user bigint(32) null comment '创建⽤户', update_user bigint(32) null comment '更新⽤户', create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间', update_time timestamp null on update CURRENT_TIMESTAMP comment '更新时间', delete_flag tinyint default 0 not null comment '逻辑删除标记', primary key (id, order_no) ) comment '订单表'; create index inx_order_no on order_info (order_no); create index inx_create_time on order_info (create_time, order_no);
(2)订单详情表——order_info
-- auto-generated definition create table order_item_detail ( id bigint(32) auto_increment primary key, order_no varchar(32) not null comment '订单号', product_id bigint(32) not null comment '商品ID', category_id bigint(32) not null comment '商品分类ID', goods_num int(8) default 1 not null comment '商品购买数量', goods_price decimal(8, 2) default 0.00 not null comment '商品单价', goods_amount decimal(8, 2) default 0.00 not null comment '商品总价', product_name varchar(64) null comment '商品名', discount_amount decimal(8, 2) default 0.00 not null comment '商品优惠⾦额', discount_id bigint(32) null comment '参与活动ID', product_picture_url varchar(128) null comment '商品图⽚', create_user bigint(32) null comment '创建⽤户', update_user bigint(32) null comment '更新⽤户', create_time timestamp default CURRENT_TIMESTAMP not null comment '创建时间', update_time timestamp default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间', delete_flag tinyint default 0 not null comment '逻辑删除标记' ) comment '订单明细表' charset = utf8; create index inx_create_time on order_item_detail (create_time, order_no); create index inx_item_order_no on order_item_detail (order_no);
(3)binlog消息消费记录表——etl_binlog_consume_record
create table etl_binlog_consume_record ( id bigint auto_increment comment '主键' primary key, queue_id int null comment '消息队列id(即:queueId)', offset bigint null comment '消息偏移量(唯⼀定位该消息在队列中的位置)', topic varchar(500) null comment '消息所属主题', broker_name varchar(255) null comment '消息所在broker名称', consume_status tinyint(1) null comment '消费状态:0未消费,1消费成功,2已提交', create_time datetime null comment '记录创建时间', update_time datetime null comment '记录更新时间', constraint queue_id unique (queue_id, offset) ) comment 'binlog消息消费记录表' charset = utf8mb4;
(4)迁移明细表——etl_dirty_record
create table etl_dirty_record ( id bigint auto_increment comment '主键' primary key, logic_model varchar(255) not null comment '逻辑模型名(逻辑表或模型名称)', ticket varchar(32) not null comment '迁移批次', cur_ticket_stage int(10) not null comment '当前所属批次阶段号', record_key varchar(60) not null comment '字段名', record_value varchar(128) null comment '字段值', status int(12) null comment '迁移状态', error_msg varchar(500) null comment '错误消息', retry_times int(12) null comment '已重试次数', last_retry_time datetime null comment '上次重试时间', is_deleted tinyint(1) default 0 null comment '0未被删除,1已删除', create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间', update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间', sync_size int default 0 null comment '每次同步数量' ) comment '迁移明细表' charset = utf8mb4;
(5)迁移表——etl_progress
create table etl_progress ( id bigint auto_increment comment '主键' primary key, logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)', ticket varchar(32) null comment '迁移批次', cur_ticket_stage int(10) null comment '当前所属批次阶段号', progress_type int(10) null comment '进度类型(0滚动查询数据,1核对查询数据)', status int(12) null comment '迁移状态:1同步中,2同步完成,3同步失败', retry_times int default 0 null comment '已同步次数', finish_record bigint default 0 null comment '已完成记录数', scroll_id varchar(100) default '0' null comment '记录上⼀次滚动最后记录字段值', scroll_time datetime null comment '开始滚动时间', scroll_end_time datetime null comment '滚动截⽌时间', is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除', create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间', update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' ) comment '迁移表' charset = utf8mb4;
(6)迁移配置表——etl_progress_config
create table etl_progress_config ( id bigint auto_increment comment '主键' primary key, logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)', record_key varchar(32) null comment '迁移批次模型字段名称', record_type int(10) null comment '迁移字段匹配类型(0唯⼀字段,1查询匹配字段)', is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除', create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间', update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' ) comment '迁移配置表' charset = utf8mb4;
(7)需要迁移的表的数据量统计表——etl_statistical
create table etl_statistical ( id bigint auto_increment comment '主键' primary key, logic_model varchar(255) null comment '逻辑模型名(逻辑表或模型名称)', statistical_count bigint null comment '统计数据量', statistical_time int(8) null comment '统计时间(按天为单位)', is_deleted tinyint(1) default 0 null comment '0:未被删除,1:已删除', create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间', update_time datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间' ) comment '需要迁移的表的数据量统计表' charset = utf8mb4; create index inx_logic_model on etl_statistical (logic_model); create index inx_statistical_time on etl_statistical (statistical_time);
2.枚举类
(1)增量数据同步的操作类型——BinlogType
public enum BinlogType { INSERT ("新增", "INSERT"), UPDATE ("修改", "UPDATE"), DELETE ("删除", "DELETE"); }
(2)增量同步消费操作结果——ConsumerStatus
public enum ConsumerStatus { NOT_CONSUME ("未消费", 0), CONSUME_SUCCESS ("消费成功", 1), COMMITTED("已提交", 2); }
(3)DB数据库渠道——DBChannel
public enum DBChannel { //渠道⼀ CHANNEL_1 ("历史数据库", "1"), //渠道⼆ CHANNEL_2 ("新的数据库", "2"); }
(4)操作结果枚举值——OperateResult
public enum OperateResult { FAILED ("失败", "1"), SUCCESS ("成功", "0"); }
(5)操作类型——OperateType
public enum OperateType { ADD ("增量", 1), ALL ("全量", 2); }
(6)滚动类型——ProgressType
public enum ProgressType { RANGE_SCROLL("滚动查询数据", 0), CHECK_DATA ("核对查询数据", 1); }
3.接⼝设计
(1)访问迁移看板界⾯
http://localhost:8080/migrate/toIndex
(2)查询同步进度接⼝
//取得迁移进度信息 //@param queryCondition 查询条件 @RequestMapping(value = "/getEtlProgresses", method = RequestMethod.POST) public Map<String, Object> getEtlProgresses(@RequestBody EtlProgressReq queryCondition) { Map<String, Object> resultMap = new HashMap<>(); resultMap.put("resultCode", OperateResult.SUCCESS.getValue()); resultMap.put("resultMsg", OperateResult.SUCCESS.getName()); EtlProgress etlProgress = new EtlProgress(); BeanUtils.copyProperties(queryCondition, etlProgress); List<EtlProgress> resultList = migrateService.getEtlProgresses(etlProgress); resultMap.put("resultList", resultList); return resultMap; }
(3)发起全量同步接⼝
需要同步的表和时间段
//新增全量同步 将前端传过来的世界格式化 //@param rangeScroll 全量同步条件 //@return 保存结果 @RequestMapping(value = "/addScroll", method = RequestMethod.POST) public Map<String, Object> addScroll(@RequestBody RangeScroll rangeScroll) { rangeScroll.setStartTime(DateUtils.getStartTimeOfDate(rangeScroll.getStartTime())); rangeScroll.setEndTime(DateUtils.getDayEndTime(rangeScroll.getEndTime())); Map<String, Object> resultMap = new HashMap<>(); resultMap.put("resultCode", OperateResult.SUCCESS.getValue()); resultMap.put("resultMsg", OperateResult.SUCCESS.getName()); scrollProcessor.scroll(rangeScroll); return resultMap; }
4.定时任务设计
(1)定时核对校验数据的定时任务
代码入口:CheckDataTask
public void CheckData() { log.info("数据核对校验开始"); if (lock.tryLock()) { try { CheckDataProcessor checkDataProcessor = CheckDataProcessor.getInstance(); //查询已同步完成的批次,未核对的数据进⾏核对处理 List<RangeScroll> rangeScrollList = checkDataProcessor.queryCheckDataList(); for (RangeScroll rangeScroll:rangeScrollList) {// 已经在核对,本次跳过 checkDataProcessor.checkData(rangeScroll); } } catch (Exception e) { log.error("数据核对过程中发⽣异常 {}", e.getMessage(), e); } finally { log.info("数据核对校验结束"); lock.unlock(); } } }
流程图: