Skip to content

Commit 7d5583c

Browse files
authored
Feat: Add message status filtering to messages API (#806)
* feat: filter message list by status and include status field * fix: exclude message status field from message_list response * chore: linting issues * fix: tests * fix: linting
1 parent e706cbd commit 7d5583c

6 files changed

Lines changed: 120 additions & 43 deletions

File tree

src/aleph/db/accessors/messages.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from aleph_message.models import Chain, ItemHash, MessageType
66
from sqlalchemy import delete, func, nullsfirst, nullslast, select, text, update
77
from sqlalchemy.dialects.postgresql import array, insert
8-
from sqlalchemy.orm import load_only, selectinload
8+
from sqlalchemy.orm import contains_eager, load_only, selectinload
99
from sqlalchemy.sql import Insert, Select
1010
from sqlalchemy.sql.elements import literal
1111

@@ -64,6 +64,7 @@ def make_matching_messages_query(
6464
chains: Optional[Sequence[Chain]] = None,
6565
message_type: Optional[MessageType] = None,
6666
message_types: Optional[Sequence[MessageType]] = None,
67+
message_statuses: Optional[Sequence[MessageStatus]] = None,
6768
start_date: Optional[Union[float, dt.datetime]] = None,
6869
end_date: Optional[Union[float, dt.datetime]] = None,
6970
start_block: Optional[int] = None,
@@ -82,6 +83,19 @@ def make_matching_messages_query(
8283
) -> Select:
8384
select_stmt = select(MessageDb)
8485

86+
if message_statuses:
87+
select_stmt = (
88+
select_stmt.join(
89+
MessageStatusDb, MessageDb.item_hash == MessageStatusDb.item_hash
90+
)
91+
.where(MessageStatusDb.status.in_(message_statuses))
92+
.options(
93+
contains_eager(MessageDb.status).options(
94+
load_only(MessageStatusDb.status)
95+
)
96+
)
97+
)
98+
8599
if include_confirmations:
86100
# Note: we assume this is only used for the API, so we only load the fields
87101
# returned by the API. If additional fields are required, add them here to

src/aleph/db/models/messages.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ class MessageDb(Base):
104104
"ChainTxDb", secondary=message_confirmations
105105
)
106106

107+
status: Optional["MessageStatusDb"] = relationship(
108+
"MessageStatusDb",
109+
primaryjoin="MessageDb.item_hash == MessageStatusDb.item_hash",
110+
foreign_keys=MessageStatusDb.item_hash,
111+
uselist=False, # Critical: Makes it one-to-one
112+
)
113+
107114
_parsed_content: Optional[BaseContent] = None
108115

109116
@property

src/aleph/web/controllers/messages.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ class BaseMessageQueryParams(BaseModel):
8989
message_types: Optional[List[MessageType]] = Field(
9090
default=None, alias="msgTypes", description="Accepted message types."
9191
)
92+
message_statuses: Optional[List[MessageStatus]] = Field(
93+
default=[MessageStatus.PROCESSED, MessageStatus.REMOVING],
94+
alias="msgStatuses",
95+
description="Accepted values for the 'status' field.",
96+
)
9297
addresses: Optional[List[str]] = Field(
9398
default=None, description="Accepted values for the 'sender' field."
9499
)
@@ -176,6 +181,7 @@ def validate_field_dependencies(self):
176181
"chains",
177182
"channels",
178183
"message_types",
184+
"message_statuses",
179185
"tags",
180186
mode="before",
181187
)
@@ -266,6 +272,10 @@ def message_to_dict(message: MessageDb) -> Dict[str, Any]:
266272
]
267273
message_dict["confirmations"] = confirmations
268274
message_dict["confirmed"] = bool(confirmations)
275+
276+
# TODO: Add this field in the response when we make sure it won't break any sdk schema checking
277+
# message_dict["status"] = message.status.status
278+
269279
return message_dict
270280

271281

tests/api/conftest.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
MessageDb,
1919
message_confirmations,
2020
)
21+
from aleph.db.models.messages import MessageStatusDb
2122
from aleph.db.models.posts import PostDb
2223
from aleph.handlers.message_handler import MessageHandler
2324
from aleph.jobs.process_pending_messages import PendingMessageProcessor
2425
from aleph.storage import StorageService
25-
from aleph.toolkit.timestamp import timestamp_to_datetime
26+
from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now
2627
from aleph.types.db_session import DbSessionFactory
28+
from aleph.types.message_status import MessageStatus
2729

2830

2931
# TODO: remove the raw parameter, it's just to avoid larger refactorings
@@ -44,6 +46,7 @@ async def _load_fixtures(
4446
message_db = MessageDb.from_message_dict(message_dict)
4547
messages.append(message_db)
4648
session.add(message_db)
49+
4750
for confirmation in message_dict.get("confirmations", []):
4851
if (tx_hash := confirmation["hash"]) not in tx_hashes:
4952
chain_tx_db = ChainTxDb.from_dict(confirmation)
@@ -56,6 +59,14 @@ async def _load_fixtures(
5659
item_hash=message_db.item_hash, tx_hash=tx_hash
5760
)
5861
)
62+
63+
message_status = MessageStatusDb(
64+
item_hash=message_dict["item_hash"],
65+
status=MessageStatus.PROCESSED,
66+
reception_time=utc_now(),
67+
)
68+
session.add(message_status)
69+
5970
session.commit()
6071

6172
return messages_json if raw else messages
@@ -133,7 +144,7 @@ async def fixture_posts(
133144

134145

135146
@pytest.fixture
136-
def post_with_refs_and_tags() -> Tuple[MessageDb, PostDb]:
147+
def post_with_refs_and_tags() -> Tuple[MessageDb, PostDb, MessageStatusDb]:
137148
message = MessageDb(
138149
item_hash="1234",
139150
sender="0xdeadbeef",
@@ -160,12 +171,20 @@ def post_with_refs_and_tags() -> Tuple[MessageDb, PostDb]:
160171
latest_amend=None,
161172
)
162173

163-
return message, post
174+
message_status = MessageStatusDb(
175+
item_hash=message.item_hash,
176+
status=MessageStatus.PROCESSED,
177+
reception_time=utc_now(),
178+
)
179+
180+
return message, post, message_status
164181

165182

166183
@pytest.fixture
167-
def amended_post_with_refs_and_tags(post_with_refs_and_tags: Tuple[MessageDb, PostDb]):
168-
original_message, original_post = post_with_refs_and_tags
184+
def amended_post_with_refs_and_tags(
185+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb]
186+
):
187+
original_message, original_post, _ = post_with_refs_and_tags
169188

170189
amend_message = MessageDb(
171190
item_hash="5678",
@@ -193,7 +212,13 @@ def amended_post_with_refs_and_tags(post_with_refs_and_tags: Tuple[MessageDb, Po
193212
latest_amend=None,
194213
)
195214

196-
return amend_message, amend_post
215+
message_status = MessageStatusDb(
216+
item_hash=amend_message.item_hash,
217+
status=MessageStatus.PROCESSED,
218+
reception_time=utc_now(),
219+
)
220+
221+
return amend_message, amend_post, message_status
197222

198223

199224
@pytest.fixture

tests/api/test_list_messages.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
)
2020

2121
from aleph.db.models import MessageDb, PostDb
22-
from aleph.toolkit.timestamp import timestamp_to_datetime
22+
from aleph.db.models.messages import MessageStatusDb
23+
from aleph.toolkit.timestamp import timestamp_to_datetime, utc_now
2324
from aleph.types.channel import Channel
2425
from aleph.types.db_session import DbSessionFactory
26+
from aleph.types.message_status import MessageStatus
2527

2628
from .utils import get_messages_by_keys
2729

@@ -171,19 +173,21 @@ async def test_get_messages_filter_by_tags(
171173
fixture_messages,
172174
ccn_api_client,
173175
session_factory: DbSessionFactory,
174-
post_with_refs_and_tags: Tuple[MessageDb, PostDb],
175-
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb],
176+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
177+
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
176178
):
177179
"""
178180
Tests getting messages by tags.
179181
There's no example in the fixtures, we just test that the endpoint returns a 200.
180182
"""
181183

182-
message_db, _ = post_with_refs_and_tags
183-
amend_message_db, _ = amended_post_with_refs_and_tags
184+
message_db, _, message_status_db = post_with_refs_and_tags
185+
amend_message_db, _, amend_message_status_db = amended_post_with_refs_and_tags
184186

185187
with session_factory() as session:
186-
session.add_all([message_db, amend_message_db])
188+
session.add_all(
189+
[message_db, message_status_db, amend_message_db, amend_message_status_db]
190+
)
187191
session.commit()
188192

189193
# Matching tag for both messages
@@ -499,8 +503,8 @@ def get_confirmed_time(msg: Dict) -> Tuple[float, float]:
499503

500504

501505
@pytest.fixture
502-
def instance_message_fixture() -> MessageDb:
503-
return MessageDb(
506+
def instance_message_fixture() -> Tuple[MessageDb, MessageStatusDb]:
507+
message = MessageDb(
504508
item_hash="9f29cdb6579d94be1053b1e1400ee3440958da4cf4feb9b44b674746fdb17c9c",
505509
chain=Chain.ETH,
506510
sender="0xB68B9D4f3771c246233823ed1D3Add451055F9Ef",
@@ -547,24 +551,34 @@ def instance_message_fixture() -> MessageDb:
547551
channel=Channel("TEST"),
548552
)
549553

554+
message_status = MessageStatusDb(
555+
item_hash=message.item_hash,
556+
status=MessageStatus.PROCESSED,
557+
reception_time=utc_now(),
558+
)
559+
560+
return message, message_status
561+
550562

551563
@pytest.mark.asyncio
552564
async def test_get_instance(
553565
ccn_api_client,
554-
instance_message_fixture: MessageDb,
566+
instance_message_fixture: Tuple[MessageDb, MessageStatusDb],
555567
session_factory: DbSessionFactory,
556568
):
569+
570+
message_db, status_db = instance_message_fixture
557571
with session_factory() as session:
558-
session.add(instance_message_fixture)
572+
session.add_all([message_db, status_db])
559573
session.commit()
560574

561575
response = await ccn_api_client.get(
562-
MESSAGES_URI, params={"hashes": instance_message_fixture.item_hash}
576+
MESSAGES_URI, params={"hashes": message_db.item_hash}
563577
)
564578
assert response.status == 200, await response.text()
565579

566580
messages = (await response.json())["messages"]
567581
assert len(messages) == 1
568582

569583
message = messages[0]
570-
assert message["item_hash"] == instance_message_fixture.item_hash
584+
assert message["item_hash"] == message_db.item_hash

tests/api/test_posts.py

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55

66
from aleph.db.models import MessageDb
7+
from aleph.db.models.messages import MessageStatusDb
78
from aleph.db.models.posts import PostDb
89
from aleph.types.db_session import DbSessionFactory
910

@@ -58,14 +59,13 @@ async def test_get_posts_refs(
5859
ccn_api_client,
5960
session_factory: DbSessionFactory,
6061
fixture_posts: Sequence[PostDb],
61-
post_with_refs_and_tags: Tuple[MessageDb, PostDb],
62+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
6263
):
63-
message_db, post_db = post_with_refs_and_tags
64+
message_db, post_db, message_status_db = post_with_refs_and_tags
6465

6566
with session_factory() as session:
6667
session.add_all(fixture_posts)
67-
session.add(message_db)
68-
session.add(post_db)
68+
session.add_all([message_db, post_db, message_status_db])
6969
session.commit()
7070

7171
# Match the ref
@@ -113,20 +113,24 @@ async def test_get_amended_posts_refs(
113113
ccn_api_client,
114114
session_factory: DbSessionFactory,
115115
fixture_posts: Sequence[PostDb],
116-
post_with_refs_and_tags: Tuple[MessageDb, PostDb],
117-
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb],
116+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
117+
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
118118
):
119-
original_message_db, original_post_db = post_with_refs_and_tags
120-
amend_message_db, amend_post_db = amended_post_with_refs_and_tags
119+
original_message_db, original_post_db, original_message_status_db = (
120+
post_with_refs_and_tags
121+
)
122+
amend_message_db, amend_post_db, amend_message_status_db = (
123+
amended_post_with_refs_and_tags
124+
)
121125

122126
original_post_db.latest_amend = amend_post_db.item_hash
123127

124128
with session_factory() as session:
125129
session.add_all(fixture_posts)
126-
session.add(original_message_db)
127-
session.add(original_post_db)
128-
session.add(amend_message_db)
129-
session.add(amend_post_db)
130+
session.add_all(
131+
[original_message_db, original_post_db, original_message_status_db]
132+
)
133+
session.add_all([amend_message_db, amend_post_db, amend_message_status_db])
130134
session.commit()
131135

132136
# Match the ref
@@ -174,14 +178,13 @@ async def test_get_posts_tags(
174178
ccn_api_client,
175179
session_factory: DbSessionFactory,
176180
fixture_posts: Sequence[PostDb],
177-
post_with_refs_and_tags: Tuple[MessageDb, PostDb],
181+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
178182
):
179-
message_db, post_db = post_with_refs_and_tags
183+
message_db, post_db, message_status_db = post_with_refs_and_tags
180184

181185
with session_factory() as session:
182186
session.add_all(fixture_posts)
183-
session.add(message_db)
184-
session.add(post_db)
187+
session.add_all([message_db, post_db, message_status_db])
185188
session.commit()
186189

187190
# Match one tag
@@ -244,20 +247,24 @@ async def test_get_amended_posts_tags(
244247
ccn_api_client,
245248
session_factory: DbSessionFactory,
246249
fixture_posts: Sequence[PostDb],
247-
post_with_refs_and_tags: Tuple[MessageDb, PostDb],
248-
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb],
250+
post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
251+
amended_post_with_refs_and_tags: Tuple[MessageDb, PostDb, MessageStatusDb],
249252
):
250-
original_message_db, original_post_db = post_with_refs_and_tags
251-
amend_message_db, amend_post_db = amended_post_with_refs_and_tags
253+
original_message_db, original_post_db, original_message_status_db = (
254+
post_with_refs_and_tags
255+
)
256+
amend_message_db, amend_post_db, amend_message_status_db = (
257+
amended_post_with_refs_and_tags
258+
)
252259

253260
original_post_db.latest_amend = amend_post_db.item_hash
254261

255262
with session_factory() as session:
256263
session.add_all(fixture_posts)
257-
session.add(original_message_db)
258-
session.add(original_post_db)
259-
session.add(amend_message_db)
260-
session.add(amend_post_db)
264+
session.add_all(
265+
[original_message_db, original_post_db, original_message_status_db]
266+
)
267+
session.add_all([amend_message_db, amend_post_db, amend_message_status_db])
261268
session.commit()
262269

263270
# Match one tag

0 commit comments

Comments
 (0)