Skip to content

Commit

Permalink
Connection pool (#55)
Browse files Browse the repository at this point in the history
* Add connection pool
* Add spec about connection pool
* Go to v0.5
  • Loading branch information
anykeyh authored Dec 24, 2018
1 parent b916c78 commit 333db75
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/poc
/generated/
/test
/docs
/coverage/

/bin/crystal-coverage*
Expand Down
28 changes: 27 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
# master/HEAD (v0.5)

To be continued 😁
# v0.5: Merry christmas 🎄

## Features

### Connection pool

Clear wasn't fiber-proof since it lacks of connection pool system. It's now fixed, the connection pooling is done
completely transparently without any boilerplate on your application side.

Each fiber may require a specific connection; then the connection is binded to the fiber. In the case of `transaction`
and `with_savepoint`, the connection is kept until the end of the block happens.
On the case of normal execution or cursors, we store the connection until the execution is done.

The connection pool is using Channel so in case of pool shortage, the fiber requesting the connection is put in
waiting state.

This is a huge step forward:
- Clear can be used in framework with `spawn`-based server and other event machine system.
- I'll work on performance improvement or other subtilities in some specific cases, where multiple requests can be
parallelized over different connections.

## Bug fixes

- Fix #53
- Update ameba to latest version
- Large refactoring on relations
- Many bugfixes

- Fix #53
- Update dependencies to newer version
Expand Down
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: clear
version: 0.4
version: 0.5

authors:
- Yacine Petitprez <[email protected]>
Expand Down
4 changes: 2 additions & 2 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def initdb
system("echo \"CREATE DATABASE clear_secondary_spec;\" | psql -U postgres")
system("echo \"CREATE TABLE models_post_stats (id serial PRIMARY KEY, post_id INTEGER);\" | psql -U postgres clear_secondary_spec")

Clear::SQL.init("postgres://postgres@localhost/clear_spec")
Clear::SQL.init("secondary", "postgres://postgres@localhost/clear_secondary_spec")
Clear::SQL.init("postgres://postgres@localhost/clear_spec", connection_pool_size: 5)
Clear::SQL.init("secondary", "postgres://postgres@localhost/clear_secondary_spec", connection_pool_size: 5)

Clear.logger.level = {% if flag?(:quiet) %} ::Logger::ERROR {% else %} Clear.logger.level = ::Logger::DEBUG {% end %}
end
Expand Down
51 changes: 51 additions & 0 deletions spec/sql/connection_pool_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require "../spec_helper"

module ConnectionPoolSpec
extend self

@@count = 0_i64

def self.reinit
reinit_migration_manager
end

describe "Clear::SQL" do
describe "ConnectionPool" do
it "can handle multiple fibers" do

begin
Clear::SQL.execute("CREATE TABLE tests (id serial PRIMARY KEY)")

init = true
spawn do
Clear::SQL.transaction do
Clear::SQL.insert("tests", {id: 1}).execute
sleep 0.2 #< The transaction is not yet commited
end
end

@@count = 0

spawn do
# Not inside the transaction so count must be zero since the transaction is not finished:
sleep 0.1
@@count = Clear::SQL.select.from("tests").count
end

sleep 0.3 # Let the 2 spawn finish...

@@count.should eq 0 #< If one, the connection pool got wrong with the fiber.

# Now the transaction is over, count should be 1
count = Clear::SQL.select.from("tests").count
count.should eq 1
ensure
Clear::SQL.execute("DROP TABLE tests;") unless init
end

end
end
end


end
36 changes: 36 additions & 0 deletions src/clear/sql/connection_pool.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class Clear::SQL::ConnectionPool
@@connections = {} of String => Channel(DB::Database)

@@fiber_connections = {} of {String, Fiber} => { DB::Database, Int32 }

def self.init(uri, name, pool_size)
raise "Pool size must be superior to 0" unless pool_size > 0
channel = @@connections[name] = Channel(DB::Database).new(capacity: pool_size)
pool_size.times{ channel.send DB.open(uri) }
end

# Retrieve a connection from the connection pool, or wait for it.
# If the current Fiber already has a connection, the connection is returned;
# this strategy provides easy usage of multiple statement connection (like BEGIN/ROLLBACK features).
def self.with_connection(target : String, &block)
fiber_target = {target, Fiber.current}

channel = @@connections.fetch(target){ raise Clear::ErrorMessages.uninitialized_db_connection(target) }
db, call_count = @@fiber_connections.fetch(fiber_target){ { channel.receive, 0} }

begin
@@fiber_connections[fiber_target] = {db, call_count+1}
yield(db)
ensure
db, call_count = @@fiber_connections[fiber_target]

if call_count == 1
@@fiber_connections.delete(fiber_target)
channel.send db
else
@@fiber_connections[fiber_target] = {db, call_count - 1}
end
end
end

end
7 changes: 5 additions & 2 deletions src/clear/sql/insert_query.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ class Clear::SQL::InsertQuery
Clear::SQL.log_query to_sql do
h = {} of String => ::Clear::SQL::Any

Clear::SQL.connection(connection_name).query(to_sql) do |rs|
fetch_result_set(h, rs) { |x| yield(x) }
Clear::SQL::ConnectionPool.with_connection(connection_name) do |cnx|
cnx.query(to_sql) do |rs|
fetch_result_set(h, rs) { |x| yield(x) }
end
end

end
end

Expand Down
9 changes: 4 additions & 5 deletions src/clear/sql/query/fetch.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ module Clear::SQL::Query::Fetch
def fetch_with_cursor(count = 1_000, &block : Hash(String, ::Clear::SQL::Any) -> Void)
trigger_before_query

Clear::SQL.transaction do
cnx = Clear::SQL.connection(connection_name)
Clear::SQL.transaction do |cnx|
cursor_name = "__cursor_#{Time.now.to_unix ^ (rand * 0xfffffff).to_i}__"

cursor_declaration = "DECLARE #{cursor_name} CURSOR FOR #{to_sql}"
Expand Down Expand Up @@ -57,7 +56,7 @@ module Clear::SQL::Query::Fetch
sql = to_sql

Clear::SQL.log_query sql do
Clear::SQL.connection(connection_name).scalar(sql).as(T)
Clear::SQL::ConnectionPool.with_connection(connection_name, &.scalar(sql)).as(T)
end
end

Expand All @@ -74,7 +73,7 @@ module Clear::SQL::Query::Fetch

sql = self.to_sql

rs = Clear::SQL.log_query(sql) { Clear::SQL.connection(connection_name).query(sql) }
rs = Clear::SQL.log_query(sql) { Clear::SQL::ConnectionPool.with_connection(connection_name, &.query(sql)) }

o = [] of Hash(String, ::Clear::SQL::Any)
fetch_result_set(h, rs) { |x| o << x.dup }
Expand Down Expand Up @@ -109,7 +108,7 @@ module Clear::SQL::Query::Fetch

sql = self.to_sql

rs = Clear::SQL.log_query(sql) { Clear::SQL.connection(connection_name).query(sql) }
rs = Clear::SQL.log_query(sql) { Clear::SQL::ConnectionPool.with_connection(connection_name, &.query(sql)) }

if fetch_all
o = [] of Hash(String, ::Clear::SQL::Any)
Expand Down
68 changes: 36 additions & 32 deletions src/clear/sql/sql.cr
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ module Clear
include Clear::SQL::Logger
extend self

@@connections = {} of String => DB::Database

def self.connection(connection="default") : DB::Database
@@connections[connection]? || raise Clear::ErrorMessages.uninitialized_db_connection(connection)
end

alias Symbolic = String | Symbol
alias Selectable = Symbolic | Clear::SQL::SelectBuilder
Expand All @@ -74,18 +69,22 @@ module Clear
Clear::Expression::UnsafeSql.new(x)
end

def init(url : String)
@@connections["default"] = DB.open(url)
def init(url : String, connection_pool_size = 5)
Clear::SQL::ConnectionPool.init(url, "default", connection_pool_size)
end

def init(name : String, url : String)
@@connections[name] = DB.open(url)
def init(name : String, url : String, connection_pool_size = 5)
Clear::SQL::ConnectionPool.init(url, name, connection_pool_size)
#@@connections[name] = DB.open(url)
end

def init(connections : Hash(Symbolic, String))
def init(connections : Hash(Symbolic, String), connection_pool_size = 5)
connections.each do |name, url|
@@connections[name.to_s] = DB.open(url)
Clear::SQL::ConnectionPool.init(url, name, connection_pool_size)
end
# connections.each do |name, url|
# @@connections[name.to_s] = DB.open(url)
# end
end

@@in_transaction : Bool = false
Expand All @@ -108,23 +107,26 @@ module Clear
# ```
# see #with_savepoint to use a stackable version using savepoints.
#
def transaction(&block)
if @@in_transaction
yield # In case we already are in transaction, we just ignore
else
@@in_transaction = true
execute("BEGIN")
begin
yield
execute("COMMIT")
rescue e
is_rollback_error = e.is_a?(RollbackError) || e.is_a?(CancelTransactionError)
execute("ROLLBACK --" + (is_rollback_error ? "normal" : "program error")) rescue nil
raise e unless is_rollback_error
ensure
@@in_transaction = false
def transaction(connection = "default", &block)
Clear::SQL::ConnectionPool.with_connection(connection) do |cnx|
if @@in_transaction
yield(cnx) # In case we already are in transaction, we just ignore
else
@@in_transaction = true
execute("BEGIN")
begin
yield(cnx)
execute("COMMIT")
rescue e
is_rollback_error = e.is_a?(RollbackError) || e.is_a?(CancelTransactionError)
execute("ROLLBACK --" + (is_rollback_error ? "normal" : "program error")) rescue nil
raise e unless is_rollback_error
ensure
@@in_transaction = false
end
end
end

end

# Create a transaction, but this one is stackable
Expand All @@ -139,15 +141,15 @@ module Clear
# end
# end
# ```
def with_savepoint(&block)
def with_savepoint(connection_name = "default", &block)
transaction do
sp_name = "sp_#{@@savepoint_uid += 1}"
begin
execute("SAVEPOINT #{sp_name}")
execute(connection_name, "SAVEPOINT #{sp_name}")
yield
execute("RELEASE SAVEPOINT #{sp_name}")
execute(connection_name, "RELEASE SAVEPOINT #{sp_name}")
rescue e : RollbackError
execute("ROLLBACK TO SAVEPOINT #{sp_name}")
execute(connection_name, "ROLLBACK TO SAVEPOINT #{sp_name}")
end
end
end
Expand All @@ -163,15 +165,17 @@ module Clear
# Clear::SQL.execute("SELECT 1 FROM users")
#
def execute(sql)
log_query(sql) { Clear::SQL.connection("default").exec(sql) }
log_query(sql) { Clear::SQL::ConnectionPool.with_connection("default", &.exec(sql)) }
end

# Execute a SQL statement on a specific connection.
#
# Usage:
# Clear::SQL.execute("seconddatabase", "SELECT 1 FROM users")
def execute(connection_name : String, sql)
log_query(sql) { Clear::SQL.connection(connection_name).exec(sql) }
log_query(sql) do
Clear::SQL::ConnectionPool.with_connection(connection_name, &.exec(sql))
end
end

# :nodoc:
Expand Down

0 comments on commit 333db75

Please sign in to comment.