This repository has been archived by the owner on Apr 5, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrate_limit.py
89 lines (73 loc) · 2.59 KB
/
rate_limit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# -*- coding: utf-8 -*-
import time
import logging
from collections import namedtuple
RATE_NO_LIMIT = 0x00
RATE_GLOBAL = 0x01
RATE_NO_SILENCE = 0x02
RATE_INTERACTIVE = 0x04
RATE_CHAT = 0x08
RATE_URL = 0x10
RATE_EVENT = 0x20
RATE_FUN = 0x40
Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"])
buckets = {
# everything else
RATE_GLOBAL: Bucket(history=[], period=60, max_hist_len=10),
# bot writes with no visible stimuli
RATE_NO_SILENCE: Bucket(history=[], period=10, max_hist_len=5),
# interactive stuff like ping
RATE_INTERACTIVE: Bucket(history=[], period=30, max_hist_len=5),
# chitty-chat, master volume control
RATE_CHAT: Bucket(history=[], period=10, max_hist_len=5),
# reacting on URLs
RATE_URL: Bucket(history=[], period=10, max_hist_len=5),
# triggering events
RATE_EVENT: Bucket(history=[], period=60, max_hist_len=10),
# bot blames people, produces cake and entertains
RATE_FUN: Bucket(history=[], period=180, max_hist_len=5),
}
rate_limit_classes = buckets.keys()
def rate_limit(rate_class=RATE_GLOBAL):
"""
Remember N timestamps,
if N[0] newer than now()-T then do not output, do not append.
else pop(0); append()
:param rate_class: the type of message to verify
:return: False if blocked, True if allowed
"""
if rate_class not in rate_limit_classes:
return all(rate_limit(c) for c in rate_limit_classes if c & rate_class)
now = time.time()
bucket = buckets[rate_class]
logging.getLogger(__name__).debug(
"[ratelimit][bucket=%x][time=%s]%s",
rate_class, now, bucket.history
)
if len(bucket.history) >= bucket.max_hist_len and bucket.history[0] > (now - bucket.period):
# print("blocked")
return False
else:
if bucket.history and len(bucket.history) > bucket.max_hist_len:
bucket.history.pop(0)
bucket.history.append(now)
return True
def rate_limited(max_per_second):
"""
very simple flow control context manager
:param max_per_second: how many events per second may be executed - more are delayed
:return:
"""
min_interval = 1.0 / float(max_per_second)
def decorate(func):
lasttimecalled = [0.0]
def ratelimitedfunction(*args, **kargs):
elapsed = time.clock() - lasttimecalled[0]
lefttowait = min_interval - elapsed
if lefttowait > 0:
time.sleep(lefttowait)
ret = func(*args, **kargs)
lasttimecalled[0] = time.clock()
return ret
return ratelimitedfunction
return decorate