为了突破AT事务对业务编程语言的限制,现在业界正在往DB Mesh的方向发展,通过将事务中间件部署在SideCar的方式,达到任何编程语言都能使用分布式事务中间件的效果。
最新版DBPack不仅支持预处理的sql语句,还支持text类型的sql。DBPack最新版还兼容了php8的pdo_mysql扩展。Mysql 客户端在给用户发送 sql 执行结果时,如果执行没有异常,发送的第一个包为 OKPacket,该包中有一个标志位可以标识 sql 请求是否在一个事务中。如下图所示
07 00 00 // 前 3 个字节表示 payload 的长度为 7 个字节 01 // sequence 响应的序号,前 4 个字节一起构成了 OKPacket 的 header 00 // 标识 payload 为 OKPacket 00 // affected row 00 // last insert id 03 00 // 状态标志位 00 00 // warning 数量
dbpack 之前的版本将标志位设置为 0,java、golang、.net core、php 8.0 之前的 mysql driver 都能正确协调事务,php 8.0 的 pdo driver 会对标志位进行校验,所以 php 8.0 以上版本在使用 dbpack 协调分布式事务时,会抛出 transaction not active
- 客户端向聚合层服务的DBPack代理发起HTTP请求。
- DBPack生成全局唯一的XID,存储到ETCD中。注意请求的地址和端口指向DBPack,并不直接指向实际API。
- 如果开启全局事务成功(如果失败则直接结束事务),聚合层服务就可以通过HTTP header(X-Dbpack-Xid)拿到XID了。此时,聚合服务调用服务1并传递XID。
- 服务1拿到XID,通过DBPack代理,注册分支事务(生成BranchID等信息,并存储到ETCD)。
- 服务1的分支事务注册成功后,生成本地事务的回滚镜像,随着本地事务一起commit。
- 服务2进行与服务1相同的步骤4和5。
- 聚合层服务根据服务1和服务2的结果,决议是全局事务提交还是回滚,如果是成功,则返回HTTP 200给DBPack(除200以外的状态码都会被DBPack认为是失败)。DBPack更新ETCD中的全局事务状态为全局提交中或回滚中。
- 服务1和服务2的DBPack,通过ETCD的watch机制,得知本地的分支事务是该提交还是回滚(如果是提交,则删除回滚日志;如果是回滚,则执行通过回滚日志回滚到事务前镜像)。
- 所有的分支事务提交或回滚完成后,聚合层服务的DBPack的协程会检测到事务已经完成,将从ETCD删除XID和BranchID等事务信息。
- 业务数据库为mysql数据库
- 业务数据表为innodb类型
- 业务数据表必须有主键
Step0: 安装ETCD
ETCD_VER=v3.5.3 # choose either URL GOOGLE_URL=https://storage.googleapis.com/etcd GITHUB_URL=https://github.com/etcd-io/etcd/releases/download DOWNLOAD_URL=${GOOGLE_URL} rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1 rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz /tmp/etcd-download-test/etcd --version /tmp/etcd-download-test/etcdctl version /tmp/etcd-download-test/etcdutl version
Step1: 在业务数据库中创建undo_log表
-- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), KEY `idx_unionkey` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Step2: 编写配置文件,对接DBPack
- aggregation-service指的是微服务的聚合层,负责调用各业务接口。business-service指的是具体的业务接口。
- 完整的aggregation-service配置可以参考sample里的config-aggregation配置。
- 完整的business-service配置可以参考sample里的config-order配置
# 更新distributed_transaction.etcd_config.endpoints # 更新listeners配置项,调整为实际聚合层服务的地址和端口 # 更新filters配置项,配置聚合层服务的API endpoint vim /path/to/your/aggregation-service/config-aggregation.yaml # 更新distributed_transaction.etcd_config.endpoints # 更新listeners配置项,配置业务数据库信息,包括dbpack代理的端口 # 更新data_source_cluster.dsn vim /path/to/your/business-service/config-service.yaml
Step3: 运行DBPack
git clone git@github.com:cectc/dbpack.git cd dbpack # build on local env make build-local # build on production env make build ./dist/dbpack start --config /path/to/your/config-aggregation.yaml ./dist/dbpack start --config /path/to/your/config-service.yaml
Step4: 配置vhost,监听php项目端口
server { listen 3001; # 暴露的服务端口 index index.php index.html; root /var/www/code/; # 业务代码根目录 location / { try_files $uri /index.php?$args; } location ~ .php$ { fastcgi_split_path_info ^(.+.php)(/.+)$; fastcgi_pass order-svc-app:9000; # php-fpm 端口 fastcgi_index index.php; include fastcgi_params; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; fastcgi_param PATH_INFO $fastcgi_path_info; } }
Step5: 编写应用程序
aggregation service example
<?php class AggregationSvc { public function CreateSo(string $xid, bool $rollback): bool { $createSoSuccess = $this->createSoRequest($xid); if (!$createSoSuccess) { return false; } $allocateInventorySuccess = $this->allocateInventoryRequest($xid); if (!$allocateInventorySuccess) { return false; } if ($rollback) { return false; } return true; } // private function createSoRequest(string $xid) ... // private function allocateInventoryRequest(string $xid) ... } $reqPath = strtok($_SERVER["REQUEST_URI"], '?'); $reaHeaders = getallheaders(); $xid = $reaHeaders['X-Dbpack-Xid'] ?? ''; if (empty($xid)) { die('xid is not provided!'); } $aggregationSvc = new AggregationSvc(); if ($_SERVER['REQUEST_METHOD'] === 'POST') { switch ($reqPath) { case '/v1/order/create': if ($aggregationSvc->CreateOrder($xid, false)) { responseOK(); } else { responseError(); } case '/v1/order/create2': if ($aggregationSvc->CreateSo($xid, true)) { responseOK(); } else { responseError(); } break; default: die('api not found'); } } function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]); } function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]); }
order service example
<?php class OrderDB { private PDO $_connection; private static OrderDB $_instance; private string $_host = 'dbpack-order'; private int $_port = 13308; private string $_username = 'dksl'; private string $_password = '123456'; private string $_database = 'order'; const insertSoMaster = "INSERT /*+ XID('%s') */ INTO order.so_master (sysno, so_id, buyer_user_sysno, seller_company_code, receive_division_sysno, receive_address, receive_zip, receive_contact, receive_contact_phone, stock_sysno, payment_type, so_amt, status, order_date, appid, memo) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,now(),?,?)"; const insertSoItem = "INSERT /*+ XID('%s') */ INTO order.so_item(sysno, so_sysno, product_sysno, product_name, cost_price, original_price, deal_price, quantity) VALUES (?,?,?,?,?,?,?,?)"; public static function getInstance(): OrderDB { if (empty(self::$_instance)) { self::$_instance = new self(); } return self::$_instance; } private function __construct() { try { $this->_connection = new PDO( "mysql:host=$this->_host;port=$this->_port;dbname=$this->_database;charset=utf8", $this->_username, $this->_password, [ PDO::ATTR_PERSISTENT => true, PDO::ATTR_EMULATE_PREPARES => false, // to let DBPack handle prepread sql ] ); } catch (PDOException $e) { die($e->getMessage()); } } private function __clone() { } public function getConnection(): PDO { return $this->_connection; } public function createSo(string $xid, array $soMasters): bool { $this->getConnection()->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); try { $this->getConnection()->beginTransaction(); foreach ($soMasters as $master) { if (!$this->insertSo($xid, $master)) { throw new PDOException("failed to insert soMaster"); } } $this->getConnection()->commit(); } catch (PDOException $e) { $this->getConnection()->rollBack(); return false; } return true; } private function insertSo(string $xid, array $soMaster): bool { // insert into so_master, so_item ... } } $reqPath = strtok($_SERVER["REQUEST_URI"], '?'); $reqHeaders = getallheaders(); $xid = $reqHeaders['Xid'] ?? ''; if (empty($xid)) { die('xid is not provided!'); } if ($_SERVER['REQUEST_METHOD'] === 'POST') { if ($reqPath === '/createSo') { $reqBody = file_get_contents('php://input'); $soMasters = json_decode($reqBody, true); $orderDB = OrderDB::getInstance(); $result = $orderDB->createSo($xid, $soMasters); if ($result) { responseOK(); } else { responseError(); } } } function responseOK() { http_response_code(200); echo json_encode([ 'success' => true, 'message' => 'success', ]); } function responseError() { http_response_code(400); echo json_encode([ 'success' => false, 'message' => 'fail', ]); }
Step6: 访问聚合层业务接口
curl -X{HTTP Method} http://localhost:{DBPack监听的聚合层服务端口}/{聚合层服务的API endpoint}
- 无论是使用mysqli驱动、pdo_mysql驱动,还是通过
连接数据库(<=php5.4),在start transaction;
开始之后,后续的业务操作必须在同一个数据库连接上进行。 - DBPack通过xid(全局事务唯一ID)在事务上下文中传播,业务数据库执行的业务SQL语句中,需要加入xid注释,这样DBPack才能根据xid处理对应的事务。例如
insert /*+ XID('%s') */ into xx ...;
- dbpack: https://github.com/CECTC/dbpack
- dbpack-samples: https://github.com/CECTC/dbpack-samples
- dbpack-doc: https://github.com/CECTC/dbpack-doc
- Seata-gplang: https://github.com/opentrx/seata-golang
- Seata: https://github.com/seata/seata
- AT事务模型设计:https://seata.io/zh-cn/blog/seata-at-mode-design.html
卜贺贺。就职于日本楽天Rakuten CNTD,任Application Engineer,熟悉AT事务、Seata-golang和DBPack。GitHub:https://github.com/bohehe