当前位置: 首页 > 工具软件 > Leaf > 使用案例 >

leaf 的雪花算法实现 简析

陈志
2023-12-01

概览

本文跟一下leaf的雪花模式的算法

关注点:

  • workerid生成
  • 时间回拨问题解决

leaf是美团开源的分布式id 项目

源码分析

  • 首先从server的Controller出发,看一下雪花算法生成的方法

  • @RequestMapping(value = "/api/snowflake/get/{key}")
    public String getSnowflakeId(@PathVariable("key") String key) {
        return get(key, snowflakeService.getId(key));
    }
    
  • 进入到snowflakeService

  • 发现核心生成id的类是SnowflakeIDGenImpl

  • private final long workerIdBits = 10L;
    private final long sequenceBits = 12L;
    
  • 正常的workerid长度1024,以及序号4096

  • 时间起始时间是

    //Thu Nov 04 2010 09:42:54 GMT+0800 (中国标准时间) 
    this(zkAddress, port, 1288834974657L);
    
  • 	public SnowflakeIDGenImpl(String zkAddress, int port, long twepoch) {
            this.twepoch = twepoch;
            Preconditions.checkArgument(timeGen() > twepoch, "Snowflake not support twepoch gt currentTime");
            final String ip = Utils.getIp();
            SnowflakeZookeeperHolder holder = new SnowflakeZookeeperHolder(ip, String.valueOf(port), zkAddress);//获得zookeeper的连接其
            LOGGER.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", twepoch, ip, zkAddress, port);
            boolean initFlag = holder.init();//在这里是生成id
            if (initFlag) {
                workerId = holder.getWorkerID();//获得当前生成器机器的id
                LOGGER.info("START SUCCESS USE ZK WORKERID-{}", workerId);
            } else {
                Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok");
            }
            Preconditions.checkArgument(workerId >= 0 && workerId <= maxWorkerId, "workerID must gte 0 and lte 1023");
        }
    
  • 来看看SnowflakeZookeeperHolder.init()方法

  • public boolean init() {
            try {
                CuratorFramework curator = createWithOptions(connectionString, new RetryUntilElapsed(1000, 4), 10000, 6000);
                curator.start();
                Stat stat = curator.checkExists().forPath(PATH_FOREVER);
                if (stat == null) {
                    //不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
                    zk_AddressNode = createNode(curator);
                    //worker id 默认是0
                    updateLocalWorkerID(workerID);
                    //定时上报本机时间给forever节点
                    ScheduledUploadData(curator, zk_AddressNode);
                    return true;
                } else {
                    Map<String, Integer> nodeMap = Maps.newHashMap();//ip:port->00001
                    Map<String, String> realNode = Maps.newHashMap();//ip:port->(ipport-000001)
                    //存在根节点,先检查是否有属于自己的根节点
                    List<String> keys = curator.getChildren().forPath(PATH_FOREVER);
                    for (String key : keys) {
                        String[] nodeKey = key.split("-");
                        realNode.put(nodeKey[0], key);
                        nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
                    }
                    Integer workerid = nodeMap.get(listenAddress);
                    if (workerid != null) {
                        //有自己的节点,zk_AddressNode=ip:port
                        zk_AddressNode = PATH_FOREVER + "/" + realNode.get(listenAddress);
                        workerID = workerid;//启动worder时使用会使用
                        if (!checkInitTimeStamp(curator, zk_AddressNode)) {
                            throw new CheckLastTimeException("init timestamp check error,forever node timestamp gt this node time");
                        }
                        //准备创建临时节点
                        doService(curator);
                        updateLocalWorkerID(workerID);
                        LOGGER.info("[Old NODE]find forever node have this endpoint ip-{} port-{} workid-{} childnode and start SUCCESS", ip, port, workerID);
                    } else {
                        //表示新启动的节点,创建持久节点 ,不用check时间
                        String newNode = createNode(curator);
                        zk_AddressNode = newNode;
                        String[] nodeKey = newNode.split("-");
                        workerID = Integer.parseInt(nodeKey[1]);
                        doService(curator);
                        updateLocalWorkerID(workerID);
                        LOGGER.info("[New NODE]can not find node on forever node that endpoint ip-{} port-{} workid-{},create own node on forever node and start SUCCESS ", ip, port, workerID);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Start node ERROR {}", e);
                try {
                    Properties properties = new Properties();
                    properties.load(new FileInputStream(new File(PROP_PATH.replace("{port}", port + ""))));
                    workerID = Integer.valueOf(properties.getProperty("workerID"));
                    LOGGER.warn("START FAILED ,use local node file properties workerID-{}", workerID);
                } catch (Exception e1) {
                    LOGGER.error("Read file error ", e1);
                    return false;
                }
            }
            return true;
        }
    
  • 逻辑梳理
    连接zookeeper
    if(业务节点为空)
    	创建根节点
    	记录根节点信息
    else(业务节点不为空)
    	获取业务节点下的所有子节点,放到map中//ip:port->00001
    	检查有没有自己
        if 有://这里是防止机器宕机重启,从而可以来获取wokerid
    		workid 则就是zookeeper的序号
        else 没有:
    		创建持久有序节点
            获取workerid
    
  • workerid是最关键的,其他都是次要

  • 再来看雪花算法如何获取id

  • 这里从leafController的getId()来获取id

  • 一路跟到SnowflakeIDGenImpl

  • @Override
        public synchronized Result get(String key) {
            long timestamp = timeGen();
            if (timestamp < lastTimestamp) {//时钟回拨发生 这里单位是毫秒
                long offset = lastTimestamp - timestamp;
                if (offset <= 5) {
                    try {
                        wait(offset << 1); // 等待最多10ms
                        timestamp = timeGen();
                        if (timestamp < lastTimestamp) { //如果还出现问题,则是无法解决,召唤程序员
                            return new Result(-1, Status.EXCEPTION);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("wait interrupted");
                        return new Result(-2, Status.EXCEPTION);
                    }
                } else {//超过5ms, 直接召唤程序员
                    return new Result(-3, Status.EXCEPTION);
                }
            }
            if (lastTimestamp == timestamp) {//时间相同, 则seq累加呗
                sequence = (sequence + 1) & sequenceMask;
                if (sequence == 0) {
                    //seq 为0的时候表示是下一毫秒时间开始对seq做随机
                    sequence = RANDOM.nextInt(100);
                    timestamp = tilNextMillis(lastTimestamp);
                }
            } else {
                //如果是新的ms开始
                sequence = RANDOM.nextInt(100);//这里的随机启动我有点不太明白
            }
            lastTimestamp = timestamp;
            long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;//移位处理,组成64位的long
            return new Result(id, Status.SUCCESS);
    
        }
    
  • 可以看到, leaf解决时钟回拨的思路就是 延迟等待 , 不行就召唤程序员

  • 除了雪花模式,leaf也支持号段模式,因为之前详细分析了tinyid, 所以这里就不再解释了

  • 本文的目的也是在关注leaf在雪花模式中如何来生成workerid以及解决时钟回拨问题

总结

  • 用zookeeper来生成workerid, 通过 持久有序节点, 保证生成器宕机重启,还能使用之前的workerid
  • 延迟等待来解决时间回拨问题

源码地址

https://github.com/Meituan-Dianping/Leaf

 类似资料: