当前位置: 首页 > 工具软件 > go-snowflake > 使用案例 >

snowflake和snoyflake雪花算法以及golang实现

钱经赋
2023-12-01

snowflake和snoyflake雪花算法学习与go实现

预备知识:

Monotonic Clocks,即单调时间,所谓单调,就是只会不停的往前增长,不受校时操作的影响,这个时间是自进程启动以来的秒数

参考文章:https://www.simpleapples.com/2018/10/26/understand-time-struct-in-go/

雪花算法是twitter开源的在分布式环境下生成的唯一id生成算法。

1 推特雪花算法源码解读

推特雪花算法标准格式如下:

id 是64位整型的

+--------------------------------------------------------------------------+
| 1 Bit Unused | 41 Bit Timestamp |  10 Bit NodeID  |   12 Bit Sequence ID |
+--------------------------------------------------------------------------+
  • 第一个bit未使用默认为0
  • 41bit 存储毫秒级时间戳,时间单位为毫秒
  • 10bit 存储节点的id 10个bit表示最多可以扩展1024个节点
  • 12bit 自增id ,表示一个时间单位内,一个节点可生成的id最多为4096个

对于这样一个结构,一个机器,在1秒内 最多可以生成4096*1000约 400万个id。

使用

package main

import (
	"fmt"

	"github.com/bwmarrin/snowflake"
)

func main() {

	// Create a new Node with a Node number of 1
	node, err := snowflake.NewNode(1)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Generate a snowflake ID.
	id := node.Generate()

	// Print out the ID in a few different ways.
	fmt.Printf("Int64  ID: %d\n", id)
	fmt.Printf("String ID: %s\n", id)
	fmt.Printf("Base2  ID: %s\n", id.Base2())
	fmt.Printf("Base64 ID: %s\n", id.Base64())

	// Print out the ID's timestamp
	fmt.Printf("ID Time  : %d\n", id.Time())

	// Print out the ID's node number
	fmt.Printf("ID Node  : %d\n", id.Node())

	// Print out the ID's sequence number
	fmt.Printf("ID Step  : %d\n", id.Step())
}

输出:
Int64  ID: 1283328784765816832
String ID: 1283328784765816832
Base2  ID: 1000111001111010011001011101011111010000000000001000000000000
Base64 ID: MTI4MzMyODc4NDc2NTgxNjgzMg==
ID Time  : 1594804400041
ID Node  : 1
ID Step  : 0

生成的id 二进制格式为:

0 00100011100111101001100101110101111101000 0000000001 000000000000

源码阅读:

github.com/bwmarrin/snowflake

var(
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC in milliseconds
	// You may customize this to set a different epoch for your application.
	Epoch int64 = 1288834974657   // 对应的是41bit的毫秒时间戳,

	// NodeBits holds the number of bits to use for Node
	// Remember, you have a total 22 bits to share between Node/Step
	NodeBits uint8 = 10    // 节点id占用8个bit

	// StepBits holds the number of bits to use for Step
	// Remember, you have a total 22 bits to share between Node/Step
	StepBits uint8 = 12    // 自增id占用12个bit
)

Epoch;默认的是Nov 04 2010 01:42:54 UTC的毫秒时间戳,也可以自行调整

节点id和自增id总共占用22个bit,可以根据节点数自行调整

节点结构体定义:

type Node struct {
	mu    sync.Mutex
	epoch time.Time    
	time  int64
	node  int64  
	step  int64

	nodeMax   int64   // 节点的最大id值
	nodeMask  int64   // 节点掩码
	stepMask  int64   // 自增id掩码
	timeShift uint8    // 时间戳移位位数
	nodeShift uint8     // 节点移位位数
}

生成节点函数:

func NewNode(node int64) (*Node, error) {
	// 输入的node值为节点id值。
	// re-calc in case custom NodeBits or StepBits were set
	// DEPRECATED: the below block will be removed in a future release.
	mu.Lock()
	nodeMax = -1 ^ (-1 << NodeBits) 
	nodeMask = nodeMax << StepBits 
	stepMask = -1 ^ (-1 << StepBits)  
	timeShift = NodeBits + StepBits   
	nodeShift = StepBits  
	mu.Unlock()

	n := Node{}
	n.node = node
	n.nodeMax = -1 ^ (-1 << NodeBits)//求节点id最大值,当notebits为10时,nodemax值位1023
	n.nodeMask = n.nodeMax << StepBits// 节点id掩码
	n.stepMask = -1 ^ (-1 << StepBits)// 自增id掩码
	n.timeShift = NodeBits + StepBits//时间戳左移的位数
	n.nodeShift = StepBits// 节点id左移的位数

	if n.node < 0 || n.node > n.nodeMax {
		return nil, errors.New("Node number must be between 0 and " + strconv.FormatInt(n.nodeMax, 10))
	}

	var curTime = time.Now()
	// 这里n.epoch的值为默认epoch值,但单掉时间为一个负数,表示当前时间到默认事件的差值。
	n.epoch = curTime.Add(time.Unix(Epoch/1000, (Epoch%1000)*1000000).Sub(curTime))

	return &n, nil
}

节点生成id的方法:

func (n *Node) Generate() ID {

	n.mu.Lock()

	now := time.Since(n.epoch).Nanoseconds() / 1000000 
    //求出当前时间,使用的是单调时间
	
    // 如果在同一个时间单位内,就对自增id进行+1操作
	if now == n.time {
		n.step = (n.step + 1) & n.stepMask
		// 当step达到最大值,再加1,就为0。即表示再这个时间单位内,不能再生成更多的id了,需要等待到下一个时间单位内。
		if n.step == 0 {
			for now <= n.time {
				now = time.Since(n.epoch).Nanoseconds() / 1000000
			}
		}
	} else {
        // 反之 自增id设为0
		n.step = 0
	}
	// 将now值赋值给n.time
	n.time = now
	// 合成id,将3部分移位并做或操作
	r := ID((now)<<n.timeShift |(n.node << n.nodeShift) |(n.step),)

	n.mu.Unlock()
	return r
}

优缺点

原生的Snowflake算法是完全依赖于时间的,如果有时钟回拨的情况发生,会生成重复的ID,市场上的解决方案也是非常多的:

  • 最简单的方案,就是关闭生成唯一ID机器的时间同步。
  • 使用阿里云的的时间服务器进行同步,2017年1月1日的闰秒调整,阿里云服务器NTP系统24小时“消化”闰秒,完美解决了问题。
  • 如果发现有时钟回拨,时间很短比如5毫秒,就等待,然后再生成。或者就直接报错,交给业务层去处理。
  • 可以找2bit位作为时钟回拨位,发现有时钟回拨就将回拨位加1,达到最大位后再从0开始进行循环。

2 索尼雪花算法

索尼雪花算法标准格式如下:

id 是64位整型的

+--------------------------------------------------------------------------+
| 1 Bit Unused | 39 Bit Timestamp | 8 Bit sequence number  |   16 Bit machine id |
+--------------------------------------------------------------------------+
  • 第一个bit未使用默认为0
  • 39bit 存储10毫秒级时间戳,时间单位为10毫秒
  • 8bit 存储自增id,即一个时间单位内,一个节点可生成的id最多为521个
  • 16bit 机器(或者线程)id,最多有2^16个机器或者线程

使用

func getMachineID()(uint16,error){
	return uint16(1),nil
}

func checkMachineID(machineID uint16)bool{
	return true
}

func main() {
	t:=time.Unix(0,0)
	settings:=sonyflake.Settings{
		StartTime: t,
		MachineID: getMachineID,
		CheckMachineID: checkMachineID,
	}
	sf:=sonyflake.NewSonyflake(settings)
	id,_:=sf.NextID()
	fmt.Println(id)
}

源码解读

"github.com/sony/sonyflake"

type Settings struct {
	StartTime      time.Time
	MachineID      func() (uint16, error)
	CheckMachineID func(uint16) bool
}

在启动阶段,需要配置参数

  • starttime:起始时间,类似于snowflake的epoch,默认为2014-09-01 00:00:00 +0000 UTC
  • machineID:可自定义当前的机器id(或者线程id),默认是本机ip的低16位
  • chenckmachineid:可自定义检查machineid是否合法或冲突的函数。默认不做验证。
type Sonyflake struct {
	mutex       *sync.Mutex
	startTime   int64  
	elapsedTime int64   // 上一次生成id的时间
	sequence    uint16  // 自增id
	machineID   uint16   // 机器id
}


func NewSonyflake(st Settings) *Sonyflake {
	sf := new(Sonyflake)
	sf.mutex = new(sync.Mutex)
	sf.sequence = uint16(1<<BitLenSequence - 1)// 11111111

	if st.StartTime.After(time.Now()) {// starttime在当前时间之后,就返回空值
		return nil
	}
	if st.StartTime.IsZero() {
		sf.startTime = toSonyflakeTime(time.Date(2014, 9, 1, 0, 0, 0, 0, time.UTC))
	} else {
		sf.startTime = toSonyflakeTime(st.StartTime)
	}

	var err error
	if st.MachineID == nil {
		sf.machineID, err = lower16BitPrivateIP()
	} else {
		sf.machineID, err = st.MachineID()
	}
	if err != nil || (st.CheckMachineID != nil && !st.CheckMachineID(sf.machineID)) {
		return nil
	}

	return sf
}

//返回 10ms为单位的时间戳。
func toSonyflakeTime(t time.Time) int64 {
	return t.UTC().UnixNano() / sonyflakeTimeUnit
}

生成id源码:

func (sf *Sonyflake) NextID() (uint64, error) {
	const maskSequence = uint16(1<<BitLenSequence - 1)
    
	sf.mutex.Lock()
	defer sf.mutex.Unlock()

	current := currentElapsedTime(sf.startTime)//从起始时间到现在经过的时间
	if sf.elapsedTime < current {//比上一次生成id时间大 则自增id变为0.
		sf.elapsedTime = current
		sf.sequence = 0
	} else { // sf.elapsedTime >= current
       // 若相等,则还在同一个时间单位内,序列id+1
        // 若此时大于, 经过的时间比上一次生成id时间小,说明发生了时间回拨,
		sf.sequence = (sf.sequence + 1) & maskSequence
		if sf.sequence == 0 {
			sf.elapsedTime++
            // 貌似睡眠这段代码处理时间回拨问题,但是不明白为啥要在swquence=0处理
			overtime := sf.elapsedTime - current
			time.Sleep(sleepTime((overtime)))
		}
	}

	return sf.toID()
}


// 生成id的时候,用的不是curret,而是elapsedTime

func (sf *Sonyflake) toID() (uint64, error) {
	if sf.elapsedTime >= 1<<BitLenTime {
		return 0, errors.New("over the time limit")
	}

	return uint64(sf.elapsedTime)<<(BitLenSequence+BitLenMachineID) |
		uint64(sf.sequence)<<BitLenMachineID |
		uint64(sf.machineID), nil
}

关于时间回拨问题:

  • 只有当current大于elapsedTime,才会将current赋值给elapsedTime,也就是说elapsedTime是一直增大的,即使时钟回拨,也不会改变elapsedTime。
  • 如果没有发生时间回拨,就是sf.elapsedTime = current,自增id满了以后,这个单位时间内不能再生成id了,就需要睡眠一下,等到下一个时间单位。
  • 当发生时间回拨,sequence自增加1。当sequence加满,重新变为0后,为了防止重复id,将elapsedTime+1,这个时候elapsedTime还大于current,睡眠一会儿。

这个对处理时间回拨就是简单粗暴的睡眠等待。

 类似资料: