Seata AT模式如何实现行锁详解
作者:梦想实现家_Z
前言
我们在很多博客中都有发现,Seata AT模式里面的全局锁其实是行锁,这也是Seata AT模式和XA模式在锁粒度上的最大区别。我们可以在官网看到这样一个例子:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
那么你知道Seata AT模式是如何实现行锁的嘛?为了搞明白AT模式到底是怎么获取全局锁的,我们深入源码来看看。
如何加锁
为了证实全局锁就是我们所说的行锁,经过一番寻找,我在BaseTransactionalExecutor
类中的prepareUndoLog()
方法中找到了这样一段代码:
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; String lockKeys = buildLockKey(lockKeyRecords); if (null != lockKeys) { connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); }
- 如果是删除的SQL,那么通过
beforeImage
生成行锁标记,否则通过afterImage
生成行锁标记;
比如表名wallet_tbl
,里面有一个主键id
值为1,那么最终生成的lockKeys
为wallet_tbl:1
,如果有多行记录id
值分别为1、2、3,那么最终生成的lockKeys
为wallet_tbl:1,2,3
;多个主键索引的话使用_
连接。所以我们可以总结出lockKeys
的生成规则为:tableName:1_A,2_B,3_C
,1
、2
、3
、A
、B
、C
分别为主键索引的值。
此时还没有真正地拿到锁,只是生成一个锁的标记。真正地上锁需要查看ConnectionProxy.register()
方法:
private void register() throws TransactionException { if (!context.hasUndoLog() || !context.hasLockKey()) { return; } Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.getApplicationData(), context.buildLockKeys()); context.setBranchId(branchId); }
branchRegister()
方法就是RM
向TC
进行分支注册,同时会申请行锁。那么获取行锁的核心代码应该就是在TC
端了,我们顺着branchRegister()
逻辑一路找到BranchSession.lock()
:
public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException { if (this.getBranchType().equals(BranchType.AT)) { // 只有AT模式需要获取行锁 return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock); } return true; }
下面就要真正地开始进入LockerManager
来申请锁了:
@Override public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch // 将lockKey解析成多行RowLock List<RowLock> locks = collectRowLocks(branchSession); if (CollectionUtils.isEmpty(locks)) { // no lock return true; } return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock); }
这里做了一步将lockKey解析成多行RowLock,根据上面的tableName:1_A,2_B,3_C
规则,最终解析成3个RowLock
对象:{tableName,1_A},{tableName,2_B},{tableName,3_C}
最终我们追踪到最后一个关键方法LockStoreDataBaseDAO.acquireLock()
:
@Override public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; Set<String> dbExistedRowKeys = new HashSet<>(); boolean originalAutoCommit = true; // 如果有多行锁,那么先去重 if (lockDOs.size() > 1) { lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList()); } try { conn = lockStoreDataSource.getConnection(); if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } List<LockDO> unrepeatedLockDOs = lockDOs; //check lock if (!skipCheckLock) { boolean canLock = true; // 查询是否已经存在行锁 // "select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc" // in里面最多限制1000个 String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size()); ps = conn.prepareStatement(checkLockSQL); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 1, lockDOs.get(i).getRowKey()); } rs = ps.executeQuery(); String currentXID = lockDOs.get(0).getXid(); boolean failFast = false; while (rs.next()) { String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID); // 如果发现有其他分布式事务和当前申请行锁的数据一致,那么加锁失败 if (!StringUtils.equals(dbXID, currentXID)) { if (LOGGER.isInfoEnabled()) { String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK); String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME); long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID); LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId); } if (!autoCommit) { int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS); if (status == LockStatus.Rollbacking.getCode()) { failFast = true; } } // 加锁失败 canLock = false; break; } dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY)); } // 加锁失败,回滚抛异常 if (!canLock) { conn.rollback(); if (failFast) { throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast)); } return false; } // 如果是同一个分布式事务中申请行锁,那么剔除重复的锁数据 if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) { unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())) .collect(Collectors.toList()); } // 如果剔除后不需要再补充行锁,那么直接返回申请成功 if (CollectionUtils.isEmpty(unrepeatedLockDOs)) { conn.rollback(); return true; } } // 申请行锁,分1行和多行两种情况 if (unrepeatedLockDOs.size() == 1) { LockDO lockDO = unrepeatedLockDOs.get(0); if (!doAcquireLock(conn, lockDO)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk()); } conn.rollback(); return false; } } else { if (!doAcquireLocks(conn, unrepeatedLockDOs)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList())); } conn.rollback(); return false; } } conn.commit(); return true; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(rs, ps); if (conn != null) { try { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } catch (SQLException e) { } } } }
1.先通过查询语句检查是否存在锁冲突,锁冲突的话,就直接失败抛异常;
2.不存在锁冲突,检查是否锁重入,重入的话,补充行锁;
3.添加行锁;
检查锁冲突的SQL语句如下:
select row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified from lock_table where row_key in (?, ?, ?, ?) order by status desc
添加行锁SQL语句如下:
insert into lock_table (row_key, xid, transaction_id, branch_id, reource_id, table_name, pk, status, gmt_create, gmt_modified) values (?, ?, ?, ?, ?, ?, ?, now(), now(), ?)
为什么是行锁
根据上面加锁的逻辑,我们发现一直比较的都是row_key
这个主键,那么为什么row_key
代表的是行锁呢?这个问题就要回到row_key
是如何产生的:
protected LockDO convertToLockDO(RowLock rowLock) { LockDO lockDO = new LockDO(); lockDO.setBranchId(rowLock.getBranchId()); lockDO.setPk(rowLock.getPk()); lockDO.setResourceId(rowLock.getResourceId()); // row_key的生成 lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk())); lockDO.setXid(rowLock.getXid()); lockDO.setTransactionId(rowLock.getTransactionId()); lockDO.setTableName(rowLock.getTableName()); return lockDO; }
根据上面代码,我们很清楚地了解到,row_key
是由resource_id
、tableName
、pk
这三个字段连接生成的,也就意味着row_key
是代表表里面的具体一行数据,也就是我们的行记录,所以我们确信AT
模式的全局锁其实就是行锁。
以上就是Seata AT模式如何实现行锁详解的详细内容,更多关于Seata AT模式实现行锁的资料请关注脚本之家其它相关文章!