canal-1.1.5实时同步MySQL数据到Elasticsearch

一、环境准备

1、jkd 8+

2、mysql 5.7+

3、Elasticsearch 7+

4、kibana 7+

5、canal.adapter 1.1.5 

二、部署

一、创建数据库CanalDb和表UserInfo

SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;  -- ---------------------------- -- Table structure for UserInfo -- ---------------------------- DROP TABLE IF EXISTS `UserInfo`; CREATE TABLE `UserInfo`  (   `id` int(11) NOT NULL AUTO_INCREMENT,   `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,   `phone` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,   `age` int(11) DEFAULT NULL,   PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;  SET FOREIGN_KEY_CHECKS = 1;

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

 

二、kibana创建索引

PUT canal_product  {   "mappings": {     "properties": {       "user_name": {         "type": "text"       },       "phone": {         "type": "text"       },       "age": {         "type": "integer"       }     }   } }

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

 

 

 

 

 

三、下载安装canal.adapter

github:https://github.com/alibaba/canal/releases/tag/canal-1.1.5

额外需要下载v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)

分别解压缩后,将v1.1.5-alpha-2解压缩文件夹下plugin文件夹中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替换掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar,并重命名,再将该jar赋予权限 chmod 777 client-adapter.es7x-1.1.5-jar-with-dependencies.jar

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

 

1、解压并修改配置文件 conf/application.yml

只需要修改特定的几处即可,关于各节点说明可参考官方说明:https://help.aliyun.com/document_detail/135297.html

 

srcDataSources:     defaultDS:       url: jdbc:mysql://127.0.0.1:3306/CanalDb?useUnicode=true&characterEncoding=utf-8&useSSL=false       username: canal       password: canal

 

- name: es7         hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode ,127.0.0.1:9003 for transport mode          properties:           mode: rest #transport  or rest            security.auth: es:22222 #  only used for rest mode           cluster.name: elasticsearch  # es集群节点名称

 

 

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

 2、启动服务

# 启动服务 ./bin/startup.sh

3、查看日志是否启动成功

cat logs/adapter/adapter.log

如图所示

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 4、实时同步

向数据库中插入一条数据

INSERT INTO `CanalDb`.`UserInfo`( `user_name`, `phone`, `age`) VALUES ('张三', '10086', 99);

查看日志

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

kibana查看索引数据

GET canal_product/_search

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

5、全量同步,修改conf/es7/mytest_user.yml配置文件,或者新建一个yml文件也可

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值 destination: example  # canal的instance或者MQ的topic groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据 esMapping:   _index: canal_product # es 的索引名称   _id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配   sql: "SELECT          p.id as _id,          p.user_name,          p.phone,          p.age         FROM          UserInfo p "        # sql映射   etlCondition: "where p.id>={}"   #etl的条件参数   commitBatch: 3000   # 提交批大小

 

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

canal-1.1.5实时同步MySQL数据到Elasticsearch

 

 

 

 

 学习链接:https://help.aliyun.com/document_detail/135297.html

                   https://blog.csdn.net/zh1998wx/article/details/123101442?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-123101442-blog-125808233.pc_relevant_aa&utm_relevant_index=1

 

发表评论

相关文章