当前位置:首页 > python > 正文内容

雪花算法 Snowflake python 实现代码

root5个月前 (03-08)python437
import time
import logging

# 分配位置
WORKER_BITS = 5
DATACENTER_BITS = 5
SEQUENCE_BITS = 12

# 设定设备数量上限
WORKER_UPPER_LIMIT = -1 ^ (-1 << WORKER_BITS)
DATACENTER_UPPER_TIMIT = -1 ^ (-1 << DATACENTER_BITS)


# 组合是的位运算偏移量
WORKER_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)  # 掩码
EPOCH = 1577808001000  # 元时间戳 此处元设为 2020-01-01 00:00:01


class SnowFlake(object):

    def __init__(self, data_center_id, worker_id, sequence=0):
        """
        :param data_center_id: 数据中心编号
        :param worker_id: 机器编号
        :param sequence: 序号
        """
        if worker_id > WORKER_UPPER_LIMIT:
            raise ValueError("WORKER ID 高于上限")
        if worker_id < 0:
            raise ValueError("WORKER ID 低于下限")
        if data_center_id > DATACENTER_UPPER_TIMIT:
            raise ValueError("DATA CENTER ID 高于上限")
        if data_center_id < 0:
            raise ValueError("DATA CENTER ID 低于上限")
        self.worker_id = worker_id
        self.datacenter_id = data_center_id
        self.sequence = sequence

        self.last_timestamp = -1  # 最近一次生成编号的时间戳

    @staticmethod
    def _timestamp(n=1e3) -> int:
        """指定位数时间戳
        :param n:
        :return:
        """
        return int(time.time() * n)

    def _check(self, timestamp):
        """
        超限检查
        :param timestamp:
        :return:
        """
        self._time_back_off_check(timestamp)
        self._number_check(timestamp)

    def _number_check(self, timestamp):
        """
        数超限检查,检查当前时间生成的编号是否超过上限,超过上限则的等到下一个时间生成
        :param timestamp:
        :return:
        """
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._wait_next_time(self.last_timestamp)
        else:
            self.sequence = 0

    def _time_back_off_check(self, timestamp):
        if timestamp < self.last_timestamp:
            logging.error('发现时钟回退,记录到最近一次的时间戳为 {}'.format(self.last_timestamp))
            raise Exception("时钟回拨异常")

    def task(self) -> int:
        """
        获取一个编号
        :return:
        """
        timestamp = self._timestamp()
        self._check(timestamp)
        self.last_timestamp = timestamp
        return self._generate(timestamp)

    def _generate(self, timestamp) -> int:
        """ 生成一个编号
        :param timestamp:
        :return:
        """
        number = ((timestamp - EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | (
                    self.worker_id << WORKER_SHIFT) | self.sequence
        return number

    def _wait_next_time(self, last_timestamp):
        """等到下一次单位时间
        :param last_timestamp:
        :return:
        """
        timestamp = self._timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._timestamp()
        return timestamp


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=224

分享给朋友:

相关文章

python 实现AES加密解密

什么是非对称加密1. A要向B发送信息,A和B都要产生一对用于加密和解密的公钥和私钥。• 2. A的私钥保密,A的公钥告诉B;B的私钥保密,B的公钥告诉A。• 3. A要给B发送信息时,A用B的公钥加密信息,因为A知道B的公钥。• 4. A...

python 装饰器 之打印函数执行时间

在实际开发中 遇见很多需要排查函数执行时间定位性能瓶颈点用装饰器获取函数执行的时间还是比较方便的import inspect import time def timethis(func):  ...

flask的websocket的简单用例

flask的websocket的简单用例

后端代码flask Flask, render_template, request flask_socketio SocketIO, emit app (__name__,...

python 自定义好用logger模块

# -*- coding:utf-8 -*- import sys import logging.handlers DEFAULT_LOG_FMT = '...

python进行远程ssh连接的pexpect模块

from pexpect import pxssh s=pxssh.pxssh() s.login(host, user, password) s.sendline(cmd) s.p...

Python-获取图片的大小

安装Pillowpip install pillow本地图片import os from PIL import Image path = os.path.join(os.g...