当前位置:首页 > python > 正文内容

雪花算法 Snowflake python 实现代码

root2年前 (2022-03-08)python1430
import time
import logging

# 分配位置
WORKER_BITS = 5
DATACENTER_BITS = 5
SEQUENCE_BITS = 12

# 设定设备数量上限
WORKER_UPPER_LIMIT = -1 ^ (-1 << WORKER_BITS)
DATACENTER_UPPER_TIMIT = -1 ^ (-1 << DATACENTER_BITS)


# 组合是的位运算偏移量
WORKER_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)  # 掩码
EPOCH = 1577808001000  # 元时间戳 此处元设为 2020-01-01 00:00:01


class SnowFlake(object):

    def __init__(self, data_center_id, worker_id, sequence=0):
        """
        :param data_center_id: 数据中心编号
        :param worker_id: 机器编号
        :param sequence: 序号
        """
        if worker_id > WORKER_UPPER_LIMIT:
            raise ValueError("WORKER ID 高于上限")
        if worker_id < 0:
            raise ValueError("WORKER ID 低于下限")
        if data_center_id > DATACENTER_UPPER_TIMIT:
            raise ValueError("DATA CENTER ID 高于上限")
        if data_center_id < 0:
            raise ValueError("DATA CENTER ID 低于上限")
        self.worker_id = worker_id
        self.datacenter_id = data_center_id
        self.sequence = sequence

        self.last_timestamp = -1  # 最近一次生成编号的时间戳

    @staticmethod
    def _timestamp(n=1e3) -> int:
        """指定位数时间戳
        :param n:
        :return:
        """
        return int(time.time() * n)

    def _check(self, timestamp):
        """
        超限检查
        :param timestamp:
        :return:
        """
        self._time_back_off_check(timestamp)
        self._number_check(timestamp)

    def _number_check(self, timestamp):
        """
        数超限检查,检查当前时间生成的编号是否超过上限,超过上限则的等到下一个时间生成
        :param timestamp:
        :return:
        """
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._wait_next_time(self.last_timestamp)
        else:
            self.sequence = 0

    def _time_back_off_check(self, timestamp):
        if timestamp < self.last_timestamp:
            logging.error('发现时钟回退,记录到最近一次的时间戳为 {}'.format(self.last_timestamp))
            raise Exception("时钟回拨异常")

    def task(self) -> int:
        """
        获取一个编号
        :return:
        """
        timestamp = self._timestamp()
        self._check(timestamp)
        self.last_timestamp = timestamp
        return self._generate(timestamp)

    def _generate(self, timestamp) -> int:
        """ 生成一个编号
        :param timestamp:
        :return:
        """
        number = ((timestamp - EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | (
                    self.worker_id << WORKER_SHIFT) | self.sequence
        return number

    def _wait_next_time(self, last_timestamp):
        """等到下一次单位时间
        :param last_timestamp:
        :return:
        """
        timestamp = self._timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._timestamp()
        return timestamp


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=224

分享给朋友:

相关文章

python 调用linux命令 subprocess.popen

import subprocesscommd = "echo 123"p1 = subprocess.Popen(commd, shell=True, stdout=subprocess.PIPE, stder...

python简单的加密解密

rsa 是非对称加密公钥加密,私钥解密pip install rsaimport rsa from binascii import b2a_hex, a2b_hex class&nb...

python 实现AES加密解密

什么是非对称加密1. A要向B发送信息,A和B都要产生一对用于加密和解密的公钥和私钥。• 2. A的私钥保密,A的公钥告诉B;B的私钥保密,B的公钥告诉A。• 3. A要给B发送信息时,A用B的公钥加密信息,因为A知道B的公钥。• 4. A...

python3.5.2版本不支持的语法格式

在使用geoip2的时候,运行被告知包中语法错误查看详情发现在python 3.5中不支持注释var类型如下语法 _buffer: Union[bytes, FileBuffer, "mma...

获取大文件的MD5值

(, )os.path.(, )logger.()         os.path.(, )logger.()  &...

python跟pip不是同一个版本的坑

python执行默认是Python2.7但是pip默认确实pip3的用pip install 安装包只会安装到python3环境里面指回pip vim /usr/local/bin/pip把 第一行的#!/usr/bin...