#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Twitter 的雪花算法(Snowflake)实现 # # 64位结构: # # --------------------------------------------------------------------------------- # | 符号位(1bits) | 时间戳(41bits) | 数据中心(5bits) | 机器标识(5bits) | 序列号(12bits) | # --------------------------------------------------------------------------------- import threading import time # 纪元时间戳 DEFAULT_EPOCH: int = 1480166465631 # 序列号需占用位数 SEQUENCE_BITS: int = 12 # 机器标识需占用位数 MACHINE_BITS: int = 5 # 数据中心需占用位数 DATACENTER_BITS: int = 5 # 时间戳、数据中心、机器标识所需左移位数 MACHINE_SHIFT: int = SEQUENCE_BITS DATACENTER_SHIFT: int = SEQUENCE_BITS + MACHINE_BITS TIMESTAMP_SHIFT: int = SEQUENCE_BITS + MACHINE_BITS + DATACENTER_BITS # 数据中心、机器标识、序列号所支持的最大值 DATACENTER_MAX: int = -1 ^ (-1 << DATACENTER_BITS) MACHINE_MAX: int = -1 ^ (-1 << MACHINE_BITS) SEQUENCE_MAX: int = -1 ^ (-1 << SEQUENCE_BITS) class SnowflakeId: def __init__(self, datacenter_id: int, machine_id: int): if datacenter_id > DATACENTER_MAX or datacenter_id < 0: raise ValueError(f"Datacenter id must be between 0 and {DATACENTER_MAX}") if machine_id > MACHINE_MAX or machine_id < 0: raise ValueError(f"Machine id must be between 0 and {MACHINE_MAX}") self.lock = threading.Lock() self.datacenter_id: int = datacenter_id self.machine_id: int = machine_id self.sequence: int = 0 self.last_milli: int = -1 self.epoch: int = DEFAULT_EPOCH def next_id(self) -> int: with self.lock: curr_milli: int = self.now_milli if curr_milli < self.last_milli: raise RuntimeError(f"Clock moved backwards. Refusing to generate id for {self.last_milli - curr_milli} milliseconds") if curr_milli == self.last_milli: self.sequence = (self.sequence + 1) & SEQUENCE_MAX if self.sequence == 0: curr_milli = self.next_milli else: self.sequence = 0 self.last_milli = curr_milli # 1 位符号位(固定为0) # 41 位时间戳 + 10 # 10 位机器ID(这里为machineId + datacenterId) # 12 位序列号 return ( (curr_milli - self.epoch) << TIMESTAMP_SHIFT | self.datacenter_id << DATACENTER_SHIFT | self.machine_id << MACHINE_SHIFT | self.sequence ) @property def next_milli(self) -> int: curr_milli = self.now_milli while curr_milli <= self.last_milli: curr_milli = self.now_milli return curr_milli @property def now_milli(self) -> int: return int(time.time() * 1000) ID_GENERATOR = SnowflakeId(datacenter_id=1, machine_id=1) def _get_snowflake_id() -> int: """ 生成雪花算法ID 返回: - int: 雪花算法生成的唯一ID """ return ID_GENERATOR.next_id() def get_snowflake_id_str(tenant_id: int) -> str: """ 生成雪花算法ID字符串 返回: - str: 雪花算法生成的唯一ID字符串 """ # 保证id为19位,不足补0 return f"{_get_snowflake_id():019d}{tenant_id}" def extract_tenant_id_from_id_str(id_str: str) -> int: """ 从雪花算法ID字符串中提取租户ID 参数: - id_str: 雪花算法生成的唯一ID字符串 返回: - int: 租户ID """ # 从19位后开始提取,先判断是否足够长 if len(id_str) <= 19: raise ValueError(f"ID字符串长度不足,必须为19位以上") return int(id_str[19:])