当前位置: 首页 > 工具软件 > TDDL > 使用案例 >

Spring源码分析之TDDL

裴理
2023-12-01
一、目前项目中只是使用了TDDL的主从模块
<bean id="dataSource" class="com.taobao.tddl.group.jdbc.TGroupDataSource" init-method="init" destroy-method="destroyDataSource">
    <property name="appName" value="gome_market_search_index" />
    <property name="dbGroupKey" value="gome_market_search_index_group_0" />
    <property name="dsKeyAndWeightCommaArray" value="db0:rwp1q1i0, db1:rwp0q0i1" />
</bean>
二、多个数据源通过TGroupDataSource统一管理
public class TGroupDataSource implements DataSource {
    private ConfigManager configManager;
    private String dsKeyAndWeightCommaArray; //例如 db0:rwp1q1i0, db1:rwp0q0i1
    private DataSourceFetcher dataSourceFetcher;
    private String appName;
    private String dbGroupKey;
    
    /**
     * 基于dbGroupKey、appName来初始化多个TAtomDataSource
    */
    public void init() {
       if (dsKeyAndWeightCommaArray != null) {
          //本地配置方式:dsKeyAndWeightCommaArray + dataSourceFetcher + dyType
          DataSourceFetcher wrapper = new DataSourceFetcher() {
             @Override
             public DataSource getDataSource(String key) {
                return dataSourceFetcher.getDataSource(key);
             }
    
             @Override
             public DBType getDataSourceDBType(String key) {
                DBType type = dataSourceFetcher.getDataSourceDBType(key);
                return type == null ? dbType : type; //如果dataSourceFetcher没dbType,用tgds的dbType
             }
          };
          //dsKeyAndWeightCommaArray 例如 db0:rwp1q1i0, db1:rwp0q0i1
          List<DataSourceWrapper> dss = ConfigManager.buildDataSourceWrapper(dsKeyAndWeightCommaArray, wrapper);
          ->String[] dsWeightArray = dsWeightCommaStr.split(","); 
          ->for (int i = 0; i < dsWeightArray.length; i++) 
            ->String[] dsAndWeight = dsWeightArray[i].split(":");
            ->DataSourceWrapper dsw = new DataSourceWrapper(dsKey, weightStr,dataSource, fetcherDbType, i);
              ->this.weight = new Weight(weightStr);
                ->r = getUnitWeight(weightStr, 'R', weightPattern_r, 0, 10);
                ->w = getUnitWeight(weightStr, 'W', weightPattern_w, 0, 10);
            ->dss.add(dsw);
          ->return dss;  
          init(dss);
       } else {
          checkProperties();
          configManager = new ConfigManager(this);
          configManager.init();
       }
    }
    
    public void init(List<DataSourceWrapper> dataSourceWrappers) {
       configManager = new ConfigManager(this);
       configManager.init(dataSourceWrappers);
    }
    
    //获取连接,TGroupConnection也只是一个代理
    public TGroupConnection getConnection() throws SQLException {
       return new TGroupConnection(this);
    }
}
三、TGroupStatement执行具体的sql的时候根据读写权重去获取真正的Connection
public class TGroupStatement implements Statement {
    protected TGroupConnection tGroupConnection;
    protected TGroupDataSource tGroupDataSource;
    private Statement baseStatement;
    
    public TGroupStatement(TGroupDataSource tGroupDataSource, TGroupConnection tGroupConnection) {
    		this.tGroupDataSource = tGroupDataSource;
    		this.tGroupConnection = tGroupConnection;
    		this.retryingTimes = tGroupDataSource.getRetryingTimes();
	}
 
   public boolean execute(String sql, String[] columnNames) throws SQLException {
		return executeInternal(sql, -1, null, columnNames);
	}
 
   private boolean executeInternal(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames) throws SQLException {
		SqlType sqlType = SQLParser.getSqlType(sql);
		if (sqlType == SqlType.SELECT || sqlType == SqlType.SELECT_FOR_UPDATE || sqlType == SqlType.SHOW) {
			executeQuery(sql);
			return true;
		} else if (sqlType == SqlType.INSERT || sqlType == SqlType.UPDATE || sqlType == SqlType.DELETE||sqlType == SqlType.REPLACE||sqlType == SqlType.TRUNCATE
				|| sqlType == SqlType.CREATE|| sqlType== SqlType.DROP|| sqlType == SqlType.LOAD|| sqlType== SqlType.MERGE) {
			if (autoGeneratedKeys == -1 && columnIndexes == null && columnNames == null) {
				executeUpdate(sql);
			} else if (autoGeneratedKeys != -1) {
				executeUpdate(sql, autoGeneratedKeys);
			} else if (columnIndexes != null) {
				executeUpdate(sql, columnIndexes);
			} else if (columnNames != null) {
				executeUpdate(sql, columnNames);
                ->return executeUpdateInternal(sql, -1, null, columnNames);
                  ->Connection conn = tGroupConnection.getBaseConnection(sql,false);
                  ->this.updateCount=executeUpdateOnConnection(conn, sql, autoGeneratedKeys, columnIndexes, columnNames);
			} else {
				executeUpdate(sql);
			}

			return false;
		} else {
			throw new SQLException("only select, insert, update, delete,replace,truncate,create,drop,load,merge sql is supported");
		}
	}
 
   public ResultSet executeQuery(String sql) throws SQLException {
		checkClosed();
		ensureResultSetIsEmpty();

		boolean gotoRead = SqlType.SELECT.equals(SQLParser.getSqlType(sql)) && tGroupConnection.getAutoCommit();
		Connection conn = tGroupConnection.getBaseConnection(sql,gotoRead);

		if (conn != null){
			sql=GroupHintParser.removeTddlGroupHint(sql);
			return executeQueryOnConnection(conn, sql);
		}else{
			Integer dataSourceIndex = GroupHintParser.convertHint2Index(sql);
			sql=GroupHintParser.removeTddlGroupHint(sql);
			if (dataSourceIndex < 0) {
				dataSourceIndex = ThreadLocalDataSourceIndex.getIndex();
			}
			return this.tGroupDataSource.getDBSelector(gotoRead).tryExecute(executeQueryTryer, retryingTimes, sql,dataSourceIndex);
            ->DBSelector getDBSelector(boolean isRead)
              ->return configManager.getDBSelector(isRead, this.autoSelectWriteDataSource);
                ->DBSelector dbSelector = isRead ? readDBSelectorWrapper : writeDBSelectorWrapper;
            ->public <T> T tryExecute(DataSourceTryer<T> tryer, int times, Object... args)
              ->return this.tryExecute(new LinkedHashMap<DataSource, SQLException>(0),tryer, times, args);
                ->Map<String, Integer> tableDsIndexMap = groupExtraConfig.getTableDsIndexMap();
                ->Map<String, Integer> sqlDsIndexMap = groupExtraConfig.getSqlDsIndexMap();
                ->String actualTable = SQLPreParser.findTableName(nomalSql);
                ->dataSourceIndex = tableDsIndexMap.get(actualTable);
                ->DataSourceHolder dsHolder = findDataSourceWrapperByIndex(dataSourceIndex);
                  ->DataSourceHolder holder = priorityGroups[i].findDataSourceWrapperByIndex(dataSourceIndex);
                ->return tryOnDataSourceHolder(dsHolder, failedDataSources, tryer, times, args);
                  ->T t = tryer.tryOnDataSource(dsHolder.dsw, args);
                    ->Connection conn = TGroupStatement.this.tGroupConnection.createNewConnection(dsw, true);
                      ->conn = dsw.getConnection();
                      ->setBaseConnection(conn, dsw, isRead && isAutoCommit);
                    ->return executeQueryOnConnection(conn, sql);
                      ->Statement stmt = createStatementInternal(conn, false);
                        ->stmt = conn.createStatement();
                      ->this.currentResultSet = stmt.executeQuery(sql);
                  ->return t;
		}
	}
}
四、数据源选择规则分析

4.1、通过db0:rwp1q1i0, db1:rwp0q0i1设置每个数据源的读写权重
4.2、db0:rwp1q1i0, db1:rwp0q0i1转换成Weight
4.3、根据Weight创建DBSelector
4.4、PriorityDbGroupSelector继承DBSelector根据优先级选择EquityDbManager
4.5、EquityDbManager也继承自DBSelector根据读写权重,随机选择具体数据源

public class PriorityDbGroupSelector extends AbstractDBSelector {
    private EquityDbManager[] priorityGroups;
    
    public PriorityDbGroupSelector(EquityDbManager[] priorityGroups) {
        this.priorityGroups = priorityGroups;
    }
    
    public DataSource select() {
       for (int i = 0; i < priorityGroups.length; i++) {
          DataSource ds = priorityGroups[i].select();
          if (ds != null) {
             return ds;
          }
       }
       return null;
   }
}
public class EquityDbManager extends AbstractDBSelector {
    private Map<String, DataSourceHolder> dataSourceMap;
    private WeightRandom weightRandom;
    
    public EquityDbManager(Map<String, DataSourceWrapper> dataSourceWrapperMap, Map<String, Integer> weightMap) {
        this.dataSourceMap = new HashMap<String, DataSourceHolder>(dataSourceWrapperMap.size());
        for (Map.Entry<String, DataSourceWrapper> e : dataSourceWrapperMap.entrySet()) {
           this.dataSourceMap.put(e.getKey(), new DataSourceHolder(e.getValue()));
        }
       this.weightRandom = new WeightRandom(weightMap);
   }
   
   public DataSource select() {
       String key = selectAliveKey(weightRandom, null);
       ->return weightRandom.select(excludeKeys);
       return this.get(key);
       ->DataSourceHolder holder = dataSourceMap.get(dsKey);
       ->return holder.dsw;
   }
}
public class WeightRandom {
    private Map<String, Integer> cachedWeightConfig;
    
    public WeightRandom(Map<String, Integer> weightConfigs) {
       this.init(weightConfigs);
    }
    
    /**
     * 假设三个库权重    10   9   8
    * 那么areaEnds就是  10  19  27
    */
    public String select(List<String> excludeKeys) {
       int[] tempAreaEnd = genAreaEnds(tempWeights);
       ->int sum = 0;
       ->for(int i = 0; i < weights.length; i++)
         ->sum += weights[i];
         ->areaEnds[i] = sum; 
       return select(tempAreaEnd, w.weightKeys);
       ->int rand = random.nextInt(sum); 
       ->for(int i = 0; i < areaEnds.length; i++)
         ->if(rand < areaEnds[i])
           ->return keys[i];     
   }
}
 类似资料: