成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深度剖析 Seata 源碼

開發
本文將針對 seata 分布式事務注冊到提交回滾的全流程進行深入分析和講解,希望對你有幫助。

一、如何使用源碼

需要了解的是,這篇文章是基于筆者相對早期的項目作為樣例進行講解,所以對應的seata版本為1.4.2(核心部分實現大體是一樣的),建議讀者閱讀本文在調試源碼時可以選擇和筆者相同的版本進行理解學習,對應的下載地址為:https://github.com/apache/incubator-seata/tree/v1.4.2

完成下載后,為保證編譯可以通過我們還需要將seata-serializer-protobuf模塊移除掉,該模塊的位置如下圖所示:

同時seata的啟動類位于seata-server模塊,所以我們需要將該模塊的registry.conf的配置改為自己的配置:

以筆者為例,seata配置都是通過nacos進行統一管理的,所以對應的配置類型也都是針對nacos維度去協調適配,大體配置如下所示:

registry {
  # 將seata注冊到nacos上
  type = "nacos"
  nacos {
  # nacos地址
    serverAddr = "ip:8848"
    # 命名空間id
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    # 組名
    group = "DEFAULT_GROUP"
    # 集群節點名稱
    cluster = "default"
  }
}
config {
  # 通過nacos獲取配置
  type = "nacos"
  nacos {
    serverAddr = "ip:8848"
    namespace = "7c1cfd88-15e4-437d-8e82-2d22d034f447"
    group = "DEFAULT_GROUP"
  }
}

經過這幾個步驟后seata就可以像我們日常一樣的方式進行使用了。

二、基于AT模式詳解Seata全鏈路流程

1. seata服務端啟動

我們先從seata的服務端啟動開始,seata服務端啟動時會進行如下幾個核心步驟:

  • 創建工作線程池workingThreads。
  • 基于工作線程池創建一個Netty服務端對外提供服務。
  • 基于該服務端創建的一個默認的協調者DefaultCoordinator管理全局事務。
  • 默認協調者初始化幾個定時任務處理一些異步的全局事務提交、回滾、超時監測的任務。

對應的我們給出這塊邏輯的核心入口代碼,即位于Server的主函數入口的main方法,可以看到seata服務端的創建是基于netty完成的,完成創建和初始化之后就與協調者coordinator進行綁定:

public static void main(String[] args) throws IOException {
        //......
        //創建工作線程池處理業務請求
        ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
                NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
                new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
        //基于該線程池初始化 seata 服務端
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
 //......
        //log store mode : file, db, redis
        SessionHolder.init(parameterParser.getStoreMode());
        //初始化協調者,處理seata服務端收到的各種事務讀寫請求
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        //初始化各種異步定時任務:全局事務提交、全局事務回滾、超時監測等
        coordinator.init();
        //將協調者作為seata服務端的處理器
        nettyRemotingServer.setHandler(coordinator);
       //......
    }

對應的我們也給出默認協調者的初始化源碼,即DefaultCoordinator的init方法,可以看到這段代碼本質上就是提交一些定時任務處理全局事務提交、回滾、超時監測、undo log刪除等:

public void init() {
        //每秒執行,處理需要回滾的分布式事務
        retryRollbacking.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.retryRollbackingLock();
            if (lock) {
                try {
                    handleRetryRollbacking();
                } catch (Exception e) {
                    LOGGER.info("Exception retry rollbacking ... ", e);
                } finally {
                    SessionHolder.unRetryRollbackingLock();
                }
            }
        }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
        //異步定時提交全局事務的定時任務,每秒執行一次
        asyncCommitting.scheduleAtFixedRate(() -> {
            boolean lock = SessionHolder.asyncCommittingLock();
            if (lock) {
                try {
                    //掃描獲取各種異步提交的全局事務
                    handleAsyncCommitting();
                } catch (Exception e) {
                    LOGGER.info("Exception async committing ... ", e);
                } finally {
                    SessionHolder.unAsyncCommittingLock();
                }
            }
        }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

       //......
    }

2. 本地服務如何基于GlobalTransaction注解開啟事務

我們都知道seata也是基于spring boot實現的,所以我們可以大膽的認為應用端使用GlobalTransaction開啟分布式事務本質上也是和spring boot自動裝配有著一定的聯系。

所以我們從seata-spring-boot-starter這個腳手架的源碼包的spring.factories文件入手,可以看到一個SeataAutoConfiguration的注入:

于是我們就可以看到一個GlobalTransactionScanner即一個關于GlobalTransaction注解掃描的類:

@Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
      //......
        //掃描我們的配置文件中配置的applicationId、txServiceGroup對應的事務
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

查看GlobalTransactionScanner源碼我們可以看到該類型繼承了spring的初始化bean并設置屬性后的拓展點InitializingBean的afterPropertiesSet方法,該方法內部會初始化當前seata客戶端,分別初始化TM客戶端(使用GlobalTransaction注解的方法的服務即做為TM)和RM客戶端處理其他TM或者RM服務端發送的消息,它們初始化的工作分別是:

  • TM客戶端會注冊各種TC消息響應的處理器,處理各種seata server對應的TC響應的消息,例如:全局事務開啟結果處理器、全局事務提交處理器、全局事務回滾處理器等。
  • RM客戶端則是注冊一些各種seata server對應TC請求消息的處理器,例如:分支事務提交、分支事務回滾、分支事務undo.log刪除等。

對應我們給出GlobalTransactionScanner的afterPropertiesSet源碼可以看到客戶端初始化這段調用的入口,可以看到啟動時某個線程完成CAS上鎖初始化標識之后,即通過initClient初始化客戶端:

@Override
    public void afterPropertiesSet() {
        //......
        //基于擴展點進行客戶端初始化
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

步入后即可看到對于TM和RM客戶端的初始化調用:

private void initClient() {
        //......
        // 初始化TM客戶端
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
       //......
        // 初始化RM客戶端
        RMClient.init(applicationId, txServiceGroup);
      //......
    }

此時我們先看看TM客戶端內部的處理函數即位于TmNettyRemotingClient的registerProcessor即可看到上述所說的TC響應消息處理器的綁定步驟,即:

  • 注冊TC響應消息處理器
  • 注冊全局事務開啟響應處理器
  • 注冊全局事務提交響應處理器
  • 注冊心跳消息處理器
private void registerProcessor() {
        // 1.registry TC response processor 注冊一些TC響應消息的處理器
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        //全局事務開啟結果響應處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        //全局事務提交響應處理器
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2. 注冊心跳消息
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同理我們也給出RM客戶端內部初始化的調用RmNettyRemotingClient的registerProcessor方法:

  • 注冊分支事務提交消息處理器
  • 注冊rm客戶端對應的分支事務提及和回滾處理器
  • 注冊undo Log刪除處理器
  • 注冊TC響應消息處理器
  • 注冊心跳處理器
private void registerProcessor() {
        // 1. 注冊分支事務提交消息處理器
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        // 2.注冊rm客戶端對應的分支事務回滾處理器
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        // 3. 注冊undo log刪除處理器
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        // 4. 注冊TC響應消息處理器
        ClientOnResponseProcessor onResponseProcessor =
            new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        // 5.注冊心跳消息處理器
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

同時GlobalTransactionScanner繼承了AbstractAutoProxyCreator的wrapIfNecessary,該代理類會在spring容器中的bean進行檢查并決定是否進行動態代理。以我們的GlobalTransactionScanner邏輯它本質上就是:

  • 檢查當前bean是否有GlobalTransactional這個注解
  • 如果有則基于全局事務攔截器對其進行增強

對應核心邏輯如下所示,可以看到這段代碼會通過existsAnnotation檢查當前bean是否存在GlobalTransactional注解,如果有則基于globalTransactionalInterceptor 對其進行增強:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
              //......
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                   //......
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //判斷是否有GlobalTransaction注解,如果有則為其生成分布式事務的動態代理
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    //如果攔截器為空則初始化攔截器
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

              //......
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //基于上一步的interceptor為其生成動態代理
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
             //......
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

這也就意味著我們調用帶有GlobalTransactional注解方法時,就會走到GlobalTransactionalInterceptor的增強邏輯上,它會走到GlobalTransactionalInterceptor的invoke方法上,最終會走到事務模板類transactionalTemplate的execute方法,該方法會執行如下三個核心步驟:

  • 開啟全局事務。
  • 執行原始業務邏輯。
  • 根據各個分支事務結果提交或者回滾事務。

對應的我們給出GlobalTransactionalInterceptor的invoke方法,可以看到當該方法認為注解存在的情況下會直接調用handleGlobalTransaction開啟并處理全局事務:

@Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
      //......
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //獲取GlobalTransactional注解信息
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //......
            if (!localDisable) {
                //若全局事務注解不為空則調用handleGlobalTransaction處理全局事務
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                     //......
                }
            }
        }
         //......
    }

步入其內部就會走到transactionalTemplate的execute方法,即可看到對于:

  • 分支事務的創建
  • 告知TC請求開啟全局事務
  • 執行本地事務
  • 全局提交或者回滾

對應邏輯的源碼如下所示,讀者可結合說明了解:

public Object execute(TransactionalExecutor business) throws Throwable {
       //......

            // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            //如果tx為空則以全局事務啟動者的身份創建一個全新的事務
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

            // set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                //向TC發送請求開啟全局事務
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    // Do Your Business
                    //執行業務邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {
                    // 3. The needed business exception to rollback.
                    //全局事務回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                //分支事務執行成功,提交全局事務
                commitTransaction(tx);

                return rs;
            } finally {
             //......
            }
        } finally {
         //......
        }
    }

3. 客戶端如何開啟分布式事務

上文調用分布式事務的方法時內部會走到的代理的transactionalTemplate的execute方法,其內部有個beginTransaction就是開啟分布式事務的關鍵,由上文可知作為GlobalTransactional注解的方法對對應的服務就是作為TM即transaction manager,所以在調用beginTransaction時,這個方法的代理就會以TM的身份發送一個請求告知TC自己要開啟一個全局事務,TC經過自己的協調處理后(后文會介紹流程)返回一份xid告知TM開啟成功:

對應的我們查看seata客戶端對應TransactionalTemplate的beginTransaction方法即可看到begin方法的調用,該方法回告知seata服務端自己要開啟一個全局事務:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //......
            //開始分布式事務
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
         //......
        } catch (TransactionException txe) {
            //......

        }
    }

查看begin內部就是通過TM發起請求,得到xid并緩存到當前線程內部,開始后續的執行流程分布式事務處理流程:

@Override
    public void begin(int timeout, String name) throws TransactionException {
        //......
        //通過TM告知TC開啟全局事務,從而得到xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        //將xid緩存到當前線程的緩存中
        RootContext.bind(xid);
        //......
    }

4. seata服務端如何注冊全局事務

基于上述請求,對應seata server端的TC收到請求后會基于傳參中的消息標信息,定位到對應的執行器即TM消息處理器,然后驅動TM處理器將這個請求生成一份全局session信息從而構成本次請求的全局事務信息,再將請求寫入數據表中:

我們給出TC處理消息的代碼入口AbstractNettyRemotingServer的channelRead方法,從名字不難看出TC服務端也是基于netty實現,其內部通過processMessage處理各種消息:

@Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  
            //基于netty編寫的服務端,channelRead通過processMessage處理客戶端各種請求
            processMessage(ctx, (RpcMessage) msg);
        }

步入processMessage即可看到基于處理表定位消息并交由處理器處理消息邏輯pair.getFirst().process(ctx, rpcMessage);:

protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
         //......
         //獲取網絡消息
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //通過處理表定位到對應的處理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                //基于第一個處理器處理當前消息
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                //......
                            } finally {
                                //......
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        //......
                        
                    }
                } else {
                //......
               }
        }
    }

因為我們的消息是TM發來的,所以上一步的處理器是ServerOnRequestProcessor的:

@Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
         //處理TM客戶端發送來的消息
            onRequestMessage(ctx, rpcMessage);
        } else {
           //......
        }
    }

最終走到GlobalBeginRequest這個工具的handle基于協調者將事務信息寫入global_table從而得到xid返回給TM客戶端:

@Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
  //生成全局事務信息并得到xid將數據寫入響應返回給TM
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
       //.......
    }

5. RM和TC如何協調處理分支事務

完成全局事務的注冊管理之后,我們再來聊聊各個分支事務的執行和提交回滾,上文提及,seata原生我們本地的jdbc數據庫連接通過代理加以封裝,所以在我們seata客戶端執行本地事務完成后提交的commit方法是經過了seata的代理這一層,該連接代理在調用commit方法時,其內部就會通過RM向TC注冊一個分支事務的請求,TC收到請求后會執行如下工作:

  • 基于lock_table嘗試為事務生成全局鎖。
  • 分支事務信息寫入到branch_table表上并返回branch_id給RM:

我們給出ConnectionProxy的commit方法入口,其內部調用了一個doCommit方法,它就是事務提交的核心邏輯:

@Override
    public void commit() throws SQLException {
        try {
         //excute會調用doCommit生成undoLog緩存和執行分支事務
            LOCK_RETRY_POLICY.execute(() -> {
                //excuete執行成功后這一步會注冊分支事務并提交本地事務和undoLog鏡像以保證原子性
                doCommit();
                return null;
            });
        } catch (SQLException e) {
           //......
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

其內部調用ConnectionProxy的doCommit會調用processGlobalTransactionCommit執行分支事務:

private void doCommit() throws SQLException {
        //如果處于全局事務中則調用processGlobalTransactionCommit
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
         //......
        } else {
           //......
        }
    }

最終就可以在processGlobalTransactionCommit看到如下邏輯:

  • register這個注冊分支事務的邏輯,TC基于RM給定的resourceId信息,生成操作數據的全局鎖,并插入分支事務信息到brach_table中。
  • undo日志刷盤到本地undo日志中。
  • 本地業務的事務提交。
private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向TC發起請求注冊分支事務,TC基于RM給定的resourceId生成全局鎖并插入分支事務信息到brach_table后就不會拋異常
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //undo日志刷盤
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            //本地事務提交
            targetConnection.commit();
        } catch (Throwable ex) {
          //......
        }
          //......
    }

這里我們著重看一下register函數,其內部本質上就是通過RM客戶端告知TC自己準備執行分支事務提交,幫我上一把全局鎖并注冊分支事務:

private void register() throws TransactionException {
        if (!context.hasUndoLog() || !context.hasLockKey()) {
            return;
        }
        //向tc發起請求并獲得register
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
        //緩存到當前線程中
        context.setBranchId(branchId);
    }

最后這個注冊的邏輯就會來到AbstractResourceManager的branchRegister上,可以看到它會攜帶著全局事務id和主鍵等數據發送請求給TC:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            //傳入全局事務id即xid
            request.setXid(xid);
            //基于當前數據主鍵生成lockkeys
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
            request.setBranchType(branchType);
            request.setApplicationData(applicationData);
            //基于RM的netty客戶端將其異步發送
            BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
         //......
            return response.getBranchId();
        } catch (TimeoutException toe) {
           //......
        }
    }

6. seata服務端處理分支事務請求

TC處理流程與上述文章同理,收到消息后基于request中的消息表定位到對應的處理器,我們這里最終會走到BranchRegisterRequest的處理器上,通過AbstractTCInboundHandler注冊分支事務:

@Override
    public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
        BranchRegisterResponse response = new BranchRegisterResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
            @Override
            public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
                throws TransactionException {
                try {
                    //tc注冊分支事務入口
                    doBranchRegister(request, response, rpcContext);
                } catch (StoreException e) {
                 //......
                }
            }
        }, request, response);
        return response;
    }

最終這段邏輯就會走到AbstractCore的branchRegister,大體執行的步驟是:

  • 生成分支事務session
  • 嘗試獲得數據全局鎖lock_table
  • 取鎖成功將分支事務信息寫入branch_table
  • 返回branch_id給RM

對應源碼邏輯如下,大體邏輯就說基于分支事務session生成全局鎖存到lock_table后,將分支事務信息存到branch_table中:

@Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
      //......
        return SessionHolder.lockAndExecute(globalSession, () -> {
             //......
            //獲取分支事務的表信息并將其寫入到lock_table中意味獲得全局鎖,上鎖失敗會拋異常
            branchSessionLock(globalSession, branchSession);

            try {
                //添加分支事務信息到branch_table中
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                 //......
            }
             //......
             //返回分支事務id
            return branchSession.getBranchId();
        });
    }

TC返回成功后,RM就會執行undo日志刷盤和本地事務提交,詳情參考我們本節代碼processGlobalTransactionCommit方法,這里不貼出了。

7. RM生成回滾日志

對于java程序而言大部分SQL操作底層都是基于Executor執行器操作的,在上述代理執行commit方法前,seata底層將代理的連接即上文的connectionProxy通過AbstractDMLBaseExecutor執行SQL操作,該執會針對我們的連接代理進行如下邏輯處理:

  • 判斷連接代理connectionProxy是否是自動提交,若非自動提交則調用executeAutoCommitFalse方法,該方法會生成undoLog數據寫入緩存,然后將RM當執行分支事務SQL,基于該執行結果生成后置鏡像,最后將undo日志寫入undo_log表中。
  • 若開啟自動提交則關閉自動提交后,復用executeAutoCommitFalse方法執行系統的undoLog和分支事務SQL的執行操作。

對應源碼的整體工作鏈路圖如下所示:

這里我們直接給出AbstractDMLBaseExecutor的doExecute方法作為入口,可以看到若開啟自動提交則調用executeAutoCommitTrue,反之調用executeAutoCommitFalse:

@Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //若自動提交則關閉自動提交,并生成undo信息存入緩沖區
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //直接生成undo log鏡像寫入緩存
            return executeAutoCommitFalse(args);
        }
    }

因為都會復用executeAutoCommitFalse這段邏輯,所以我們直接查看這個方法的邏輯,可以看到該邏輯內部會基于分支事務前后的數據生成前置和后置鏡像:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //基于分支事務的SQL定位操作前的SQL生成前置鏡像
        TableRecords beforeImage = beforeImage();
        //執行分支事務的SQL 
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //生成分支事務操作后置鏡像
        TableRecords afterImage = afterImage(beforeImage);
        //將undoLog寫入緩沖區
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

8. 事務全局提交與回滾

TransactionalTemplate(即TM)驅動各種分支事務準備成功后,就會執行commitTransaction提交全局事務,對應的代碼位于TransactionalTemplate的execute方法,該方法會通知TC驅動全局事務提交,而TC收到該請求之后,就會驅動各個分支事務提交事務,每個分支事務收到該請求后就會刪除undoLog并提交各自未提交的事務:

public Object execute(TransactionalExecutor business) throws Throwable {
          //......

            try {
             
                //向TC發送請求開啟全局事務
                beginTransaction(txInfo, tx);

                Object rs;
                try {
                    
                    //執行業務邏輯(被代理的原始方法)
                    rs = business.execute();
                } catch (Throwable ex) {            
                    //全局事務回滾
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

              
                //分支事務執行成功,提交全局事務
                commitTransaction(tx);

                return rs;
            } finally {
          //......
            }
        } finally {
          //......
        }
    }

步入其內部可以看到DefaultGlobalTransaction調用transactionManager即TM提交全局事務:

@Override
    public void commit() throws TransactionException {
       //......
        try {
            while (retry > 0) {
                try {
                    //執行全局事務提交
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                   //......
            }
        } finally {
          //......
        }
        //......
    }

這個commit的邏輯也很簡單,即告知TC要提交全局事務了:

@Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        //通知TC提交全局事務
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

對應的TC收到該請求后,對應的AbstractTCInboundHandler就會調用doGlobalCommit通知各個RM提交全局事務:

@Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
                throws TransactionException {
                try {
                //遍歷RM提交各個分支事務
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                  //......
                }
            }
     //......

          //......


        }, request, response);
        return response;
    }

對應的我們可以來道該源碼內部的DefaultCore的doGlobalCommit方法印證這一點,可以看到該方法會遍歷各個分支事務調用branchCommit通知其提交或者回滾事務:

@Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
      //......
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //遍歷全局事務中的分支事務
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //......
                }
                try {
                    //告知RM提交事務
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    //......
                } catch (Exception ex) {
                    //......
                }
                return CONTINUE;
            });
             //......
        }
        //......
        return success;
    }

最后請求達到RM上的DefaultRMHandler按照TC要求提交或者回滾事務:

//RM提交分支事務
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }
    //RM回滾分支事務
    @Override
    public BranchRollbackResponse handle(BranchRollbackRequest request) {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
        return getRMHandler(request.getBranchType()).handle(request);
    }

提交事務本質上就是提交后刪除undoLog即可,這里我們以分支事務回滾為例,可以看到上述代碼BranchRollbackResponse 會調用handle方法執行分支事務回滾,該方法最終會走到AbstractRMHandler的doBranchRollback,該方法會調動RM管理器將分支事務回滾:

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
        throws TransactionException {
        //......
        //回滾分支事務
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
            applicationData);
        //將xid和處理結果狀態響應給TC
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
       //......
    }

最終該方法內部就會調用AbstractUndoLogManager的undo解析當前分支事務的前置鏡像數據,該方法內部執行邏輯為:

  • 定位分支事務的undo日志數據
  • 反序列化為undo對象
  • 基于該undo對象信息解析出表名、列以及數據等信息。
  • 通過undoExecutor 執行器將分支事務還原。

對應源碼如下:

@Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
      //......

        for (; ; ) {
            try {
               //......

                // Find UNDO LOG
                //獲取當前分支事務的undo鏡像
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();

                boolean exists = false;
                while (rs.next()) {
                   //......
                    //獲取undo數據
                    byte[] rollbackInfo = getRollbackInfo(rs);

                    //反序列化生成undo對象 branchUndoLog
                    String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                        : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                    try {
                        // put serializer name to local
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        //遍歷undo對象生成SQL還原分支事務值
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //獲取表的表名、列的元信息
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            //獲取對應的執行執行器 將對應分支事務的表數據回滾
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        // remove serializer name
                        removeCurrentSerializer();
                    }
                }

                //......
            } catch (SQLIntegrityConstraintViolationException e) {
                //......
            } catch (Throwable e) {
                //......

            } finally {
               //......
            }
        }
    }

三、小結

讓我們來做個小結,總的來說seata實現數據庫的AT模式分布式事務的流程為:

(1) 調用帶有globalTransactional注解的方法執行業務邏輯。

(2) 該方法以TM的身份通知TC開啟全局事務。

(3) TC收到通知后到global_table創建該方法的全局事務信息,這里以筆者某個下單業務的分布式事務場景為例,對應的數據如下所示:

(4) RM開始工作,各自找TC注冊分支事務,基于當前數據生成全局鎖存入lock_table,保證當前數據操作時沒有其他事務干擾:

全局鎖成功后TC將數據存入branch_table表,對應數據如下所示:

(5) RM完成分支事務注冊后,持有本地鎖的事務執行本地分支事務,成功后將生成分支數據的前后鏡像undo表,如下所示:

這里我們以后置鏡像為例子查看賬戶表修改后的字段值為例,可以看到該鏡像將每一個字段的類型、值等信息都序列化為JSON生成undo鏡像:

(6) TM感知到所有分支事務準備成功,通知TC將這些RM(分支事務)提交,即將undoLog刪除,反之基于undoLog將數據回滾。

對應我們給出下面這段圖,讀者可以結合上面源碼梳理一下全流程:

我是 SharkChili ,Java 開發者,Java Guide 開源項目維護者。歡迎關注我的公眾號:寫代碼的SharkChili,也歡迎您了解我的開源項目 mini-redis:https://github.com/shark-ctrl/mini-redis。

責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2022-09-27 18:56:28

ArrayList數組源代碼

2024-02-05 19:06:04

DartVMGC流程

2010-03-01 14:50:06

Python 工具

2009-09-15 14:52:15

linq級聯刪除

2010-03-01 18:33:30

2023-01-10 13:48:50

ContainerdCRI源碼

2011-05-23 14:20:59

WordPress

2010-02-01 13:34:59

Python 腳本

2010-02-02 15:25:35

Python語法

2010-02-03 16:56:24

Python包

2010-03-05 16:38:30

2014-10-17 09:30:38

2020-04-01 10:28:12

Apache HBas數據結構算法

2010-02-04 15:38:39

Android 手機

2022-03-24 14:40:31

開發Harmony鴻蒙

2022-04-29 14:56:40

通話應用源碼剖析

2009-12-07 18:43:29

WCF框架

2010-02-05 18:00:18

Android源代碼

2010-02-06 15:32:30

Android架構

2010-02-26 17:44:40

Python測試框架
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久精品欧美视频 | 九九亚洲| 久久青视频 | 青青草视频网 | 黄网址在线观看 | 中文字幕在线第一页 | 国产日韩欧美激情 | 国产精品一区在线观看 | 久久蜜桃av一区二区天堂 | 91福利在线观看视频 | 亚洲精品三级 | 国产在线1| 中文字幕视频一区 | 国产一区二区三区在线 | 国产精品一级在线观看 | 91文字幕巨乱亚洲香蕉 | 国产精品一区二区三区在线 | 中国91av| 精品亚洲国产成av人片传媒 | 久久合久久 | 午夜欧美a级理论片915影院 | 99久久日韩精品免费热麻豆美女 | 久久国产福利 | 天天干天天玩天天操 | 久久久妇女国产精品影视 | 日韩免费一区二区 | 免费的av| 亚洲毛片在线观看 | 新超碰97 | www国产亚洲精品久久网站 | 午夜免费网站 | 日韩无 | 无毛av| 久久男人 | 日韩一级精品视频在线观看 | 国产精品一区一区 | 国产精品亚洲一区二区三区在线 | 久久久国产一区二区三区 | 一区二区三区国产好 | av天天操 | 国产免费av在线 |