Extract Adapters #18

Merged (36 commits, Apr 6, 2024)

Commits

6ba0cc7  Rework how adapters are stored (julik, Feb 25, 2024)
4b63934  Start extracting adapter tests (julik, Feb 25, 2024)
8aa5984  Add a memory adapter (julik, Feb 25, 2024)
a43958e  Allow multiple adapters to be instantiated (julik, Feb 25, 2024)
038f2be  Slowly moving on (julik, Feb 25, 2024)
48a22d7  We need some threading tests but this is not "it" (julik, Feb 25, 2024)
cf75a16  Add stubs for locking in memory adapter (julik, Feb 25, 2024)
4ae93dd  Extract lock (julik, Feb 25, 2024)
349d38a  Rename test (julik, Feb 25, 2024)
83568dc  Rename leaky bucket test (julik, Feb 25, 2024)
fe231c7  Getting along (julik, Feb 25, 2024)
6c0e529  Even NOW() is different (julik, Feb 25, 2024)
eccd557  Seems to work. Mostly. (julik, Feb 25, 2024)
1d67a32  And one more done (julik, Feb 25, 2024)
c84453f  Disallow block_for <= 0 (julik, Feb 26, 2024)
bf36b41  Use memory adapter for cached throttle test (julik, Feb 26, 2024)
1e706d1  Script comment (julik, Feb 26, 2024)
c064c81  Tweak a little (julik, Feb 26, 2024)
17eeb81  Still getting there (julik, Feb 26, 2024)
6c2d7f3  That mostly works (julik, Feb 26, 2024)
da9daff  A bit more Volkswagening (julik, Feb 26, 2024)
c824e29  Make Pecorino.adapter pluggable (julik, Feb 27, 2024)
ff8ee92  Structure tests a bit better still (julik, Feb 27, 2024)
4356734  Add Redis in CI (julik, Feb 28, 2024)
cbae5f0  Reformat (julik, Feb 28, 2024)
145258a  Remove unneeded lint step (julik, Feb 28, 2024)
6087ba9  Just on push is sufficient (julik, Feb 28, 2024)
6e5c6e9  Document Redis is available now (julik, Feb 28, 2024)
e8298a7  Zap DatabaseAdapter (julik, Feb 28, 2024)
6d4fc1b  Remove database from block_test.rb (julik, Mar 12, 2024)
538842a  Allow negative values for Block.set! (julik, Mar 12, 2024)
5a4bb2b  Bump version and changelog (julik, Mar 12, 2024)
3898531  Add YARD comments to BaseAdapter (julik, Mar 20, 2024)
73f6be5  Better pruning assertions (julik, Mar 20, 2024)
9662a33  Document adapter= and adapter (julik, Mar 20, 2024)
047c9f0  Improve memory store cleanup (julik, Mar 20, 2024)
.github/workflows/ci.yml (31 changes: 5 additions & 26 deletions)

@@ -2,37 +2,11 @@ name: CI

on:
- push
- pull_request

env:
BUNDLE_PATH: vendor/bundle

jobs:
# lint:
# name: Code Style
# runs-on: ubuntu-22.04
# if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository
# strategy:
# matrix:
# ruby:
# - '2.7'
# steps:
# - name: Checkout
# uses: actions/checkout@v4
# - name: Setup Ruby
# uses: ruby/setup-ruby@v1
# with:
# ruby-version: ${{ matrix.ruby }}
# bundler-cache: true
# - name: Rubocop Cache
# uses: actions/cache@v3
# with:
# path: ~/.cache/rubocop_cache
# key: ${{ runner.os }}-rubocop-${{ hashFiles('.rubocop.yml') }}
# restore-keys: |
# ${{ runner.os }}-rubocop-
# - name: Rubocop
# run: bundle exec rubocop
test:
name: Tests
runs-on: ubuntu-22.04
@@ -50,6 +24,11 @@ jobs:
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 5432:5432
redis:
image: redis
options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v4
CHANGELOG.md (7 changes: 7 additions & 0 deletions)

@@ -1,3 +1,10 @@
## 0.7.0

- Allow `Pecorino.adapter` to be assigned, and add `adapter:` to all classes. This allows the adapter for Pecorino to be configured manually and overridden in an initializer.
- Add Redis-based adapter derived from Prorate
- Formalize and test the adapter API
- Add a memory-based adapter for single-process applications (and as a reference)

## 0.6.0

- Add `Pecorino::Block` for setting blocks directly. These are available both to `Throttle` with the same key and on their own. This can be used to set arbitrary blocks without having to configure a `Throttle` first.
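
To make the 0.7.0 entry above concrete, here is a minimal sketch of the new configuration surface. Note that the `Throttle` keyword arguments (`capacity:`, `over_time:`, `block_for:`) are assumptions taken from the gem's README rather than from this diff; only `adapter:` and `Pecorino.adapter=` are introduced by this PR.

```ruby
require "pecorino"

# Sketch only: Throttle's capacity:/over_time:/block_for: keywords are assumed
# from the README; adapter: and Pecorino.adapter= are what this PR adds.
memory = Pecorino::Adapters::MemoryAdapter.new

# Per-instance override via the new adapter: keyword
throttle = Pecorino::Throttle.new(
  key: "password-attempts",
  capacity: 5,
  over_time: 60,      # seconds
  block_for: 30 * 60, # seconds
  adapter: memory
)

# ...or override the process-wide default in an initializer
Pecorino.adapter = memory
```
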
README.md (6 changes: 5 additions & 1 deletion)

@@ -2,7 +2,11 @@

Pecorino is a rate limiter based on the concept of leaky buckets, or more specifically - based on the [generic cell rate](https://brandur.org/rate-limiting) algorithm. It uses your DB as the storage backend for the throttles. It is compact, easy to install, and does not require additional infrastructure. The approach used by Pecorino has been previously used by [prorate](https://github.com/WeTransfer/prorate) with Redis, and that approach has proven itself.

Pecorino is designed to integrate seamlessly into any Rails application using a PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it).
Pecorino is designed to integrate seamlessly into any Rails application, and will use either:

* A memory store (good enough if you have just 1 process)
* A PostgreSQL or SQLite database (at the moment there is no MySQL support, we would be delighted if you could add it)
* A Redis instance

Contributor:

nitpick: it might be a good idea to explain how to redefine the adapter. In the case of Redis, for example, the adapter will not be derived from ActiveRecord and requires manual configuration.

Contributor Author:

None of them inherit from ActiveRecord anymore; some just use the ActiveRecord classes as a means of passing connection configuration (and to piggyback on escaping).

Contributor:

> None of them inherit from ActiveRecord anymore

Why are you saying that? There is a default_adapter_from_main_database method still defined:

https://github.com/cheddar-me/pecorino/pull/18/files#diff-3ae95d36dd505f2d90c28bab102aceb7c117caee072c2a5910685a35fb1f777bR49

But it would be great to add a section about how to configure the adapter for Pecorino and which settings are configured by default.

Contributor Author:

There is that method indeed (to let users have their adapter picked automatically), but nothing inherits anything there?..

Contributor:

Sorry for the confusion, indeed a wrong use of the word "inherits".

If you would like to know more about the leaky bucket algorithm: [this article](http://live.julik.nl/2022/08/the-unreasonable-effectiveness-of-leaky-buckets) or the [Wikipedia article](https://en.wikipedia.org/wiki/Leaky_bucket) are both good starting points. [This Wikipedia article](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm) describes the generic cell rate algorithm in more detail as well.

lib/pecorino.rb (57 changes: 31 additions & 26 deletions)

@@ -7,24 +7,24 @@
require_relative "pecorino/railtie" if defined?(Rails::Railtie)

module Pecorino
autoload :Postgres, "pecorino/postgres"
autoload :Sqlite, "pecorino/sqlite"
autoload :LeakyBucket, "pecorino/leaky_bucket"
autoload :Block, "pecorino/block"
autoload :Throttle, "pecorino/throttle"
autoload :CachedThrottle, "pecorino/cached_throttle"

module Adapters
autoload :MemoryAdapter, "pecorino/adapters/memory_adapter"
autoload :PostgresAdapter, "pecorino/adapters/postgres_adapter"
autoload :SqliteAdapter, "pecorino/adapters/sqlite_adapter"
autoload :RedisAdapter, "pecorino/adapters/redis_adapter"
end

# Deletes stale leaky buckets and blocks which have expired. Run this method regularly to
# avoid accumulating too many unused rows in your tables.
#
# @return void
def self.prune!
# Delete all the old blocks here (if we are under a heavy swarm of requests which are all
# blocked it is probably better to avoid the big delete)
ActiveRecord::Base.connection.execute("DELETE FROM pecorino_blocks WHERE blocked_until < NOW()")

# Prune buckets which are no longer used. No "uncached" needed here since we are using "execute"
ActiveRecord::Base.connection.execute("DELETE FROM pecorino_leaky_buckets WHERE may_be_deleted_after < NOW()")
adapter.prune
end

# Creates the tables and indexes needed for Pecorino. Call this from your migrations like so:
@@ -38,36 +38,41 @@ def self.prune!
# @param active_record_schema[ActiveRecord::SchemaMigration] the migration through which we will create the tables
# @return void
def self.create_tables(active_record_schema)
active_record_schema.create_table :pecorino_leaky_buckets, id: :uuid do |t|
t.string :key, null: false
t.float :level, null: false
t.datetime :last_touched_at, null: false
t.datetime :may_be_deleted_after, null: false
end
active_record_schema.add_index :pecorino_leaky_buckets, [:key], unique: true
active_record_schema.add_index :pecorino_leaky_buckets, [:may_be_deleted_after]
adapter.create_tables(active_record_schema)
end

active_record_schema.create_table :pecorino_blocks, id: :uuid do |t|
t.string :key, null: false
t.datetime :blocked_until, null: false
end
active_record_schema.add_index :pecorino_blocks, [:key], unique: true
active_record_schema.add_index :pecorino_blocks, [:blocked_until]
# Allows assignment of an adapter for storing throttles. Normally this would be a subclass of `Pecorino::Adapters::BaseAdapter`, but
# you can assign anything you like. Set this in an initializer. By default Pecorino will use the adapter configured from your main
# database, but you can also create a separate database for it - or use Redis or memory storage.
#
# @param adapter[Pecorino::Adapters::BaseAdapter]
# @return [Pecorino::Adapters::BaseAdapter]
def self.adapter=(adapter)
@adapter = adapter
end

# Returns the currently configured adapter, or the default adapter from the main database
#
# @return [Pecorino::Adapters::BaseAdapter]
def self.adapter
@adapter || default_adapter_from_main_database
end

# Returns the database implementation for setting the values atomically. Since the implementation
# differs per database, this method will return a different adapter depending on which database is
# being used
def self.adapter
#
# @return [Pecorino::Adapters::BaseAdapter]
def self.default_adapter_from_main_database
model_class = ActiveRecord::Base
adapter_name = model_class.connection.adapter_name
case adapter_name
when /postgres/i
Pecorino::Postgres.new(model_class)
Pecorino::Adapters::PostgresAdapter.new(model_class)
when /sqlite/i
Pecorino::Sqlite.new(model_class)
Pecorino::Adapters::SqliteAdapter.new(model_class)
else
raise "Pecorino does not support #{adapter_name} just yet"
raise "Pecorino does not support the #{adapter_name} database just yet"
end
end
end
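
Following up on the review thread above about documenting adapter configuration: a sketch of an initializer using the accessors defined in this file. The memory adapter constructor is taken from the diff below; the Redis adapter's constructor arguments are not shown in this PR, so that line is left as a placeholder.

```ruby
# config/initializers/pecorino.rb -- sketch only.
# If no adapter is assigned, Pecorino.adapter falls back to
# Pecorino.default_adapter_from_main_database (Postgres or SQLite).

# Good enough for a single-process application (and for tests):
Pecorino.adapter = Pecorino::Adapters::MemoryAdapter.new

# For multi-process deployments you would pick the Redis adapter instead;
# check its constructor before copying this line, as its arguments are not
# part of this diff:
# Pecorino.adapter = Pecorino::Adapters::RedisAdapter.new(...)
```
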
lib/pecorino/adapters/base_adapter.rb (66 changes: 66 additions & 0 deletions)

@@ -0,0 +1,66 @@
# frozen_string_literal: true

# An adapter allows Pecorino throttles, leaky buckets and other
# resources to interface with a data storage backend - a database, usually.
class Pecorino::Adapters::BaseAdapter
# Returns the state of a leaky bucket. The state should be a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @return [Array]
def state(key:, capacity:, leak_rate:)
[0, false]
end

# Adds tokens to the leaky bucket. The return value is a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @param n_tokens[Float] how many tokens to add
# @return [Array]
def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
[0, false]
end

# Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will
# be added. If there isn't - the fillup will be rejected. The return value is a triplet of
# the current level (Float), whether the bucket is now at capacity (Boolean)
# and whether the fillup was accepted (Boolean)
#
# @param key[String] the key of the leaky bucket
# @param capacity[Float] the capacity of the leaky bucket to limit to
# @param leak_rate[Float] how many tokens leak out of the bucket per second
# @param n_tokens[Float] how many tokens to add
# @return [Array]
def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
[0, false, false]
end

# Sets a timed block for the given key - this is used when a throttle fires. The return value
# is not defined - the call should always succeed.
# @param key[String] the key of the block
# @param block_for[#to_f, Active Support Duration] the duration of the block, in seconds
def set_block(key:, block_for:)
end

# Returns the time until which a block for a given key is in effect. If there is no block in
# effect, the method should return `nil`. The return value is either a `Time` or `nil`
# @param key[String] the key of the block
def blocked_until(key:)
end

Contributor:

nitpick: it's cool that you're writing these comments, but would it be better to have YARD definitions here instead?

Contributor Author:

Why not both? 😆 good one

# Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have
# now lapsed
# @return [void]
def prune
end

# Creates the database tables for Pecorino to operate, or initializes other
# schema-like resources the adapter needs to operate
def create_tables(active_record_schema)
end
end
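
A short usage sketch of the contract described above, exercised with the memory adapter defined in the next file; the return shapes follow the YARD comments (level, at-capacity flag, and an acceptance flag for the conditional fillup).

```ruby
adapter = Pecorino::Adapters::MemoryAdapter.new

# Unconditional fillup: [level, at_capacity]
level, at_capacity = adapter.add_tokens(key: "api", capacity: 10.0, leak_rate: 1.0, n_tokens: 3.0)

# Conditional fillup: [level, at_capacity, did_accept]. A fillup that would
# overflow the bucket is rejected and the level stays where it was.
level, at_capacity, accepted = adapter.add_tokens_conditionally(key: "api", capacity: 10.0, leak_rate: 1.0, n_tokens: 9.0)

# Blocks: set_block's return value is unspecified; blocked_until returns a Time or nil.
adapter.set_block(key: "api", block_for: 30)
adapter.blocked_until(key: "api") #=> a Time roughly 30 seconds from now
```
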
lib/pecorino/adapters/memory_adapter.rb (147 changes: 147 additions & 0 deletions)

@@ -0,0 +1,147 @@
# frozen_string_literal: true

# A memory store for leaky buckets and blocks
class Pecorino::Adapters::MemoryAdapter
class KeyedLock
def initialize
@locked_keys = Set.new
@lock_mutex = Mutex.new
end

def lock(key)
loop do
@lock_mutex.synchronize do
next if @locked_keys.include?(key)
@locked_keys << key
return
end
end
end

def unlock(key)
@lock_mutex.synchronize do
@locked_keys.delete(key)
end
end

def with(key)
lock(key)
yield
ensure
unlock(key)
end
end

def initialize
@buckets = {}
@blocks = {}
@lock = KeyedLock.new
end

# Returns the state of a leaky bucket. The state should be a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
def state(key:, capacity:, leak_rate:)
@lock.lock(key)
level, ts = @buckets[key]
@lock.unlock(key)

return [0, false] unless level

dt = get_mono_time - ts
level_after_leak = [0, level - (leak_rate * dt)].max
[level_after_leak.to_f, (level_after_leak - capacity) >= 0]
end

# Adds tokens to the leaky bucket. The return value is a tuple of two
# values: the current level (Float) and whether the bucket is now at capacity (Boolean)
def add_tokens(key:, capacity:, leak_rate:, n_tokens:)
add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = false)
end

# Adds tokens to the leaky bucket conditionally. If there is capacity, the tokens will
# be added. If there isn't - the fillup will be rejected. The return value is a triplet of
# the current level (Float), whether the bucket is now at capacity (Boolean)
# and whether the fillup was accepted (Boolean)
def add_tokens_conditionally(key:, capacity:, leak_rate:, n_tokens:)
add_tokens_with_lock(key, capacity, leak_rate, n_tokens, _conditionally = true)
end

# Sets a timed block for the given key - this is used when a throttle fires. The return value
# is not defined - the call should always succeed.
def set_block(key:, block_for:)
raise ArgumentError, "block_for must be positive" unless block_for > 0
@lock.lock(key)
@blocks[key] = get_mono_time + block_for.to_f
Time.now + block_for.to_f
ensure
@lock.unlock(key)
end

# Returns the time until which a block for a given key is in effect. If there is no block in
# effect, the method should return `nil`. The return value is either a `Time` or `nil`
def blocked_until(key:)
blocked_until_monotonic = @blocks[key]
return unless blocked_until_monotonic

now_monotonic = get_mono_time
return unless blocked_until_monotonic > now_monotonic

Time.now + (blocked_until_monotonic - now_monotonic)
end

# Deletes leaky buckets which have an expiry value prior to now and throttle blocks which have
# now lapsed
def prune
now_monotonic = get_mono_time

@blocks.keys.each do |key|
@lock.with(key) do
@blocks.delete(key) if @blocks[key] && @blocks[key] < now_monotonic
end
end

@buckets.keys.each do |key|
@lock.with(key) do
_level, expire_at_monotonic = @buckets[key]
@buckets.delete(key) if expire_at_monotonic && expire_at_monotonic < now_monotonic
end
end
end

# No-op
def create_tables(active_record_schema)
end

private

def add_tokens_with_lock(key, capacity, leak_rate, n_tokens, conditionally)
@lock.lock(key)
now = get_mono_time
level, ts, _ = @buckets[key] || [0.0, now]

dt = now - ts
level_after_leak = clamp(0, level - (leak_rate * dt), capacity)
level_after_fillup = level_after_leak + n_tokens
if level_after_fillup > capacity && conditionally
return [level_after_leak, level_after_leak >= capacity, _did_accept = false]
end

clamped_level_after_fillup = clamp(0, level_after_fillup, capacity)
expire_after = now + (level_after_fillup / leak_rate)
@buckets[key] = [clamped_level_after_fillup, now, expire_after]

[clamped_level_after_fillup, clamped_level_after_fillup == capacity, _did_accept = true]
ensure
@lock.unlock(key)
end

def get_mono_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def clamp(min, value, max)
return min if value < min
return max if value > max
value
end
end
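
A small demonstration of the leak and expiry arithmetic implemented by add_tokens_with_lock and prune above; the sleep-based timings are approximate.

```ruby
adapter = Pecorino::Adapters::MemoryAdapter.new

# Fill the bucket to capacity; it reports being full.
adapter.add_tokens(key: "k", capacity: 5.0, leak_rate: 1.0, n_tokens: 5.0) #=> [5.0, true]

# At a leak rate of 1 token/second, roughly 2 tokens drain away in 2 seconds.
sleep 2
adapter.state(key: "k", capacity: 5.0, leak_rate: 1.0) #=> approximately [3.0, false]

# The bucket entry expires level/leak_rate seconds after the last fillup
# (5 seconds here), at which point prune drops it from the internal Hash.
sleep 4
adapter.prune
```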