mysql分布式事务01

分布式事务组成

  1. 资源管理器: 提供访问事务资源的方法,通常一个数据库就是一个资源管理器
  2. 事务管理器: 协调参与全局事务的各个事务。需要和参与全局事务中的所有资源管理器进行通信。
  3. 应用程序: 定义事务的边界,指定全局事务中的操作。

提交方式

两段式提交。在第一个阶段,所有参与全局事务的节点都开始准备,告诉事务管理器他们准备提交了;第二阶段,事务管理器告诉资源管理器执行commit或者时rollback,如果任意一个节点显示不能提交,则所有的节点进行回滚。

代码示例:

  1. MyXid.java
public class MyXid implements Xid{
	
	public int formatId;
	public byte gtrid[];
	public byte bqual[];

	public MyXid() {
		
	}
	
	public MyXid(int formatId, byte gtrid[], byte bqual[]) {
		this.formatId = formatId;
		this.gtrid = gtrid;
		this.bqual = bqual;
	}
	
	@Override
	public byte[] getBranchQualifier() {
		return bqual;
	}

	@Override
	public int getFormatId() {
		return formatId;
	}

	@Override
	public byte[] getGlobalTransactionId() {
		return gtrid;
	}

}
  1. Demo1.java
public class Demo1 {
	
	/**
	 * 分布式数据库
	 * @param url
	 * @param user
	 * @param pass
	 * @return
	 */
	public static MysqlXADataSource getDataSource(String url, String user, String pass) {
		MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
		mysqlXADataSource.setUrl(url);
		mysqlXADataSource.setUser(user);
		mysqlXADataSource.setPassword(pass);
		return mysqlXADataSource;
	}
	
	public static void main(String[] args) throws SQLException {
		String connStr1 = "jdbc:mysql://192.168.200.137:3306/bank_shanghai";	//数据库1
		String connStr2 = "jdbc:mysql://127.0.0.1:3306/bank_beijing";			//数据库2
		
		MysqlXADataSource ds1 = getDataSource(connStr1,"root","123456");
		MysqlXADataSource ds2 = getDataSource(connStr2, "root", "Gepoint");
		
		XAConnection xaConnection1 = ds1.getXAConnection();
		XAConnection xaConnection2 = ds2.getXAConnection();
		
		XAResource xaResource1 = xaConnection1.getXAResource();
		XAResource xaResource2 = xaConnection2.getXAResource();
		
		Connection connection1 = xaConnection1.getConnection();	//获得连接
		Connection connection2 = xaConnection2.getConnection();	//获得连接
		
		Statement statement1 = connection1.createStatement();
		Statement statement2 = connection2.createStatement();
		
		Xid xid1 = new MyXid(100, new byte[] {0x01},new byte[] {0x02});
		Xid xid2 = new MyXid(100, new byte[] {0x11}, new byte[] {0x12});
		
		try {
			xaResource1.start(xid1, XAResource.TMNOFLAGS);
			statement1.execute("UPDATE money SET avail='22292' WHERE id= 12");
			xaResource1.end(xid1, XAResource.TMSUCCESS);
			xaResource2.start(xid2, XAResource.TMNOFLAGS);
			statement2.execute("UPDATE mone SET avail='22293' WHERE id= 12");//此处错误,会导致整个全局事务回滚
			xaResource2.end(xid2, XAResource.TMSUCCESS);
			int ret2 = xaResource2.prepare(xid2);			//第一阶段
			int ret1 = xaResource1.prepare(xid1);
			if(ret1==XAResource.XA_OK && ret2==XAResource.XA_OK) {
				xaResource1.commit(xid1, false);			//第二阶段
				xaResource2.commit(xid2, false);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		
	}
	
}