java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot事务读写分离实例

Java基于SpringBoot和tk.mybatis实现事务读写分离代码实例

作者:sunct

这篇文章主要介绍了Java基于SpringBoot和tk.mybatis实现事务读写分离代码实例,读写分离,基本的原理是让主数据库处理事务性增、改、删操作,而从数据库处理SELECT查询操作,数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库,需要的朋友可以参考下

什么是读写分离?

读写分离,基本的原理是让主数据库处理事务性增、改、删操作( INSERT、UPDATE、 DELETE) ,而从数据库处理SELECT查询操作。

数据库复制被用来把事务性操作导致的变更同步到集群中的从数据库。

为什么要读写分离呢?

源码

先定义数据源读写类型

/**
 * 数据源类型
 *
 * @author sunchangtan
 */
public enum DataSourceType {
    WRITE, READ
}

定义数据库连接的Holder

import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import org.springframework.core.NamedThreadLocal;
/**
 * 数据库连接的Holder
 *
 * @author sunchangtan
 */
public class ConnectionHolder {
    /**
     * 当前数据库链接是读还是写
     */
    public final static ThreadLocal<DataSourceType> CURRENT_CONNECTION = new NamedThreadLocal<DataSourceType>("routingdatasource's key") {
        protected DataSourceType initialValue() {
            return DataSourceType.WRITE;
        }
    };
    /**
     * 当前线程所有数据库链接
     */
    public final static ThreadLocal<Map<DataSourceType, Connection>> CONNECTION_CONTEXT = new NamedThreadLocal<Map<DataSourceType, Connection>>("connection map") {
        protected Map<DataSourceType, Connection> initialValue() {
            return new HashMap<>();
        }
    };
    /**
     * 强制写数据源
     */
    public final static ThreadLocal<Boolean> FORCE_WRITE = new NamedThreadLocal<Boolean>("FORCE_WRITE");
}

定义数据源的Holder

import org.springframework.core.NamedThreadLocal;
/**
 * 数据源的Holder
 *
 * @author sunchangtan
 */
public class DataSourceHolder {
	/**
	 * 当前数据组
	 */
	public final static ThreadLocal<DataSourceType> CURRENT_DATASOURCE = new NamedThreadLocal<>("routingdatasource's key");
	static {
		setCurrentDataSource(DataSourceType.WRITE);
	}
	public static void setCurrentDataSource(DataSourceType dataSourceType){
		CURRENT_DATASOURCE.set(dataSourceType);
	}
	public static DataSourceType getCurrentDataSource(){
		return CURRENT_DATASOURCE.get();
	}
	public static void clearDataSource() {
		CURRENT_DATASOURCE.remove();
	}
}

定义数据源代理类,处理读写数据库的路由

import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.logging.Logger;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.Constants;
import org.springframework.jdbc.datasource.ConnectionProxy;
/**
 * 数据库代理,具体数据源由DataSourceRouter提供
 * 
 * @author sunchangtan
 */
public class DataSourceProxy implements DataSource {
	private static final Constants constants = new Constants(Connection.class);
	private static final Log logger = LogFactory.getLog(DataSourceProxy.class);
	private Boolean defaultAutoCommit = Boolean.TRUE;
	private Integer defaultTransactionIsolation = 2;
	private DataSourceRouter dataSourceRouter;
	public DataSourceProxy(DataSourceRouter dataSourceRouter) {
		this.dataSourceRouter = dataSourceRouter;
	}
	public void setDefaultAutoCommit(boolean defaultAutoCommit) {
		this.defaultAutoCommit = defaultAutoCommit;
	}
	public void setDefaultTransactionIsolation(int defaultTransactionIsolation) {
		this.defaultTransactionIsolation = defaultTransactionIsolation;
	}
	public void setDefaultTransactionIsolationName(String constantName) {
		setDefaultTransactionIsolation(constants.asNumber(constantName).intValue());
	}
	protected Boolean defaultAutoCommit() {
		return this.defaultAutoCommit;
	}
	protected Integer defaultTransactionIsolation() {
		return this.defaultTransactionIsolation;
	}
	@Override
	public Connection getConnection() throws SQLException {
		return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
				new Class<?>[] { ConnectionProxy.class }, new LazyConnectionInvocationHandler());
	}
	@Override
	public Connection getConnection(String username, String password) throws SQLException {
		return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
				new Class<?>[] { ConnectionProxy.class }, new LazyConnectionInvocationHandler(username, password));
	}
	private class LazyConnectionInvocationHandler implements InvocationHandler {
		private String username;
		private String password;
		private Boolean readOnly = Boolean.FALSE;
		private Integer transactionIsolation;
		private Boolean autoCommit;
		private boolean closed = false;
		public LazyConnectionInvocationHandler() {
			this.autoCommit = defaultAutoCommit();
			this.transactionIsolation = defaultTransactionIsolation();
		}
		public LazyConnectionInvocationHandler(String username, String password) {
			this();
			this.username = username;
			this.password = password;
		}
		@Override
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			// Invocation on ConnectionProxy interface coming in...
			if (method.getName().equals("setTransactionIsolation") && args != null && (Integer) args[0] == Connection.TRANSACTION_SERIALIZABLE) {
				 args[0] = defaultTransactionIsolation();
				ConnectionHolder.FORCE_WRITE.set(Boolean.TRUE);
			}
			if (method.getName().equals("equals")) {
				// We must avoid fetching a target Connection for "equals".
				// Only consider equal when proxies are identical.
				return (proxy == args[0]);
			} else if (method.getName().equals("hashCode")) {
				// We must avoid fetching a target Connection for "hashCode",
				// and we must return the same hash code even when the target
				// Connection has been fetched: use hashCode of Connection
				// proxy.
				return System.identityHashCode(proxy);
			} else if (method.getName().equals("unwrap")) {
				if (((Class<?>) args[0]).isInstance(proxy)) {
					return proxy;
				}
			} else if (method.getName().equals("isWrapperFor")) {
				if (((Class<?>) args[0]).isInstance(proxy)) {
					return true;
				}
			} else if (method.getName().equals("getTargetConnection")) {
				// Handle getTargetConnection method: return underlying
				// connection.
				return getTargetConnection(method);
			}
			if (!hasTargetConnection()) {
				// No physical target Connection kept yet ->
				// resolve transaction demarcation methods without fetching
				// a physical JDBC Connection until absolutely necessary.
				if (method.getName().equals("toString")) {
					return "Lazy Connection proxy for target DataSource [" + dataSourceRouter.getTargetDataSource() + "]";
				} else if (method.getName().equals("getMetaData")) {
					return dataSourceRouter.getTargetDataSource().getConnection().getMetaData();
				} else if (method.getName().equals("isReadOnly")) {
					return this.readOnly;
				} else if (method.getName().equals("setReadOnly")) {
					this.readOnly = (Boolean) args[0];
					return null;
				} else if (method.getName().equals("getTransactionIsolation")) {
					if (this.transactionIsolation != null) {
						return this.transactionIsolation;
					}
					// Else fetch actual Connection and check there,
					// because we didn't have a default specified.
				} else if (method.getName().equals("setTransactionIsolation")) {
					this.transactionIsolation = (Integer) args[0];
					return null;
				} else if (method.getName().equals("getAutoCommit")) {
					if (this.autoCommit != null) {
						return this.autoCommit;
					}
					// Else fetch actual Connection and check there,
					// because we didn't have a default specified.
				} else if (method.getName().equals("setAutoCommit")) {
					this.autoCommit = (Boolean) args[0];
					return null;
				} else if (method.getName().equals("commit")) {
					// Ignore: no statements created yet.
					return null;
				} else if (method.getName().equals("rollback")) {
					// Ignore: no statements created yet.
					return null;
				} else if (method.getName().equals("getWarnings")) {
					return null;
				} else if (method.getName().equals("clearWarnings")) {
					return null;
				} else if (method.getName().equals("close")) {
					// Ignore: no target connection yet.
					this.closed = true;
					return null;
				} else if (method.getName().equals("isClosed")) {
					return this.closed;
				} else if (this.closed) {
					// Connection proxy closed, without ever having fetched a
					// physical JDBC Connection: throw corresponding
					// SQLException.
					throw new SQLException("Illegal operation: connection is closed");
				}
			} else {
				if (method.getName().equals("commit")) {
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection writeCon = connectionMap.get(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.commit();
					}
					return null;
				}
				if (method.getName().equals("rollback")) {
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection writeCon = connectionMap.get(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.rollback();
					}
					return null;
				}
				if (method.getName().equals("close")) {
		            ConnectionHolder.FORCE_WRITE.set(Boolean.FALSE);
					Map<DataSourceType, Connection> connectionMap = ConnectionHolder.CONNECTION_CONTEXT.get();
					Connection readCon = connectionMap.remove(DataSourceType.READ);
					if (readCon != null) {
					    readCon.close();
                    }
					Connection writeCon = connectionMap.remove(DataSourceType.WRITE);
					if (writeCon != null) {
						writeCon.close();
					}
					this.closed = true;
					return null;
				}
			}
			// Target Connection already fetched,
			// or target Connection necessary for current operation ->
			// invoke method on target connection.
			try {
			    return method.invoke(
	                     ConnectionHolder.CONNECTION_CONTEXT.get().get(ConnectionHolder.CURRENT_CONNECTION.get()), args);
			} catch (InvocationTargetException ex) {
				throw ex.getTargetException();
			}
		}
		/**
		 * Return whether the proxy currently holds a target Connection.
		 */
		private boolean hasTargetConnection() {
			return (ConnectionHolder.CONNECTION_CONTEXT.get() != null
					&& ConnectionHolder.CONNECTION_CONTEXT.get().get(ConnectionHolder.CURRENT_CONNECTION.get()) != null);
		}
		/**
		 * Return the target Connection, fetching it and initializing it if
		 * necessary.
		 */
		private Connection getTargetConnection(Method operation) throws SQLException {
			// No target Connection held -> fetch one.
			if (logger.isDebugEnabled()) {
				logger.debug("Connecting to database for operation '" + operation.getName() + "'");
			}
			// Fetch physical Connection from DataSource.
			Connection target = (this.username != null)
					? dataSourceRouter.getTargetDataSource().getConnection(this.username, this.password)
					: dataSourceRouter.getTargetDataSource().getConnection();
			// Apply kept transaction settings, if any.
			if (this.readOnly) {
				try {
					target.setReadOnly(this.readOnly);
				} catch (Exception ex) {
					// "read-only not supported" -> ignore, it's just a hint
					// anyway
					logger.debug("Could not set JDBC Connection read-only", ex);
				}
			}
			if (this.transactionIsolation != null && !this.transactionIsolation.equals(defaultTransactionIsolation())) {
				target.setTransactionIsolation(this.transactionIsolation);
			}
			if (DataSourceType.READ == ConnectionHolder.CURRENT_CONNECTION.get()) {
				try {
					target.setAutoCommit(true);
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
			if (this.autoCommit != null && this.autoCommit != target.getAutoCommit()) {
				if (DataSourceType.WRITE == ConnectionHolder.CURRENT_CONNECTION.get()) {
					target.setAutoCommit(this.autoCommit);
				}
			}
			return target;
		}
	}
	@Override
	public PrintWriter getLogWriter() throws SQLException {
		return dataSourceRouter.getTargetDataSource().getLogWriter();
	}
	@Override
	public void setLogWriter(PrintWriter out) throws SQLException {
		dataSourceRouter.getTargetDataSource().setLogWriter(out);
	}
	@Override
	public int getLoginTimeout() throws SQLException {
		return dataSourceRouter.getTargetDataSource().getLoginTimeout();
	}
	@Override
	public void setLoginTimeout(int seconds) throws SQLException {
		dataSourceRouter.getTargetDataSource().setLoginTimeout(seconds);
	}
	// ---------------------------------------------------------------------
	// Implementation of JDBC 4.0's Wrapper interface
	// ---------------------------------------------------------------------
	@Override
	@SuppressWarnings("unchecked")
	public <T> T unwrap(Class<T> iface) throws SQLException {
		if (iface.isInstance(this)) {
			return (T) this;
		}
		return dataSourceRouter.getTargetDataSource().unwrap(iface);
	}
	@Override
	public boolean isWrapperFor(Class<?> iface) throws SQLException {
		return (iface.isInstance(this) || dataSourceRouter.getTargetDataSource().isWrapperFor(iface));
	}
	// ---------------------------------------------------------------------
	// Implementation of JDBC 4.1's getParentLogger method
	// ---------------------------------------------------------------------
	@Override
	public Logger getParentLogger() {
		return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
	}
}

定义路由接口

/**
 * 数据库路由
 *
 */
public interface DataSourceRouter {
	/**
	 * 根据自己的需要,实现数据库路由,可以是读写分离的数据源,或者是分表后的数据源
	 * @return
	 */
	public DataSource getTargetDataSource();
}

实现读库路由的基类

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.lookup.DataSourceLookup;
import org.springframework.jdbc.datasource.lookup.JndiDataSourceLookup;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
/**
 *  读写数据源路由的基类
 *
 * @author sunchangtan
 */
public abstract class AbstractMasterSlaverDataSourceRouter implements DataSourceRouter, InitializingBean {
	// 配置文件中配置的read-only datasoure
	// 可以为真实的datasource,也可以jndi的那种
	private List<Object> readDataSources;
	private Object writeDataSource;
	private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
	private List<DataSource> resolvedReadDataSources;
	private DataSource resolvedWriteDataSource;
	// read-only data source的数量,做负载均衡的时候需要
	private int readDsSize;
	public List<DataSource> getResolvedReadDataSources() {
		return resolvedReadDataSources;
	}
	public int getReadDsSize() {
		return readDsSize;
	}
	public void setReadDataSoures(List readDataSoures) {
		this.readDataSources = readDataSoures;
	}
	public void setWriteDataSource(Object writeDataSource) {
		this.writeDataSource = writeDataSource;
	}
	public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {
		this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
	}
	@Override
	public void afterPropertiesSet() {
		if (writeDataSource == null) {
			throw new IllegalArgumentException("Property 'writeDataSource' is required");
		}
		this.resolvedWriteDataSource = resolveSpecifiedDataSource(writeDataSource);
		if (this.readDataSources == null || this.readDataSources.size() ==0) {
			throw new IllegalArgumentException("Property 'resolvedReadDataSources' is required");
		}
		resolvedReadDataSources = new ArrayList<DataSource>(readDataSources.size());
		for (Object item : readDataSources) {
			resolvedReadDataSources.add(resolveSpecifiedDataSource(item));
		}
		readDsSize = readDataSources.size();
	}
	protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
		if (dataSource instanceof DataSource) {
			return (DataSource) dataSource;
		}
		else if (dataSource instanceof String) {
			return this.dataSourceLookup.getDataSource((String) dataSource);
		}
		else {
			throw new IllegalArgumentException(
					"Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
		}
	}
	@Override
	public DataSource getTargetDataSource() {
		if (DataSourceType.WRITE.equals(ConnectionHolder.CURRENT_CONNECTION.get())) {
			return resolvedWriteDataSource;
		} else {
			return loadBalance();
		}
	}
	protected abstract DataSource loadBalance();
}

实现简单的轮询路由,其他路由方式,大家可以自行实现

import javax.sql.DataSource;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 
 * 简单实现读数据源负载均衡
 *
 */
public class RoundRobinMasterSlaverDataSourceRouter extends AbstractMasterSlaverDataSourceRouter {
	private AtomicInteger count = new AtomicInteger(0);
	@Override
	protected DataSource loadBalance() {
		int index = Math.abs(count.incrementAndGet()) % getReadDsSize();
		return getResolvedReadDataSources().get(index);
	}
}

处理“只读事务到读库,读写事务到写库”的事务

import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import javax.sql.DataSource;
/**
 *  事务管理
 *  处理“只读事务到读库,读写事务到写库”
 *  
 * @author sunchangtan 
 */
public class MasterSlaverDataSourceTransactionManager extends DataSourceTransactionManager {
    public MasterSlaverDataSourceTransactionManager(DataSource dataSource) {
        super(dataSource);
    }
    /**
     * 只读事务到读库,读写事务到写库
     * @param transaction
     * @param definition
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        //设置数据源
        boolean readOnly = definition.isReadOnly();
        if(readOnly) {
            DataSourceHolder.setCurrentDataSource(DataSourceType.READ);
        } else {
            DataSourceHolder.setCurrentDataSource(DataSourceType.WRITE);
        }
        super.doBegin(transaction, definition);
    }
    /**
     * 清理本地线程的数据源
     * @param transaction
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        super.doCleanupAfterCompletion(transaction);
        DataSourceHolder.clearDataSource();
    }
}

mybatis的读写分离的插件,需要配置到mybatis-config.xml

import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.executor.statement.RoutingStatementHandler;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.logging.jdbc.ConnectionLogger;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.factory.DefaultObjectFactory;
import org.apache.ibatis.reflection.wrapper.DefaultObjectWrapperFactory;
import org.springframework.jdbc.datasource.ConnectionProxy;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.Properties;
/**
 * 数据源读写分离路由
 */
@Slf4j
@Intercepts({@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class})})
public class MasterSlaveInterceptor implements Interceptor {
    public Object intercept(Invocation invocation) throws Throwable {
        Connection conn = (Connection) invocation.getArgs()[0];
        conn = unwrapConnection(conn);
        if (conn instanceof ConnectionProxy) {
            //强制走写库
            if (ConnectionHolder.FORCE_WRITE.get() != null && ConnectionHolder.FORCE_WRITE.get()) {
                if (log.isDebugEnabled()) {
                    log.debug("本事务强制走写库");
                }
                routeConnection(DataSourceType.WRITE, conn);
                return invocation.proceed();
            }
            StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
            MetaObject metaObject = MetaObject.forObject(statementHandler, new DefaultObjectFactory(), new DefaultObjectWrapperFactory(), new DefaultReflectorFactory());
            MappedStatement mappedStatement;
            if (statementHandler instanceof RoutingStatementHandler) {
                mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
            } else {
                mappedStatement = (MappedStatement) metaObject.getValue("mappedStatement");
            }
            if(mappedStatement.getId().endsWith(".insertSelective!selectKey")) {
                System.out.println("111");
            }
            DataSourceType key = DataSourceHolder.getCurrentDataSource();
            if (key == null) {
                key = DataSourceType.WRITE;
                String sel = statementHandler.getBoundSql().getSql().trim().substring(0, 3);
                if (sel.equalsIgnoreCase("sel")
                        && !mappedStatement.getId().endsWith(".insert!selectKey")
                        && !mappedStatement.getId().endsWith(".insertSelective!selectKey")) {
                    key = DataSourceType.READ;
                }
            }
            if(key == DataSourceType.WRITE) {
                if (log.isDebugEnabled()) {
                    log.debug("当前数据库为写库");
                }
            } else if(key == DataSourceType.READ) {
                if (log.isDebugEnabled()) {
                    log.debug("当前数据库为读库");
                }
            }
            routeConnection(key, conn);
        }
        return invocation.proceed();
    }
    private void routeConnection(DataSourceType key, Connection conn) {
        ConnectionHolder.CURRENT_CONNECTION.set(key);
        // 同一个线程下保证最多只有一个写数据链接和读数据链接
        if (!ConnectionHolder.CONNECTION_CONTEXT.get().containsKey(key)) {
            ConnectionProxy conToUse = (ConnectionProxy) conn;
            conn = conToUse.getTargetConnection();
            ConnectionHolder.CONNECTION_CONTEXT.get().put(key, conn);
        }
    }
    public Object plugin(Object target) {
        if (target instanceof StatementHandler) {
            return Plugin.wrap(target, this);
        } else {
            return target;
        }
    }
    public void setProperties(Properties properties) {
        // NOOP
    }
    /**
     * MyBatis wraps the JDBC Connection with a logging proxy but Spring registers the original connection so it should
     * be unwrapped before calling {@code DataSourceUtils.isConnectionTransactional(Connection, DataSource)}
     *
     * @param connection May be a {@code ConnectionLogger} proxy
     * @return the original JDBC {@code Connection}
     */
    private Connection unwrapConnection(Connection connection) {
        if (Proxy.isProxyClass(connection.getClass())) {
            InvocationHandler handler = Proxy.getInvocationHandler(connection);
            if (handler instanceof ConnectionLogger) {
                return ((ConnectionLogger) handler).getConnection();
            }
        }
        return connection;
    }
}

定义springboot的主从数据库配置

import com.sample.dao.dynamic.DataSourceProxy;
import com.sample.dao.dynamic.DataSourceRouter;
import com.sample.dao.dynamic.RoundRobinMasterSlaverDataSourceRouter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Collections;
/**
 * 主从数据源的配置
 * @author sunchangtan
 */
@EnableConfigurationProperties({DataSourceMasterConfig.class, DataSourceSlaveConfig.class})
@Configuration
public class MasterSlaverDataSourceConfig {
    @Resource
    private DataSourceSlaveConfig dataSourceSlaveConfig;
    @Resource
    private DataSourceMasterConfig dataSourceMasterConfig;
    @Bean
    @ConditionalOnMissingBean
    public DataSourceRouter readRoutingDataSource() {
        RoundRobinMasterSlaverDataSourceRouter proxy = new RoundRobinMasterSlaverDataSourceRouter();
        proxy.setReadDataSoures(Collections.singletonList(dataSourceSlaveConfig.createDataSource()));
        proxy.setWriteDataSource(dataSourceMasterConfig.createDataSource());
        return proxy;
    }
    @Bean
    public DataSource dataSource(DataSourceRouter dataSourceRouter) {
        return new DataSourceProxy(dataSourceRouter);
    }
}

springboot中配置数据库事务

import com.sample.dao.dynamic.MasterSlaverDataSourceTransactionManager;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.interceptor.*;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
 * 事务管理的配置
 *
 * @author sunchangtan
 * @date 2018/8/30 11:22
 */
@Aspect
@Configuration
public class TransactionManagerConfigurer {
    private static final int TX_METHOD_TIMEOUT = 50000;
    private static final String AOP_POINTCUT_EXPRESSION = "execution(* com.sample.***.service..*.*(..))";
    @Resource
    private DataSource dataSource;
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new MasterSlaverDataSourceTransactionManager(dataSource);
    }
    /**
     * 事务的实现Advice
     *
     * @return
     */
    @Bean
    public TransactionInterceptor txAdvice(PlatformTransactionManager transactionManager) {
        NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource();
        RuleBasedTransactionAttribute readOnlyTx = new RuleBasedTransactionAttribute();
        readOnlyTx.setReadOnly(true);
        //使用PROPAGATION_SUPPORTS:支持当前事务,如果当前没有事务,就以非事务方式执行。 如果查询中出现异常,那么当前事务也可以回滚
        readOnlyTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
        RuleBasedTransactionAttribute requiredTx = new RuleBasedTransactionAttribute();
        requiredTx.setRollbackRules(Collections.singletonList(new RollbackRuleAttribute(Exception.class)));
        //使用PROPAGATION_REQUIRED:如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。 如果需要数据库增删改,必须要使用事务
        requiredTx.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        requiredTx.setTimeout(TX_METHOD_TIMEOUT);
        Map<String, TransactionAttribute> txMap = new HashMap<>();
        txMap.put("add*", requiredTx);
        txMap.put("save*", requiredTx);
        txMap.put("insert*", requiredTx);
        txMap.put("update*", requiredTx);
        txMap.put("delete*", requiredTx);
        txMap.put("remove*", requiredTx);
        txMap.put("upload*", requiredTx);
        txMap.put("generate*", requiredTx);
        txMap.put("import*", requiredTx);
        txMap.put("bind*", requiredTx);
        txMap.put("unbind*", requiredTx);
        txMap.put("cancel*", requiredTx);
        txMap.put("send*", requiredTx);
        txMap.put("create*", requiredTx);
        txMap.put("compute*", requiredTx);
        txMap.put("recompute*", requiredTx);
        txMap.put("execute*", requiredTx);
        //txMap.put("submit*", requiredTx);
        txMap.put("get*", readOnlyTx);
        txMap.put("query*", readOnlyTx);
        txMap.put("list*", readOnlyTx);
        txMap.put("has*", readOnlyTx);
        txMap.put("exist*", readOnlyTx);
        txMap.put("download*", readOnlyTx);
        txMap.put("export*", readOnlyTx);
        txMap.put("search*", readOnlyTx);
        txMap.put("check*", readOnlyTx);
        txMap.put("load*", readOnlyTx);
        txMap.put("find*", readOnlyTx);
        source.setNameMap(txMap);
        return new TransactionInterceptor(transactionManager, source);
    }
    /**
     * 切面的定义,pointcut及advice
     *
     * @param txAdvice
     * @return
     */
    @Bean
    public Advisor txAdviceAdvisor(@Qualifier("txAdvice") TransactionInterceptor txAdvice) {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression(AOP_POINTCUT_EXPRESSION);
        return new DefaultPointcutAdvisor(pointcut, txAdvice);
    }
}

到此这篇关于Java基于SpringBoot和tk.mybatis实现事务读写分离代码实例的文章就介绍到这了,更多相关SpringBoot事务读写分离实例内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文