当前位置: 首页 > 工具软件 > TDDL > 使用案例 >

TDDL 在分布式下的SEQUENCE原理

相俊迈
2023-12-01
  • 在数据库中创建 sequence 表,用于记录,当前已被占用的id最大值。
  • 每台客户端主机取一个id区间(比如 1000~2000)缓存在本地,并更新 sequence 表中的id最大值记录。
  • 客户端主机之间取不同的id区间,用完再取,使用乐观锁机制控制并发。

TDDL大家应该很熟悉了,淘宝分布式数据层。很好的为我们实现了分库分表、Master/Salve、动态数据源配置等功能。

那么分布式之后,数据库自增序列肯定用不了了,如何方便快捷的解决这个问题呢?TDDL也提供了SEQUENCE的解决方案。

下面就来简单剖析一下实现原理。。。。。。

第一步:创建一张sequence对应的表。

CREATE TABLE `imp_sequence` (
  `BIZ_NAME` varchar(45) NOT NULL COMMENT '业务名称',
  `CURRENT_VALUE` int(11) NOT NULL COMMENT '当前最大值',
  `GMT_CREATE` datetime DEFAULT NULL COMMENT '创建时间',
  `GMT_MODIFIED` datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`BIZ_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='数据序列表';

表名和字段可以按各自规则定义,定义之后需要与第二步DAO中的定义相对应!

几张逻辑表需要声明几个sequence
第二步:配置sequenceDao

<bean id="sequenceDao" class="com.taobao.tddl.client.sequence.impl.DefaultSequenceDao">
        <!-- 数据源 -->
        <property name="dataSource"  ref="dataSource" />
        <!-- 步长-->
        <property name="step" value="1000" />
        <!-- 重试次数-->
        <property name="retryTimes" value="1" />
        <!-- sequence 表名-->
        <property name="tableName" value="gt_sequence" />
        <!-- sequence 名称-->
        <property name="nameColumnName" value="BIZ_NAME" />
        <!-- sequence 当前值-->
        <property name="valueColumnName" value="CURRENT_VALUE" />
        <!-- sequence 更新时间-->
        <property name="gmtModifiedColumnName" value="gmt_modified" />
    </bean>

第三步:配置sequence生成器

<bean id="businessSequence"  class="com.taobao.tddl.client.sequence.impl.DefaultSequence">
        <property name="sequenceDao" ref="sequenceDao"/>
        <property name="name" value="business_sequence" />
    </bean>

第四步:调用

public class IbatisSmDAO extends SqlMapClientDaoSupport implements SmDAO {

  /**smSequence*/
  private DefaultSequence   businessSequence;

    public int insert(SmDO sm) throws DataAccessException {
        if (sm == null) {
            throw new IllegalArgumentException("Can't insert a null data object into db.");
        }

        try {
            sm.setId((int)businessSequence.nextValue());
        } catch (SequenceException e) {
            throw new RuntimeException("Can't get primary key.");
        }

        getSqlMapClientTemplate().insert("MS-SM-INSERT", sm);

        return sm.getId();
    }
}

从调用配置中,我们可以发现其中涉及到二个重要类DefaultSequenceDao和DefaultSequence,这二个都是TDDL的默认实现。DefaultSequenceDao:序列DAO默认实现,JDBC方式。DefaultSequence:序列默认实现
先来看DefaultSequenceDao,TDDL中提供了默认的表名,列名和步长等,第一步的建表可以参照默认方式。

private static final int MIN_STEP = 1;
private static final int MAX_STEP = 100000;
private static final int DEFAULT_STEP = 1000;
private static final int DEFAULT_RETRY_TIMES = 150;

private static final String DEFAULT_TABLE_NAME = "sequence";
private static final String DEFAULT_NAME_COLUMN_NAME = "name";
private static final String DEFAULT_VALUE_COLUMN_NAME = "value";
private static final String DEFAULT_GMT_MODIFIED_COLUMN_NAME = "gmt_modified";

private static final long DELTA = 100000000L;

private DataSource dataSource;

/**
 * 重试次数
 */
private int retryTimes = DEFAULT_RETRY_TIMES;

/**
 * 步长
 */
private int step = DEFAULT_STEP;

/**
 * 序列所在的表名
 */
private String tableName = DEFAULT_TABLE_NAME;

/**
 * 存储序列名称的列名
 */
private String nameColumnName = DEFAULT_NAME_COLUMN_NAME;

/**
 * 存储序列值的列名
 */
private String valueColumnName = DEFAULT_VALUE_COLUMN_NAME;

/**
 * 存储序列最后更新时间的列名
 */
private String gmtModifiedColumnName = DEFAULT_GMT_MODIFIED_COLUMN_NAME;

接下来看一下nextRange方法:取得下一个可用的序列区间

<code class="language-java">public SequenceRange nextRange(String name) throws SequenceException {  
        if (name == null) {  
            throw new IllegalArgumentException("序列名称不能为空");  
        }  

        long oldValue;  
        long newValue;  

        Connection conn = null;  
        PreparedStatement stmt = null;  
        ResultSet rs = null;  

        for (int i = 0; i < retryTimes + 1; ++i) {  
            try {  
                conn = dataSource.getConnection();  
                stmt = conn.prepareStatement(getSelectSql());  
                stmt.setString(1, name);  
                rs = stmt.executeQuery();  
                rs.next();  
                oldValue = rs.getLong(1);  

                if (oldValue < 0) {  
                    StringBuilder message = new StringBuilder();  
                    message.append("Sequence value cannot be less than zero, value = ").append(oldValue);  
                    message.append(", please check table ").append(getTableName());  

                    throw new SequenceException(message.toString());  
                }  

                if (oldValue > Long.MAX_VALUE - DELTA) {  
                    StringBuilder message = new StringBuilder();  
                    message.append("Sequence value overflow, value = ").append(oldValue);  
                    message.append(", please check table ").append(getTableName());  

                    throw new SequenceException(message.toString());  
                }  

                newValue = oldValue + getStep();  
            } catch (SQLException e) {  
                throw new SequenceException(e);  
            } finally {  
                closeResultSet(rs);  
                rs = null;  
                closeStatement(stmt);  
                stmt = null;  
                closeConnection(conn);  
                conn = null;  
            }  

            try {  
                conn = dataSource.getConnection();  
                stmt = conn.prepareStatement(getUpdateSql());  
                stmt.setLong(1, newValue);  
                stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));  
                stmt.setString(3, name);  
                stmt.setLong(4, oldValue);  
                int affectedRows = stmt.executeUpdate();  
                if (affectedRows == 0) {  
                    // retry  
                    continue;  
                }  

                return new SequenceRange(oldValue + 1, newValue);  
            } catch (SQLException e) {  
                throw new SequenceException(e);  
            } finally {  
                closeStatement(stmt);  
                stmt = null;  
                closeConnection(conn);  
                conn = null;  
            }  
        }  

        throw new SequenceException("Retried too many times, retryTimes = " + retryTimes);  
    }

通过getSelectSql查询最新的value值,然后加上步点,通过getUpdateSql更新到数据库中

  private String getSelectSql() {
        if (selectSql == null) {
            synchronized (this) {
                if (selectSql == null) {
                    StringBuilder buffer = new StringBuilder();
                    buffer.append("select ").append(getValueColumnName());
                    buffer.append(" from ").append(getTableName());
                    buffer.append(" where ").append(getNameColumnName()).append(" = ?");

                    selectSql = buffer.toString();
                }
            }
        }

        return selectSql;
    }

    private String getUpdateSql() {
        if (updateSql == null) {
            synchronized (this) {
                if (updateSql == null) {
                    StringBuilder buffer = new StringBuilder();
                    buffer.append("update ").append(getTableName());
                    buffer.append(" set ").append(getValueColumnName()).append(" = ?, ");
                    buffer.append(getGmtModifiedColumnName()).append(" = ? where ");
                    buffer.append(getNameColumnName()).append(" = ? and ");
                    buffer.append(getValueColumnName()).append(" = ?");

                    updateSql = buffer.toString();
                }
            }
        }

        return updateSql;
    }

有一个特殊需要说明的,在update语句中,where需要把之前的value当成条件传入。实现了类型version的乐观锁操作。如果同一个时间AB二台机器同时请求获取到相同的value,进行update操作只有可能一条成功。失败的会按retryTimes进行重试。
接下来看DefaultSequence,比较简单,就不说明了

public class DefaultSequence implements Sequence {
    private final Lock lock = new ReentrantLock();

    private SequenceDao sequenceDao;

    /**
     * 序列名称
     */
    private String name;

    private volatile SequenceRange currentRange;

    public long nextValue() throws SequenceException {
        if (currentRange == null) {
            lock.lock();
            try {
                if (currentRange == null) {
                    currentRange = sequenceDao.nextRange(name);
                }
            } finally {
                lock.unlock();
            }
        }

        long value = currentRange.getAndIncrement();
        if (value == -1) {
            lock.lock();
            try {
                for (;;) {
                    if (currentRange.isOver()) {
                        currentRange = sequenceDao.nextRange(name);
                    }

                    value = currentRange.getAndIncrement();
                    if (value == -1) {
                        continue;
                    }

                    break;
                }
            } finally {
                lock.unlock();
            }
        }

        if (value < 0) {
            throw new SequenceException("Sequence value overflow, value = " + value);
        }

        return value;
    }

    public SequenceDao getSequenceDao() {
        return sequenceDao;
    }

    public void setSequenceDao(SequenceDao sequenceDao) {
        this.sequenceDao = sequenceDao;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
 类似资料: