Seata 微服务框架支持

优质
小牛编辑
139浏览
2023-12-01

Seata 的事务上下文由 RootContext 来管理。

应用开启一个全局事务后,RootContext 会自动绑定该事务的 XID,事务结束(提交或回滚完成),RootContext 会自动解绑 XID。

// 绑定 XID
RootContext.bind(xid);

// 解绑 XID
String xid = RootContext.unbind();

应用可以通过 RootContext 的 API 接口来获取当前运行时的全局事务 XID。

// 获取 XID
String xid = RootContext.getXID();

应用是否运行在一个全局事务的上下文中,就是通过 RootContext 是否绑定 XID 来判定的。

    public static boolean inGlobalTransaction() {
        return CONTEXT_HOLDER.get(KEY_XID) != null;
    }

事务传播

Seata 全局事务的传播机制就是指事务上下文的传播,根本上,就是 XID 的应用运行时的传播方式。

1. 服务内部的事务传播

默认的,RootContext 的实现是基于 ThreadLocal 的,即 XID 绑定在当前线程上下文中。

public class ThreadLocalContextCore implements ContextCore {

    private ThreadLocal<Map<String, String>> threadLocal = new ThreadLocal<Map<String, String>>() {
        @Override
        protected Map<String, String> initialValue() {
            return new HashMap<String, String>();
        }

    };

    @Override
    public String put(String key, String value) {
        return threadLocal.get().put(key, value);
    }

    @Override
    public String get(String key) {
        return threadLocal.get().get(key);
    }

    @Override
    public String remove(String key) {
        return threadLocal.get().remove(key);
    }
}

所以服务内部的 XID 传播通常是天然的通过同一个线程的调用链路串连起来的。默认不做任何处理,事务的上下文就是传播下去的。

如果希望挂起事务上下文,则需要通过 RootContext 提供的 API 来实现:

// 挂起(暂停)
String xid = RootContext.unbind();

// TODO: 运行在全局事务外的业务逻辑

// 恢复全局事务上下文
RootContext.bind(xid);

2. 跨服务调用的事务传播

通过上述基本原理,我们可以很容易理解:

跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。

只要能做到这点,理论上 Seata 可以支持任意的微服务框架。

对 Dubbo 支持的解读

下面,我们通过内置的对 Dubbo RPC 支持机制的解读,来说明 Seata 在实现对一个特定微服务框架支持的机制。

对 Dubbo 的支持,我们利用了 Dubbo 框架的 org.apache.dubbo.rpc.Filter 机制。

/**
 * The type Transaction propagation filter.
 */
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID(); // 获取当前事务 XID
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 获取 RPC 调用传递过来的 XID
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) { // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation); // 业务方法的调用

        } finally {
            if (bind) { // Provider:调用完成后,对 XID 的清理
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) { // 调用过程有新的事务上下文开启,则不能清除
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}