| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Twitter 的雪花算法(Snowflake)实现
- #
- # 64位结构:
- #
- # ---------------------------------------------------------------------------------
- # | 符号位(1bits) | 时间戳(41bits) | 数据中心(5bits) | 机器标识(5bits) | 序列号(12bits) |
- # ---------------------------------------------------------------------------------
- import threading
- import time
- # 纪元时间戳
- DEFAULT_EPOCH: int = 1480166465631
- # 序列号需占用位数
- SEQUENCE_BITS: int = 12
- # 机器标识需占用位数
- MACHINE_BITS: int = 5
- # 数据中心需占用位数
- DATACENTER_BITS: int = 5
- # 时间戳、数据中心、机器标识所需左移位数
- MACHINE_SHIFT: int = SEQUENCE_BITS
- DATACENTER_SHIFT: int = SEQUENCE_BITS + MACHINE_BITS
- TIMESTAMP_SHIFT: int = SEQUENCE_BITS + MACHINE_BITS + DATACENTER_BITS
- # 数据中心、机器标识、序列号所支持的最大值
- DATACENTER_MAX: int = -1 ^ (-1 << DATACENTER_BITS)
- MACHINE_MAX: int = -1 ^ (-1 << MACHINE_BITS)
- SEQUENCE_MAX: int = -1 ^ (-1 << SEQUENCE_BITS)
- class SnowflakeId:
- def __init__(self, datacenter_id: int, machine_id: int):
- if datacenter_id > DATACENTER_MAX or datacenter_id < 0:
- raise ValueError(f"Datacenter id must be between 0 and {DATACENTER_MAX}")
- if machine_id > MACHINE_MAX or machine_id < 0:
- raise ValueError(f"Machine id must be between 0 and {MACHINE_MAX}")
- self.lock = threading.Lock()
- self.datacenter_id: int = datacenter_id
- self.machine_id: int = machine_id
- self.sequence: int = 0
- self.last_milli: int = -1
- self.epoch: int = DEFAULT_EPOCH
- def next_id(self) -> int:
- with self.lock:
- curr_milli: int = self.now_milli
- if curr_milli < self.last_milli:
- raise RuntimeError(f"Clock moved backwards. Refusing to generate id for {self.last_milli - curr_milli} milliseconds")
- if curr_milli == self.last_milli:
- self.sequence = (self.sequence + 1) & SEQUENCE_MAX
- if self.sequence == 0:
- curr_milli = self.next_milli
- else:
- self.sequence = 0
- self.last_milli = curr_milli
- # 1 位符号位(固定为0)
- # 41 位时间戳 + 10
- # 10 位机器ID(这里为machineId + datacenterId)
- # 12 位序列号
- return (
- (curr_milli - self.epoch) << TIMESTAMP_SHIFT
- | self.datacenter_id << DATACENTER_SHIFT
- | self.machine_id << MACHINE_SHIFT
- | self.sequence
- )
- @property
- def next_milli(self) -> int:
- curr_milli = self.now_milli
- while curr_milli <= self.last_milli:
- curr_milli = self.now_milli
- return curr_milli
- @property
- def now_milli(self) -> int:
- return int(time.time() * 1000)
- ID_GENERATOR = SnowflakeId(datacenter_id=1, machine_id=1)
- def _get_snowflake_id() -> int:
- """
- 生成雪花算法ID
- 返回:
- - int: 雪花算法生成的唯一ID
- """
- return ID_GENERATOR.next_id()
- def get_snowflake_id_str(tenant_id: int) -> str:
- """
- 生成雪花算法ID字符串
- 返回:
- - str: 雪花算法生成的唯一ID字符串
- """
- # 保证id为19位,不足补0
- return f"{_get_snowflake_id():019d}{tenant_id}"
- def extract_tenant_id_from_id_str(id_str: str) -> int:
- """
- 从雪花算法ID字符串中提取租户ID
- 参数:
- - id_str: 雪花算法生成的唯一ID字符串
- 返回:
- - int: 租户ID
- """
- # 从19位后开始提取,先判断是否足够长
- if len(id_str) <= 19:
- raise ValueError(f"ID字符串长度不足,必须为19位以上")
- return int(id_str[19:])
|