<bean id="dataSource" class="com.taobao.tddl.group.jdbc.TGroupDataSource" init-method="init" destroy-method="destroyDataSource">
<property name="appName" value="gome_market_search_index" />
<property name="dbGroupKey" value="gome_market_search_index_group_0" />
<property name="dsKeyAndWeightCommaArray" value="db0:rwp1q1i0, db1:rwp0q0i1" />
</bean>
public class TGroupDataSource implements DataSource {
private ConfigManager configManager;
private String dsKeyAndWeightCommaArray; //例如 db0:rwp1q1i0, db1:rwp0q0i1
private DataSourceFetcher dataSourceFetcher;
private String appName;
private String dbGroupKey;
/**
* 基于dbGroupKey、appName来初始化多个TAtomDataSource
*/
public void init() {
if (dsKeyAndWeightCommaArray != null) {
//本地配置方式:dsKeyAndWeightCommaArray + dataSourceFetcher + dyType
DataSourceFetcher wrapper = new DataSourceFetcher() {
@Override
public DataSource getDataSource(String key) {
return dataSourceFetcher.getDataSource(key);
}
@Override
public DBType getDataSourceDBType(String key) {
DBType type = dataSourceFetcher.getDataSourceDBType(key);
return type == null ? dbType : type; //如果dataSourceFetcher没dbType,用tgds的dbType
}
};
//dsKeyAndWeightCommaArray 例如 db0:rwp1q1i0, db1:rwp0q0i1
List<DataSourceWrapper> dss = ConfigManager.buildDataSourceWrapper(dsKeyAndWeightCommaArray, wrapper);
->String[] dsWeightArray = dsWeightCommaStr.split(",");
->for (int i = 0; i < dsWeightArray.length; i++)
->String[] dsAndWeight = dsWeightArray[i].split(":");
->DataSourceWrapper dsw = new DataSourceWrapper(dsKey, weightStr,dataSource, fetcherDbType, i);
->this.weight = new Weight(weightStr);
->r = getUnitWeight(weightStr, 'R', weightPattern_r, 0, 10);
->w = getUnitWeight(weightStr, 'W', weightPattern_w, 0, 10);
->dss.add(dsw);
->return dss;
init(dss);
} else {
checkProperties();
configManager = new ConfigManager(this);
configManager.init();
}
}
public void init(List<DataSourceWrapper> dataSourceWrappers) {
configManager = new ConfigManager(this);
configManager.init(dataSourceWrappers);
}
//获取连接,TGroupConnection也只是一个代理
public TGroupConnection getConnection() throws SQLException {
return new TGroupConnection(this);
}
}
public class TGroupStatement implements Statement {
protected TGroupConnection tGroupConnection;
protected TGroupDataSource tGroupDataSource;
private Statement baseStatement;
public TGroupStatement(TGroupDataSource tGroupDataSource, TGroupConnection tGroupConnection) {
this.tGroupDataSource = tGroupDataSource;
this.tGroupConnection = tGroupConnection;
this.retryingTimes = tGroupDataSource.getRetryingTimes();
}
public boolean execute(String sql, String[] columnNames) throws SQLException {
return executeInternal(sql, -1, null, columnNames);
}
private boolean executeInternal(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames) throws SQLException {
SqlType sqlType = SQLParser.getSqlType(sql);
if (sqlType == SqlType.SELECT || sqlType == SqlType.SELECT_FOR_UPDATE || sqlType == SqlType.SHOW) {
executeQuery(sql);
return true;
} else if (sqlType == SqlType.INSERT || sqlType == SqlType.UPDATE || sqlType == SqlType.DELETE||sqlType == SqlType.REPLACE||sqlType == SqlType.TRUNCATE
|| sqlType == SqlType.CREATE|| sqlType== SqlType.DROP|| sqlType == SqlType.LOAD|| sqlType== SqlType.MERGE) {
if (autoGeneratedKeys == -1 && columnIndexes == null && columnNames == null) {
executeUpdate(sql);
} else if (autoGeneratedKeys != -1) {
executeUpdate(sql, autoGeneratedKeys);
} else if (columnIndexes != null) {
executeUpdate(sql, columnIndexes);
} else if (columnNames != null) {
executeUpdate(sql, columnNames);
->return executeUpdateInternal(sql, -1, null, columnNames);
->Connection conn = tGroupConnection.getBaseConnection(sql,false);
->this.updateCount=executeUpdateOnConnection(conn, sql, autoGeneratedKeys, columnIndexes, columnNames);
} else {
executeUpdate(sql);
}
return false;
} else {
throw new SQLException("only select, insert, update, delete,replace,truncate,create,drop,load,merge sql is supported");
}
}
public ResultSet executeQuery(String sql) throws SQLException {
checkClosed();
ensureResultSetIsEmpty();
boolean gotoRead = SqlType.SELECT.equals(SQLParser.getSqlType(sql)) && tGroupConnection.getAutoCommit();
Connection conn = tGroupConnection.getBaseConnection(sql,gotoRead);
if (conn != null){
sql=GroupHintParser.removeTddlGroupHint(sql);
return executeQueryOnConnection(conn, sql);
}else{
Integer dataSourceIndex = GroupHintParser.convertHint2Index(sql);
sql=GroupHintParser.removeTddlGroupHint(sql);
if (dataSourceIndex < 0) {
dataSourceIndex = ThreadLocalDataSourceIndex.getIndex();
}
return this.tGroupDataSource.getDBSelector(gotoRead).tryExecute(executeQueryTryer, retryingTimes, sql,dataSourceIndex);
->DBSelector getDBSelector(boolean isRead)
->return configManager.getDBSelector(isRead, this.autoSelectWriteDataSource);
->DBSelector dbSelector = isRead ? readDBSelectorWrapper : writeDBSelectorWrapper;
->public <T> T tryExecute(DataSourceTryer<T> tryer, int times, Object... args)
->return this.tryExecute(new LinkedHashMap<DataSource, SQLException>(0),tryer, times, args);
->Map<String, Integer> tableDsIndexMap = groupExtraConfig.getTableDsIndexMap();
->Map<String, Integer> sqlDsIndexMap = groupExtraConfig.getSqlDsIndexMap();
->String actualTable = SQLPreParser.findTableName(nomalSql);
->dataSourceIndex = tableDsIndexMap.get(actualTable);
->DataSourceHolder dsHolder = findDataSourceWrapperByIndex(dataSourceIndex);
->DataSourceHolder holder = priorityGroups[i].findDataSourceWrapperByIndex(dataSourceIndex);
->return tryOnDataSourceHolder(dsHolder, failedDataSources, tryer, times, args);
->T t = tryer.tryOnDataSource(dsHolder.dsw, args);
->Connection conn = TGroupStatement.this.tGroupConnection.createNewConnection(dsw, true);
->conn = dsw.getConnection();
->setBaseConnection(conn, dsw, isRead && isAutoCommit);
->return executeQueryOnConnection(conn, sql);
->Statement stmt = createStatementInternal(conn, false);
->stmt = conn.createStatement();
->this.currentResultSet = stmt.executeQuery(sql);
->return t;
}
}
}
4.1、通过db0:rwp1q1i0, db1:rwp0q0i1设置每个数据源的读写权重
4.2、db0:rwp1q1i0, db1:rwp0q0i1转换成Weight
4.3、根据Weight创建DBSelector
4.4、PriorityDbGroupSelector继承DBSelector根据优先级选择EquityDbManager
4.5、EquityDbManager也继承自DBSelector根据读写权重,随机选择具体数据源
public class PriorityDbGroupSelector extends AbstractDBSelector {
private EquityDbManager[] priorityGroups;
public PriorityDbGroupSelector(EquityDbManager[] priorityGroups) {
this.priorityGroups = priorityGroups;
}
public DataSource select() {
for (int i = 0; i < priorityGroups.length; i++) {
DataSource ds = priorityGroups[i].select();
if (ds != null) {
return ds;
}
}
return null;
}
}
public class EquityDbManager extends AbstractDBSelector {
private Map<String, DataSourceHolder> dataSourceMap;
private WeightRandom weightRandom;
public EquityDbManager(Map<String, DataSourceWrapper> dataSourceWrapperMap, Map<String, Integer> weightMap) {
this.dataSourceMap = new HashMap<String, DataSourceHolder>(dataSourceWrapperMap.size());
for (Map.Entry<String, DataSourceWrapper> e : dataSourceWrapperMap.entrySet()) {
this.dataSourceMap.put(e.getKey(), new DataSourceHolder(e.getValue()));
}
this.weightRandom = new WeightRandom(weightMap);
}
public DataSource select() {
String key = selectAliveKey(weightRandom, null);
->return weightRandom.select(excludeKeys);
return this.get(key);
->DataSourceHolder holder = dataSourceMap.get(dsKey);
->return holder.dsw;
}
}
public class WeightRandom {
private Map<String, Integer> cachedWeightConfig;
public WeightRandom(Map<String, Integer> weightConfigs) {
this.init(weightConfigs);
}
/**
* 假设三个库权重 10 9 8
* 那么areaEnds就是 10 19 27
*/
public String select(List<String> excludeKeys) {
int[] tempAreaEnd = genAreaEnds(tempWeights);
->int sum = 0;
->for(int i = 0; i < weights.length; i++)
->sum += weights[i];
->areaEnds[i] = sum;
return select(tempAreaEnd, w.weightKeys);
->int rand = random.nextInt(sum);
->for(int i = 0; i < areaEnds.length; i++)
->if(rand < areaEnds[i])
->return keys[i];
}
}