From e0056906426de7980bf3d6e9f7d818e5050e8ffb Mon Sep 17 00:00:00 2001 From: Ben Date: Tue, 28 May 2024 11:27:09 +0200 Subject: [PATCH 1/2] Adds tests for concurrent queries. Finishes disposed iterators. --- datastore/fsds.nim | 4 ++ datastore/leveldb/leveldbds.nim | 1 + datastore/sql/sqliteds.nim | 1 + tests/datastore/querycommontests.nim | 100 +++++++++++++++++++++++++++ 4 files changed, 106 insertions(+) diff --git a/datastore/fsds.nim b/datastore/fsds.nim index 7599fcea..c9a32137 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -216,6 +216,10 @@ method query*( return success (key.some, data) iter.next = next + iter.dispose = proc(): Future[?!void] {.async.} = + iter.finished = true + return success() + return success iter method modifyGet*( diff --git a/datastore/leveldb/leveldbds.nim b/datastore/leveldb/leveldbds.nim index 30e30f67..6d6de469 100644 --- a/datastore/leveldb/leveldbds.nim +++ b/datastore/leveldb/leveldbds.nim @@ -116,6 +116,7 @@ method query*( proc dispose(): Future[?!void] {.async.} = dbIter.dispose() + iter.finished = true return success() iter.next = next diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 8f16f07d..a7ca095b 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -325,6 +325,7 @@ method query*( discard sqlite3_reset(s) discard sqlite3_clear_bindings(s) iter.next = nil + iter.finished = true return success() iter.next = next diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index c3ffd3b5..3c26fa52 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -44,6 +44,106 @@ template queryTests*(ds: Datastore, testLimitsAndOffsets = true, testSortOrder = (await iter.dispose()).tryGet + test "Concurrent queries": + let + q1 = Query.init(key1) + q2 = Query.init(key1) + + (await ds.put(key1, val1)).tryGet + (await ds.put(key2, val2)).tryGet + (await ds.put(key3, val3)).tryGet + + let + iter1 = (await ds.query(q1)).tryGet + iter2 = (await ds.query(q1)).tryGet + + let one1 = (await iter1.next()).tryGet + check not iter1.finished + + let one2 = (await iter2.next()).tryGet + check not iter2.finished + + let two1 = (await iter1.next()).tryGet + check not iter1.finished + let three1 = (await iter1.next()).tryGet + check not iter1.finished + + let two2 = (await iter2.next()).tryGet + check not iter2.finished + let three2 = (await iter2.next()).tryGet + check not iter2.finished + + let four1 = (await iter1.next()).tryGet + check iter1.finished + let four2 = (await iter2.next()).tryGet + check iter2.finished + + check: + one1[0].get == key1 + one1[1] == val1 + two1[0].get == key2 + two1[1] == val2 + three1[0].get == key3 + three1[1] == val3 + four1[0] == Key.none + + one2[0].get == key1 + one2[1] == val1 + two2[0].get == key2 + two2[1] == val2 + three2[0].get == key3 + three2[1] == val3 + four2[0] == Key.none + + (await iter1.dispose()).tryGet + (await iter2.dispose()).tryGet + + test "Concurrent queries - dispose": + let + q1 = Query.init(key1) + q2 = Query.init(key1) + + (await ds.put(key1, val1)).tryGet + (await ds.put(key2, val2)).tryGet + (await ds.put(key3, val3)).tryGet + + let + iter1 = (await ds.query(q1)).tryGet + iter2 = (await ds.query(q1)).tryGet + + let one1 = (await iter1.next()).tryGet + check not iter1.finished + + let one2 = (await iter2.next()).tryGet + check not iter2.finished + + (await iter1.dispose()).tryGet + check iter1.finished + + let two2 = (await iter2.next()).tryGet + check not iter2.finished + let three2 = (await iter2.next()).tryGet + check not iter2.finished + + let four2 = (await iter2.next()).tryGet + check iter2.finished + + check: + one1[0].get == key1 + one1[1] == val1 + + one2[0].get == key1 + one2[1] == val1 + two2[0].get == key2 + two2[1] == val2 + three2[0].get == key3 + three2[1] == val3 + four2[0] == Key.none + + (await iter1.dispose()).tryGet + (await iter2.dispose()).tryGet + + test "Key should query all keys without values": let q = Query.init(key1, value = false) From 660aa6df076827ad2b200a648927f21148a82db8 Mon Sep 17 00:00:00 2001 From: Ben Date: Mon, 19 Aug 2024 11:06:52 +0200 Subject: [PATCH 2/2] Disposed iters stop yielding data. --- datastore/fsds.nim | 3 +++ datastore/leveldb/leveldbds.nim | 2 +- datastore/sql/sqliteds.nim | 3 +-- tests/datastore/querycommontests.nim | 33 ++++++++++++++++++++++++++++ 4 files changed, 38 insertions(+), 3 deletions(-) diff --git a/datastore/fsds.nim b/datastore/fsds.nim index c9a32137..7e32a868 100644 --- a/datastore/fsds.nim +++ b/datastore/fsds.nim @@ -190,6 +190,9 @@ method query*( iter = QueryIter.new() proc next(): Future[?!QueryResponse] {.async.} = + if iter.finished: + return success (Key.none, EmptyBytes) + let path = walker() diff --git a/datastore/leveldb/leveldbds.nim b/datastore/leveldb/leveldbds.nim index 6d6de469..6d32261b 100644 --- a/datastore/leveldb/leveldbds.nim +++ b/datastore/leveldb/leveldbds.nim @@ -100,7 +100,7 @@ method query*( proc next(): Future[?!QueryResponse] {.async.} = if iter.finished: - return failure(newException(QueryEndedError, "Calling next on a finished query!")) + return success (Key.none, EmptyBytes) try: let (keyStr, valueStr) = dbIter.next() diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index a7ca095b..e1d2fcb2 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -269,7 +269,7 @@ method query*( proc next(): Future[?!QueryResponse] {.async.} = if iter.finished: - return failure(newException(QueryEndedError, "Calling next on a finished query!")) + return success (Key.none, EmptyBytes) let v = sqlite3_step(s) @@ -324,7 +324,6 @@ method query*( iter.dispose = proc(): Future[?!void] {.async.} = discard sqlite3_reset(s) discard sqlite3_clear_bindings(s) - iter.next = nil iter.finished = true return success() diff --git a/tests/datastore/querycommontests.nim b/tests/datastore/querycommontests.nim index 3c26fa52..87c66798 100644 --- a/tests/datastore/querycommontests.nim +++ b/tests/datastore/querycommontests.nim @@ -143,6 +143,39 @@ template queryTests*(ds: Datastore, testLimitsAndOffsets = true, testSortOrder = (await iter1.dispose()).tryGet (await iter2.dispose()).tryGet + test "Dispose should discontinue iteration": + let + q1 = Query.init(key1) + + (await ds.put(key1, val1)).tryGet + (await ds.put(key2, val2)).tryGet + (await ds.put(key3, val3)).tryGet + + let + iter = (await ds.query(q1)).tryGet + + let one = (await iter.next()).tryGet + check not iter.finished + + let two = (await iter.next()).tryGet + check not iter.finished + + (await iter.dispose()).tryGet + check iter.finished + + let three = (await iter.next()).tryGet + check iter.finished + + let four = (await iter.next()).tryGet + check iter.finished + + check: + one[0].get == key1 + one[1] == val1 + two[0].get == key2 + two[1] == val2 + three[0] == Key.none + four[0] == Key.none test "Key should query all keys without values": let