转载请明显注明出处,尊重南波兔的付出
先来看client
public static List<Long> nextId(String bizType, Integer batchSize) {
if(batchSize == null) {
Long id = nextId(bizType);
List<Long> list = new ArrayList<>();
list.add(id);
return list;
}
IdGenerator idGenerator = client.getIdGenerator(bizType);//看这
return idGenerator.nextId(batchSize);
}
输入bizType业务类型,输入号段数,然后来获取号段
然后会创建相应的idGenerator
public IdGenerator getIdGenerator(String bizType) {
if (generators.containsKey(bizType)) {
return generators.get(bizType);
}
synchronized (this) {//看这 (这里加锁的哟)
if (generators.containsKey(bizType)) {
return generators.get(bizType);
}
IdGenerator idGenerator = createIdGenerator(bizType);//看这
generators.put(bizType, idGenerator);
return idGenerator;
}
}
这里的idGenerator
是CachedIdGenerator
@Override
protected IdGenerator createIdGenerator(String bizType) {
return new CachedIdGenerator(bizType, new HttpSegmentIdServiceImpl());
}
是CachedIdGenerator
是核心类,持有currentSegmentId
和nextSegmentId
对象,负责nextId
的核心流程。
看下其中的字段
protected String bizType;//传入的业务类型
protected SegmentIdService segmentIdService;//service
protected volatile SegmentId current;//当前号段
protected volatile SegmentId next;//下一个号段
private volatile boolean isLoadingNext;//双号段缓存标志
private Object lock = new Object();
private ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("tinyid-generator"));
先看一下SegmentId的内容
private long maxId;//最大可用id,不能超过这个
private long loadingId;//触发申请新号段的阈值
private AtomicLong currentId;//当前的id 原子类
/**
* increment by
*/
private int delta;//步长
/**
* mod num
*/
private int remainder;
private volatile boolean isInit;
//下面挑选的几个重要方法
//该方法保证,如果都是同一个号段,比如1-1000
//不同的生成器中的步长是不一样的嘛
//为了不冲突,它们不可能都从1开始起步
//所以需要init将不同步长的起点设置成不同的值
//如当前是号段是(1000,2000],delta=3, remainder=0,则经过这个方法后,currentId会先递增到1002,之后每次增加delta
public void init() {
if (isInit) {
return;
}
synchronized (this) {//出现了,双重校验锁
if (isInit) {
return;
}
long id = currentId.get();
if (id % delta == remainder) {
isInit = true;
return;
}
for (int i = 0; i <= delta; i++) {
id = currentId.incrementAndGet();
if (id % delta == remainder) {
// 避免浪费 减掉系统自己占用的一个id
currentId.addAndGet(0 - delta);
isInit = true;
return;
}
}
}
}
//每次增长id都是根据delta来确定
public Result nextId() {
init();
long id = currentId.addAndGet(delta);
if (id > maxId) {
return new Result(ResultCode.OVER, id);
}
if (id >= loadingId) {
return new Result(ResultCode.LOADING, id);
}
return new Result(ResultCode.NORMAL, id);
}
这里的核心就是nextId(Integer batchSize)
方法
@Override
public List<Long> nextId(Integer batchSize) {
List<Long> ids = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
Long id = nextId();
ids.add(id);
}
return ids;
}
这里就是按照传入的batchSize来重复调用nextId()
方法获取id
@Override
public Long nextId() {
while (true) {
if (current == null) {
loadCurrent();
continue;
}
Result result = current.nextId();//获取id
if (result.getCode() == ResultCode.OVER) {//当前号段用完,用缓存的下一个号段
loadCurrent();
} else {
if (result.getCode() == ResultCode.LOADING) {//双号段提前缓存
loadNext();
}
return result.getId();
}
}
}
这里就是核心,获取id的方法
如果current
当前号段为空,则loadCurrent()
public synchronized void loadCurrent() {//加锁保证并发安全
if (current == null || !current.useful()) {//useful():return currentId.get() <= maxId;
if (next == null) {
SegmentId segmentId = querySegmentId();//看这
this.current = segmentId;
} else {
current = next;
next = null;
}
}
}
这里通过querySegmentId()
方法获取segmentId
private SegmentId querySegmentId() {
String message = null;
try {
SegmentId segmentId = segmentIdService.getNextSegmentId(bizType);//看这
if (segmentId != null) {
return segmentId;
}
} catch (Exception e) {
message = e.getMessage();
}
throw new TinyIdSysException("error query segmentId: " + message);
}
这里可以看到是通过service来获取的
我们跳进去看一下
@Override
public SegmentId getNextSegmentId(String bizType) {
String url = chooseService(bizType);//从业务所能处理的服务器中随机选择一台
String response = TinyIdHttpUtils.post(url, TinyIdClientConfig.getInstance().getReadTimeout(),
TinyIdClientConfig.getInstance().getConnectTimeout());//发送http请求
logger.info("tinyId client getNextSegmentId end, response:" + response);
if (response == null || "".equals(response.trim())) {
return null;
}
SegmentId segmentId = new SegmentId();
String[] arr = response.split(",");
segmentId.setCurrentId(new AtomicLong(Long.parseLong(arr[0])));
segmentId.setLoadingId(Long.parseLong(arr[1]));
segmentId.setMaxId(Long.parseLong(arr[2]));
segmentId.setDelta(Integer.parseInt(arr[3]));
segmentId.setRemainder(Integer.parseInt(arr[4]));
return segmentId;
}
//从业务所能处理的服务器中随机选择一台
private String chooseService(String bizType) {
List<String> serverList = TinyIdClientConfig.getInstance().getServerList();
String url = "";
if (serverList != null && serverList.size() == 1) {
url = serverList.get(0);
} else if (serverList != null && serverList.size() > 1) {
Random r = new Random();
url = serverList.get(r.nextInt(serverList.size()));
}
url += bizType;
return url;
}
可以看到,这里直接通过http请求获取号段,并没有指定需求的号段大小,当然这里也将bizType传过去了
这里的TinyIdClientConfig中的信息,是在service中int方法,加载配置文件读取的,需要用户来进行指定
访问的url格式为"http://{0}/tinyid/id/nextSegmentIdSimple?token={1}&bizType=";
然后再回到CachedIdGenerator
的方法中,这里就是调用SegmentId.nextId()方法了
同时代用完成之后,通过代用情况,判断是否要更新当前号段,判断是否要更新下一个号段
这里需要注意的是loadNext()
方法,它是异步来获取的,不会阻塞当前线程
public void loadNext() {
if (next == null && !isLoadingNext) {
synchronized (lock) {
if (next == null && !isLoadingNext) {
isLoadingNext = true;
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 无论获取下个segmentId成功与否,都要将isLoadingNext赋值为false
next = querySegmentId();
} finally {
isLoadingNext = false;
}
}
});
}
}
}
}
所以,对于client来说,它只有第一次http获取号段的时候会阻塞,而之后,为了防止生成id过慢,它会在当前号段尚未用完时,就异步调用获取下一个号段
之后当前号段用完了,再用早已获取的下一个号段来更新当前号段
于是乎,虽然http慢,但是只慢第一次,只要并发量没有那么大时,并不会因为http慢导致整个系统慢
后序的所有id生成都是在client进程中生成的,并不慢