当前位置:首页 > python > 正文内容

雪花算法 Snowflake python 实现代码

root4年前 (2022-03-08)python2616
import time
import logging

# 分配位置
WORKER_BITS = 5
DATACENTER_BITS = 5
SEQUENCE_BITS = 12

# 设定设备数量上限
WORKER_UPPER_LIMIT = -1 ^ (-1 << WORKER_BITS)
DATACENTER_UPPER_TIMIT = -1 ^ (-1 << DATACENTER_BITS)


# 组合是的位运算偏移量
WORKER_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)  # 掩码
EPOCH = 1577808001000  # 元时间戳 此处元设为 2020-01-01 00:00:01


class SnowFlake(object):

    def __init__(self, data_center_id, worker_id, sequence=0):
        """
        :param data_center_id: 数据中心编号
        :param worker_id: 机器编号
        :param sequence: 序号
        """
        if worker_id > WORKER_UPPER_LIMIT:
            raise ValueError("WORKER ID 高于上限")
        if worker_id < 0:
            raise ValueError("WORKER ID 低于下限")
        if data_center_id > DATACENTER_UPPER_TIMIT:
            raise ValueError("DATA CENTER ID 高于上限")
        if data_center_id < 0:
            raise ValueError("DATA CENTER ID 低于上限")
        self.worker_id = worker_id
        self.datacenter_id = data_center_id
        self.sequence = sequence

        self.last_timestamp = -1  # 最近一次生成编号的时间戳

    @staticmethod
    def _timestamp(n=1e3) -> int:
        """指定位数时间戳
        :param n:
        :return:
        """
        return int(time.time() * n)

    def _check(self, timestamp):
        """
        超限检查
        :param timestamp:
        :return:
        """
        self._time_back_off_check(timestamp)
        self._number_check(timestamp)

    def _number_check(self, timestamp):
        """
        数超限检查,检查当前时间生成的编号是否超过上限,超过上限则的等到下一个时间生成
        :param timestamp:
        :return:
        """
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._wait_next_time(self.last_timestamp)
        else:
            self.sequence = 0

    def _time_back_off_check(self, timestamp):
        if timestamp < self.last_timestamp:
            logging.error('发现时钟回退,记录到最近一次的时间戳为 {}'.format(self.last_timestamp))
            raise Exception("时钟回拨异常")

    def task(self) -> int:
        """
        获取一个编号
        :return:
        """
        timestamp = self._timestamp()
        self._check(timestamp)
        self.last_timestamp = timestamp
        return self._generate(timestamp)

    def _generate(self, timestamp) -> int:
        """ 生成一个编号
        :param timestamp:
        :return:
        """
        number = ((timestamp - EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | (
                    self.worker_id << WORKER_SHIFT) | self.sequence
        return number

    def _wait_next_time(self, last_timestamp):
        """等到下一次单位时间
        :param last_timestamp:
        :return:
        """
        timestamp = self._timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._timestamp()
        return timestamp


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=224

分享给朋友:

相关文章

Python eventlet 模块,Timeout() 控制子程序运行时间

pip install  eventlet #安装依赖包# -*- coding:utf-8 -*- import eventlet import time e...

python简单的加密解密

rsa 是非对称加密公钥加密,私钥解密pip install rsaimport rsa from binascii import b2a_hex, a2b_hex class&nb...

python 的configparser 读取配置文件遇到%特殊符号

test.ini 配置文件中有mysql的密码,且密码含有“%”这个特殊符号因为%在py是转义符的含义需要对该字符转义即修改  %  为 %%用%对%进行转义...

cmd启动python交互模式 出现UnicodeDecodeError: 'gbk' codec can't decode byte 0x9a in position 533

这是因为在python交互模式的中输出了中文,且是个输出被记录在.python_history中删除历史记录文件C:\Users\Administrator\.python_history...

获取大文件的MD5值

(, )os.path.(, )logger.()         os.path.(, )logger.()  &...

python csvw格式文件转parquet格式文件

用到的包: pandas    pyarrow    pandas pd df pd.(,,) df.()要求csv文件 要有头行一定要安装pyarro...