当前位置:首页 > python > 正文内容

雪花算法 Snowflake python 实现代码

root3年前 (2022-03-08)python1996
import time
import logging

# 分配位置
WORKER_BITS = 5
DATACENTER_BITS = 5
SEQUENCE_BITS = 12

# 设定设备数量上限
WORKER_UPPER_LIMIT = -1 ^ (-1 << WORKER_BITS)
DATACENTER_UPPER_TIMIT = -1 ^ (-1 << DATACENTER_BITS)


# 组合是的位运算偏移量
WORKER_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS

SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)  # 掩码
EPOCH = 1577808001000  # 元时间戳 此处元设为 2020-01-01 00:00:01


class SnowFlake(object):

    def __init__(self, data_center_id, worker_id, sequence=0):
        """
        :param data_center_id: 数据中心编号
        :param worker_id: 机器编号
        :param sequence: 序号
        """
        if worker_id > WORKER_UPPER_LIMIT:
            raise ValueError("WORKER ID 高于上限")
        if worker_id < 0:
            raise ValueError("WORKER ID 低于下限")
        if data_center_id > DATACENTER_UPPER_TIMIT:
            raise ValueError("DATA CENTER ID 高于上限")
        if data_center_id < 0:
            raise ValueError("DATA CENTER ID 低于上限")
        self.worker_id = worker_id
        self.datacenter_id = data_center_id
        self.sequence = sequence

        self.last_timestamp = -1  # 最近一次生成编号的时间戳

    @staticmethod
    def _timestamp(n=1e3) -> int:
        """指定位数时间戳
        :param n:
        :return:
        """
        return int(time.time() * n)

    def _check(self, timestamp):
        """
        超限检查
        :param timestamp:
        :return:
        """
        self._time_back_off_check(timestamp)
        self._number_check(timestamp)

    def _number_check(self, timestamp):
        """
        数超限检查,检查当前时间生成的编号是否超过上限,超过上限则的等到下一个时间生成
        :param timestamp:
        :return:
        """
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & SEQUENCE_MASK
            if self.sequence == 0:
                timestamp = self._wait_next_time(self.last_timestamp)
        else:
            self.sequence = 0

    def _time_back_off_check(self, timestamp):
        if timestamp < self.last_timestamp:
            logging.error('发现时钟回退,记录到最近一次的时间戳为 {}'.format(self.last_timestamp))
            raise Exception("时钟回拨异常")

    def task(self) -> int:
        """
        获取一个编号
        :return:
        """
        timestamp = self._timestamp()
        self._check(timestamp)
        self.last_timestamp = timestamp
        return self._generate(timestamp)

    def _generate(self, timestamp) -> int:
        """ 生成一个编号
        :param timestamp:
        :return:
        """
        number = ((timestamp - EPOCH) << TIMESTAMP_LEFT_SHIFT) | (self.datacenter_id << DATACENTER_ID_SHIFT) | (
                    self.worker_id << WORKER_SHIFT) | self.sequence
        return number

    def _wait_next_time(self, last_timestamp):
        """等到下一次单位时间
        :param last_timestamp:
        :return:
        """
        timestamp = self._timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._timestamp()
        return timestamp


扫描二维码推送至手机访问。

版权声明:本文由一叶知秋发布,如需转载请注明出处。

本文链接:https://zhiqiu.top/?id=224

分享给朋友:

相关文章

python 自定义好用logger模块

# -*- coding:utf-8 -*- import sys import logging.handlers DEFAULT_LOG_FMT = '...

python logging 模块对多进程的支持

深度解决方案logging 模块 是支持多线程的但是多进程的会出现问题,因为对文件读写会出现资源的争抢如何解决对多进程的出现的问题concurrent-log-handler包 解决问题该模块同样也为python的标准日志记录软件提供了额外...

python 装饰器 之打印函数执行时间

在实际开发中 遇见很多需要排查函数执行时间定位性能瓶颈点用装饰器获取函数执行的时间还是比较方便的import inspect import time def timethis(func):  ...

Python的多线程并发限制

maxConnections connection_lock (maxConnections)在开启线程前执行connection_lock.acquire()线程执行结束执行connection_lock.releas...

Python-获取图片的大小

安装Pillowpip install pillow本地图片import os from PIL import Image path = os.path.join(os.g...

Python  os.system 和subprocess.popen 并发执行linux的性能对比

Python os.system 和subprocess.popen 并发执行linux的性能对比

os subprocess multiprocessing.dummy Pool ThreadPool command_list [] ()os.() start_time =&...