Skip to content

Commit 3dbdf8c

Browse files
authored
Fix performance of history trimming (#3595)
* Fix performance of history trimming * Align version * CONCURRENTLY cannot run inside a transaction block * Fix missing coverage
1 parent 8272bc1 commit 3dbdf8c

File tree

8 files changed

+129
-19
lines changed

8 files changed

+129
-19
lines changed

kinto/core/storage/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,28 @@ def collection_timestamp(self, collection_id, parent_id):
385385
warnings.warn(message, DeprecationWarning)
386386
return self.resource_timestamp(resource_name=collection_id, parent_id=parent_id)
387387

388+
def trim_objects(
    self,
    resource_name: str,
    parent_id: str,
    filters: list,
    max_objects: int,
    id_field: str = DEFAULT_ID_FIELD,
    modified_field: str = DEFAULT_MODIFIED_FIELD,
) -> int:
    """
    Delete the oldest objects of the specified resource so that at most
    ``max_objects`` of those matching the filters remain.

    Objects that do not match ``filters`` are left untouched. Backends are
    expected to override this method; the base implementation always raises.

    :param str resource_name: the resource name.
    :param str parent_id: the resource parent.
    :param list filters: list of :class:`kinto.core.storage.Filter` selecting
        the objects that are candidates for trimming.
    :param int max_objects: maximum number of matching objects to keep.
    :param str id_field: the id field name.
    :param str modified_field: the modified field name.
    :returns: the number of deleted objects.
    :rtype: int
    :raises NotImplementedError: always, in this base class.
    """
    raise NotImplementedError
409+
388410

389411
def heartbeat(backend):
390412
def ping(request):

kinto/core/storage/memory.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
DEFAULT_ID_FIELD,
1111
DEFAULT_MODIFIED_FIELD,
1212
MISSING,
13+
Sort,
1314
StorageBase,
1415
exceptions,
1516
)
@@ -438,6 +439,36 @@ def delete_all(
438439
]
439440
return deleted
440441

442+
@synchronized
def trim_objects(
    self,
    resource_name: str,
    parent_id: str,
    filters: list,
    max_objects: int,
    id_field: str = DEFAULT_ID_FIELD,
    modified_field: str = DEFAULT_MODIFIED_FIELD,
) -> int:
    """Keep the ``max_objects`` newest matching objects and delete the rest.

    The objects of ``resource_name`` under ``parent_id`` are filtered with
    ``filters`` and sorted by ``modified_field`` in descending order. Every
    object past the first ``max_objects`` is then removed through
    ``self.delete``.

    :returns: the number of objects passed to ``self.delete``.
    :rtype: int
    """
    newest_first = [Sort(modified_field, -1)]
    candidates = _get_objects_by_parent_id(
        self._store, parent_id, resource_name, with_meta=True
    )
    # The second element of the returned pair is not needed here.
    matching, _ = self.extract_object_set(
        objects=candidates,
        filters=filters,
        sorting=newest_first,
        id_field=id_field,
        deleted_field=DEFAULT_DELETED_FIELD,
    )

    # Everything after the first ``max_objects`` entries is surplus.
    surplus = matching[max_objects:]
    for entry in surplus:
        self.delete(
            resource_name,
            parent_id,
            entry[id_field],
            id_field=id_field,
            modified_field=modified_field,
        )
    return len(surplus)
471+
441472

442473
def extract_object_set(
443474
objects,

kinto/core/storage/postgresql/__init__.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class Storage(StorageBase, MigratorMixin):
7979

8080
# MigratorMixin attributes.
8181
name = "storage"
82-
schema_version = 23
82+
schema_version = 24
8383
schema_file = os.path.join(HERE, "schema.sql")
8484
migrations_directory = os.path.join(HERE, "migrations")
8585

@@ -753,6 +753,45 @@ def count_all(
753753
)
754754
return rows[0].total_count
755755

756+
def trim_objects(
    self,
    resource_name: str,
    parent_id: str,
    filters: list,
    max_objects: int,
    id_field: str = DEFAULT_ID_FIELD,
    modified_field: str = DEFAULT_MODIFIED_FIELD,
) -> int:
    """Delete the oldest matching rows, keeping the ``max_objects`` newest.

    The rows of ``resource_name`` under ``parent_id`` that match ``filters``
    are ordered by ``last_modified`` (newest first); every row past the
    first ``max_objects`` is deleted in a single statement.

    :param str resource_name: the resource name.
    :param str parent_id: the resource parent.
    :param list filters: list of :class:`kinto.core.storage.Filter` selecting
        the candidate objects. May be empty.
    :param int max_objects: maximum number of matching objects to keep.
    :param str id_field: the id field name (only used to translate filters).
    :param str modified_field: the modified field name (only used to
        translate filters).
    :returns: the number of deleted rows.
    :rtype: int
    """
    # ``id_field`` / ``modified_field`` are API-level field names, not column
    # names: the table columns are always ``id`` and ``last_modified``, so
    # they are written literally instead of being interpolated into the SQL.
    #
    # The DELETE joins on (id, parent_id, resource_name) rather than on
    # ``id`` alone, so that a row sharing its id with a trimmed row under
    # another parent or resource is never removed.
    query = """
    WITH to_delete AS (
        SELECT id, parent_id, resource_name
        FROM objects
        WHERE parent_id = :parent_id
          AND resource_name = :resource_name
          {conditions_filter}
        ORDER BY last_modified DESC
        OFFSET :max_objects
    )
    DELETE FROM objects o
    USING to_delete d
    WHERE o.id = d.id
      AND o.parent_id = d.parent_id
      AND o.resource_name = d.resource_name
    RETURNING 1;
    """

    placeholders = dict(
        parent_id=parent_id, resource_name=resource_name, max_objects=max_objects
    )

    # Only emit the extra ``AND ...`` clause when there is something to put
    # after it; an unconditional ``AND`` would leave a dangling keyword in
    # front of ORDER BY when no filter is given.
    safeholders = dict(conditions_filter="")
    if filters:
        safe_sql, holders = self._format_conditions(filters, id_field, modified_field)
        placeholders.update(**holders)
        safeholders["conditions_filter"] = f"AND {safe_sql}"

    with self.client.connect() as conn:
        result = conn.execute(sa.text(query.format_map(safeholders)), placeholders)
        # Using RETURNING so rowcount reflects the number deleted
        return result.rowcount
794+
756795
def _get_rows(
757796
self,
758797
query,
@@ -1040,7 +1079,7 @@ def _format_sorting(self, sorting, id_field, modified_field):
10401079
if sort.field == id_field:
10411080
sql_field = "id"
10421081
elif sort.field == modified_field:
1043-
sql_field = "last_modified"
1082+
sql_field = "objects.last_modified"
10441083
else:
10451084
# Subfields: ``person.name`` becomes ``data->person->name``
10461085
subfields = sort.field.split(".")
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Partial expression index on the ``user_id`` and ``resource_name`` keys of
-- the JSON ``data`` column, restricted to history rows.
-- IF NOT EXISTS keeps the migration idempotent: re-running it does not fail
-- when an index with this name is already present.
CREATE INDEX IF NOT EXISTS idx_objects_history_userid_and_resourcename
  ON objects ((data->'user_id'), (data->'resource_name'))
  WHERE resource_name = 'history';

-- Bump storage schema version.
INSERT INTO metadata (name, value) VALUES ('storage_schema_version', '24');

kinto/core/storage/postgresql/schema.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ CREATE INDEX IF NOT EXISTS idx_objects_last_modified_epoch
4949
ON objects(as_epoch(last_modified));
5050
CREATE INDEX IF NOT EXISTS idx_objects_resource_name_parent_id_deleted
5151
ON objects(resource_name, parent_id, deleted);
52+
-- Index for history plugin trimming.
53+
CREATE INDEX IF NOT EXISTS idx_objects_history_userid_and_resourcename
54+
ON objects ((data->'user_id'), (data->'resource_name'))
55+
WHERE resource_name = 'history';
5256

5357
CREATE TABLE IF NOT EXISTS timestamps (
5458
parent_id TEXT NOT NULL COLLATE "C",
@@ -132,4 +136,4 @@ INSERT INTO metadata (name, value) VALUES ('created_at', NOW()::TEXT);
132136

133137
-- Set storage schema version.
134138
-- Should match ``kinto.core.storage.postgresql.PostgreSQL.schema_version``
135-
INSERT INTO metadata (name, value) VALUES ('storage_schema_version', '23');
139+
INSERT INTO metadata (name, value) VALUES ('storage_schema_version', '24');

kinto/core/storage/testing.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1765,6 +1765,22 @@ def test_update_bytes_raises(self):
17651765
)
17661766

17671767

1768+
class TrimObjectsTest:
    """Backend-agnostic checks for ``storage.trim_objects``."""

    def test_trims_n_of_same_resource(self):
        total, kept = 20, 3

        # Even numbers go to group "a", odd numbers to group "b".
        for num in range(total):
            group = "b" if num % 2 else "a"
            self.create_object({"num": num, "group": group})

        resource = self.storage_kw["resource_name"]
        parent = self.storage_kw["parent_id"]
        only_group_a = [Filter("group", "a", utils.COMPARISON.EQ)]
        num_removed = self.storage.trim_objects(
            resource_name=resource,
            parent_id=parent,
            filters=only_group_a,
            max_objects=kept,
        )

        # it kept all 'b' and the 3 latest of 'a'
        self.assertEqual(num_removed, total / 2 - kept)
        remaining = self.storage.count_all(**self.storage_kw)
        self.assertEqual(remaining, total - num_removed)
1782+
1783+
17681784
class DeprecatedCoreNotionsTest:
17691785
def setUp(self):
17701786
super().setUp()
@@ -1834,6 +1850,7 @@ class StorageTest(
18341850
SerializationTest,
18351851
DeprecatedCoreNotionsTest,
18361852
BaseTestStorage,
1853+
TrimObjectsTest,
18371854
):
18381855
"""Compound of all storage tests."""
18391856

kinto/plugins/history/listener.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from pyramid.settings import asbool, aslist
55

6-
from kinto.core.storage import Filter, Sort
6+
from kinto.core.storage import Filter
77
from kinto.core.utils import COMPARISON, instance_uri
88

99

@@ -145,27 +145,17 @@ def on_resource_changed(event):
145145
if is_trim_by_user_enabled:
146146
filters.append(Filter("user_id", user_id, COMPARISON.EQ))
147147

148-
# Identify the oldest entry to keep.
149-
previous_entries = storage.list_all(
148+
count_deleted = storage.trim_objects(
150149
parent_id=bucket_uri,
151150
resource_name="history",
152151
filters=filters,
153-
sorting=[Sort("last_modified", -1)],
154-
limit=trim_history_max + 1,
152+
max_objects=trim_history_max,
155153
)
156-
# And delete all older ones.
157-
if len(previous_entries) > trim_history_max:
158-
trim_before_timestamp = previous_entries[-1]["last_modified"]
159-
deleted = storage.delete_all(
160-
parent_id=bucket_uri,
161-
resource_name="history",
162-
filters=filters
163-
+ [Filter("last_modified", trim_before_timestamp, COMPARISON.MAX)],
164-
)
165-
logger.info(f"Trimmed {len(deleted)} old history entries.")
154+
if count_deleted > 0:
155+
logger.info(f"Trimmed {count_deleted} old history entries.")
166156
else:
167157
logger.info(
168-
"No old history to trim for {user_id!r} on {resource_name!r} in {bucket_uri!r}."
158+
f"No old history to trim for {user_id!r} on {resource_name!r} in {bucket_uri!r}."
169159
)
170160
else:
171161
logger.info(

tests/core/test_storage.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ def test_mandatory_overrides(self):
7575
(self.storage.purge_deleted, "", ""),
7676
(self.storage.list_all, "", ""),
7777
(self.storage.count_all, "", ""),
78+
(self.storage.trim_objects, "", "", [], 0),
7879
]
7980
for call in calls:
8081
self.assertRaises(NotImplementedError, *call)

0 commit comments

Comments
 (0)