diff --git a/tests/projects/test_beads_bridge.py b/tests/projects/test_beads_bridge.py new file mode 100644 index 00000000..5370e140 --- /dev/null +++ b/tests/projects/test_beads_bridge.py @@ -0,0 +1,891 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest +import pytest_asyncio + +from tinyagentos.projects.beads_bridge import BeadsBridge + + +def _make_bridge(tmp_path: Path, **overrides) -> BeadsBridge: + project_store = MagicMock() + project_store.list_projects = AsyncMock(return_value=[]) + project_store.get_project = AsyncMock(return_value={"id": "prj_1", "slug": "demo"}) + + task_store = MagicMock() + task_store.list_tasks = AsyncMock(return_value=[]) + task_store.list_relationships = AsyncMock(return_value=[]) + task_store.claim_task = AsyncMock(return_value=True) + task_store.release_task = AsyncMock(return_value=True) + task_store.close_task = AsyncMock(return_value=True) + task_store.add_comment = AsyncMock( + return_value={"id": "cmt_1", "task_id": "tsk_a", "author_id": "u", "body": "x"} + ) + task_store.get_task = AsyncMock(return_value=None) + + channel_store = MagicMock() + channel_store.list_channels = AsyncMock(return_value=[]) + + msg_store = MagicMock() + msg_store.send_message = AsyncMock(return_value={"id": "msg_1"}) + + broker = MagicMock() + broker.subscribe = AsyncMock(return_value=asyncio.Queue()) + broker.unsubscribe = AsyncMock() + + return BeadsBridge( + project_store=overrides.get("project_store", project_store), + task_store=overrides.get("task_store", task_store), + channel_store=overrides.get("channel_store", channel_store), + msg_store=overrides.get("msg_store", msg_store), + broker=overrides.get("broker", broker), + data_root=tmp_path, + debounce_seconds=overrides.get("debounce_seconds", 0.05), + ) + + +@pytest.mark.asyncio +async def test_start_and_stop_idempotent(tmp_path): + bridge = _make_bridge(tmp_path) + await bridge.start() + await bridge.stop() + # Second stop is a no-op + await bridge.stop() + + +@pytest.mark.asyncio +async def test_mark_dirty_adds_to_set(tmp_path): + bridge = _make_bridge(tmp_path) + bridge.mark_dirty("prj_1") + assert "prj_1" in bridge._dirty + + +@pytest.mark.asyncio +async def test_mark_dirty_idempotent(tmp_path): + bridge = _make_bridge(tmp_path) + bridge.mark_dirty("prj_1") + bridge.mark_dirty("prj_1") + bridge.mark_dirty("prj_1") + assert bridge._dirty == {"prj_1"} + + +@pytest.mark.asyncio +async def test_writer_drains_dirty_set_after_debounce(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._render_jsonl = AsyncMock() # type: ignore[assignment] + await bridge.start() + bridge.mark_dirty("prj_1") + await asyncio.sleep(0.2) + await bridge.stop() + assert bridge._render_jsonl.await_count >= 1 + assert "prj_1" not in bridge._dirty + + +import json + + +def _task_row(**kw): + base = { + "id": "tsk_a", + "project_id": "prj_1", + "parent_task_id": None, + "title": "T", + "body": "", + "status": "open", + "priority": 1, + "labels": [], + "assignee_id": None, + "claimed_by": None, + "claimed_at": None, + "closed_at": None, + "closed_by": None, + "close_reason": None, + "created_by": "u", + "created_at": 1000.0, + "updated_at": 1000.0, + } + base.update(kw) + return base + + +@pytest.mark.asyncio +async def test_render_writes_jsonl_with_one_line_per_task(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._task_store.list_tasks = AsyncMock( + return_value=[ + _task_row(id="tsk_a", title="A"), + _task_row(id="tsk_b", title="B"), + ] + ) + bridge._task_store.list_relationships = AsyncMock(return_value=[]) + bridge._project_store.get_project = AsyncMock( + return_value={"id": "prj_1", "slug": "demo"} + ) + await bridge._render_jsonl("prj_1") + out = tmp_path / "demo" / ".beads" / "tasks.jsonl" + lines = out.read_text().strip().splitlines() + assert len(lines) == 2 + assert json.loads(lines[0])["id"] == "tsk_a" + assert json.loads(lines[1])["id"] == "tsk_b" + + +@pytest.mark.asyncio +async def test_render_marks_ready_correctly(tmp_path): + """tsk_a blocks tsk_b: while tsk_a is open, tsk_b is not ready.""" + bridge = _make_bridge(tmp_path) + bridge._task_store.list_tasks = AsyncMock( + return_value=[ + _task_row(id="tsk_a", title="A"), + _task_row(id="tsk_b", title="B"), + ] + ) + + async def _list_rels(task_id, direction="from"): + if direction == "from" and task_id == "tsk_a": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + if direction == "to" and task_id == "tsk_b": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + bridge._project_store.get_project = AsyncMock( + return_value={"id": "prj_1", "slug": "demo"} + ) + bridge._task_store.get_task = AsyncMock( + side_effect=lambda task_id: { + "tsk_a": _task_row(id="tsk_a", status="open"), + "tsk_b": _task_row(id="tsk_b", status="open"), + }.get(task_id) + ) + await bridge._render_jsonl("prj_1") + lines = (tmp_path / "demo" / ".beads" / "tasks.jsonl").read_text().splitlines() + by_id = {json.loads(line)["id"]: json.loads(line) for line in lines} + assert by_id["tsk_a"]["ready"] is True + assert by_id["tsk_b"]["ready"] is False + + +@pytest.mark.asyncio +async def test_render_skips_unknown_project(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._project_store.get_project = AsyncMock(return_value=None) + await bridge._render_jsonl("prj_missing") + # No file should have been written + assert list(tmp_path.rglob("tasks.jsonl")) == [] + + +@pytest.mark.asyncio +async def test_render_uses_atomic_replace(tmp_path): + """Render must write to a tmp file and os.replace into place.""" + bridge = _make_bridge(tmp_path) + bridge._task_store.list_tasks = AsyncMock(return_value=[_task_row()]) + bridge._task_store.list_relationships = AsyncMock(return_value=[]) + bridge._project_store.get_project = AsyncMock( + return_value={"id": "prj_1", "slug": "demo"} + ) + await bridge._render_jsonl("prj_1") + beads_dir = tmp_path / "demo" / ".beads" + # No leftover .tmp file + assert not list(beads_dir.glob("*.tmp")) + assert (beads_dir / "tasks.jsonl").exists() + + +@pytest.mark.asyncio +async def test_render_failure_re_marks_dirty(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._render_jsonl = AsyncMock(side_effect=RuntimeError("boom")) # type: ignore[assignment] + await bridge.start() + bridge.mark_dirty("prj_1") + await asyncio.sleep(0.2) + await bridge.stop() + # Failed render re-marks the project dirty + assert "prj_1" in bridge._dirty + + +@pytest.mark.asyncio +async def test_backfill_active_marks_every_active_project(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._project_store.list_projects = AsyncMock( + return_value=[ + {"id": "prj_1", "slug": "a"}, + {"id": "prj_2", "slug": "b"}, + ] + ) + n = await bridge.backfill_active() + assert n == 2 + assert bridge._dirty == {"prj_1", "prj_2"} + + +@pytest.mark.asyncio +async def test_backfill_active_no_projects(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._project_store.list_projects = AsyncMock(return_value=[]) + n = await bridge.backfill_active() + assert n == 0 + assert bridge._dirty == set() + + +@pytest.mark.asyncio +async def test_export_now_writes_synchronously(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._task_store.list_tasks = AsyncMock(return_value=[_task_row()]) + bridge._task_store.list_relationships = AsyncMock(return_value=[]) + bridge._project_store.get_project = AsyncMock( + return_value={"id": "prj_1", "slug": "demo"} + ) + path = await bridge.export_now("prj_1") + assert path.exists() + assert path.name == "tasks.jsonl" + + +@pytest.mark.asyncio +async def test_export_now_returns_none_for_missing_project(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._project_store.get_project = AsyncMock(return_value=None) + path = await bridge.export_now("prj_missing") + assert path is None + + +@pytest.mark.asyncio +async def test_on_event_claimed_posts_system_message(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_a", "title": "Hello", "status": "claimed"} + ) + await bridge.on_event( + "prj_1", + {"kind": "task.claimed", "payload": {"id": "tsk_a", "claimed_by": "alice"}}, + ) + assert bridge._msg_store.send_message.await_count == 1 + kwargs = bridge._msg_store.send_message.await_args.kwargs + assert kwargs["channel_id"] == "ch_1" + assert kwargs["author_id"] == "bridge" + assert kwargs["author_type"] == "system" + assert kwargs["content_type"] == "system" + assert "alice claimed tsk_a" in kwargs["content"] + + +@pytest.mark.asyncio +async def test_on_event_released_posts_system_message(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_a", "title": "T", "status": "open", "claimed_by": None} + ) + await bridge.on_event( + "prj_1", {"kind": "task.released", "payload": {"id": "tsk_a"}} + ) + assert bridge._msg_store.send_message.await_count == 1 + assert "released tsk_a" in ( + bridge._msg_store.send_message.await_args.kwargs["content"] + ) + + +@pytest.mark.asyncio +async def test_on_event_closed_posts_system_message(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + bridge._task_store.get_task = AsyncMock( + return_value={ + "id": "tsk_a", + "title": "T", + "status": "closed", + "closed_by": "alice", + "close_reason": "ship it", + } + ) + bridge._task_store.list_relationships = AsyncMock(return_value=[]) + await bridge.on_event( + "prj_1", + {"kind": "task.closed", "payload": {"id": "tsk_a", "closed_by": "alice"}}, + ) + # First call is the closed message + first_call = bridge._msg_store.send_message.await_args_list[0] + assert "alice closed tsk_a" in first_call.kwargs["content"] + assert "ship it" in first_call.kwargs["content"] + + +@pytest.mark.asyncio +async def test_on_event_no_a2a_channel_is_silent(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[]) + bridge._task_store.get_task = AsyncMock(return_value={"id": "tsk_a", "title": "T"}) + await bridge.on_event( + "prj_1", + {"kind": "task.claimed", "payload": {"id": "tsk_a", "claimed_by": "alice"}}, + ) + assert bridge._msg_store.send_message.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_event_ignores_unknown_kinds(tmp_path): + bridge = _make_bridge(tmp_path) + await bridge.on_event( + "prj_1", {"kind": "task.created", "payload": {"id": "tsk_a"}} + ) + assert bridge._msg_store.send_message.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_event_closed_emits_ready_for_unblocked_dependents(tmp_path): + """When tsk_a closes, tsk_b (which was blocked by tsk_a) should get a + ⚡ ready system message in the A2A channel.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + + async def _get_task(task_id): + if task_id == "tsk_a": + return { + "id": "tsk_a", + "title": "A", + "status": "closed", + "closed_by": "alice", + "close_reason": None, + } + if task_id == "tsk_b": + return { + "id": "tsk_b", + "title": "B", + "status": "open", + "labels": ["frontend"], + } + return None + + bridge._task_store.get_task = AsyncMock(side_effect=_get_task) + + async def _list_rels(task_id, direction="from"): + # tsk_a blocks tsk_b + if task_id == "tsk_a" and direction == "from": + return [ + { + "from_task_id": "tsk_a", + "to_task_id": "tsk_b", + "kind": "blocks", + } + ] + if task_id == "tsk_b" and direction == "to": + return [ + { + "from_task_id": "tsk_a", + "to_task_id": "tsk_b", + "kind": "blocks", + } + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + + await bridge.on_event( + "prj_1", + {"kind": "task.closed", "payload": {"id": "tsk_a", "closed_by": "alice"}}, + ) + + # Two messages: closed for tsk_a, ready for tsk_b + assert bridge._msg_store.send_message.await_count == 2 + bodies = [ + c.kwargs["content"] for c in bridge._msg_store.send_message.await_args_list + ] + assert any("closed tsk_a" in b for b in bodies) + assert any("tsk_b ready" in b for b in bodies) + + +@pytest.mark.asyncio +async def test_on_event_closed_no_ready_when_other_blocker_open(tmp_path): + """If tsk_b has another open blocker besides the closing tsk_a, no + ⚡ ready emits.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + + async def _get_task(task_id): + if task_id == "tsk_a": + return {"id": "tsk_a", "title": "A", "status": "closed"} + if task_id == "tsk_b": + return {"id": "tsk_b", "title": "B", "status": "open", "labels": []} + if task_id == "tsk_c": + return {"id": "tsk_c", "title": "C", "status": "open"} # still blocking + return None + + bridge._task_store.get_task = AsyncMock(side_effect=_get_task) + + async def _list_rels(task_id, direction="from"): + if task_id == "tsk_a" and direction == "from": + return [{"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"}] + if task_id == "tsk_b" and direction == "to": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"}, + {"from_task_id": "tsk_c", "to_task_id": "tsk_b", "kind": "blocks"}, + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + + await bridge.on_event( + "prj_1", {"kind": "task.closed", "payload": {"id": "tsk_a"}} + ) + + bodies = [ + c.kwargs["content"] for c in bridge._msg_store.send_message.await_args_list + ] + assert not any("ready" in b for b in bodies) + + +@pytest.mark.asyncio +async def test_on_event_closed_emits_ready_for_multiple_dependents(tmp_path): + """Closing tsk_a should emit ⚡ ready for both tsk_b and tsk_c when each + has only tsk_a as their blocker.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + + async def _get_task(task_id): + if task_id == "tsk_a": + return { + "id": "tsk_a", + "title": "A", + "status": "closed", + "closed_by": "alice", + } + if task_id == "tsk_b": + return {"id": "tsk_b", "title": "B", "status": "open", "labels": []} + if task_id == "tsk_c": + return {"id": "tsk_c", "title": "C", "status": "open", "labels": []} + return None + + bridge._task_store.get_task = AsyncMock(side_effect=_get_task) + + async def _list_rels(task_id, direction="from"): + if task_id == "tsk_a" and direction == "from": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"}, + {"from_task_id": "tsk_a", "to_task_id": "tsk_c", "kind": "blocks"}, + ] + if task_id == "tsk_b" and direction == "to": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + if task_id == "tsk_c" and direction == "to": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_c", "kind": "blocks"} + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + + await bridge.on_event( + "prj_1", + {"kind": "task.closed", "payload": {"id": "tsk_a", "closed_by": "alice"}}, + ) + + calls = bridge._msg_store.send_message.await_args_list + system_bodies = [ + c.kwargs["content"] + for c in calls + if c.kwargs.get("content_type") == "system" + ] + # 1 closed + 2 ready + assert len(system_bodies) == 3 + assert any("closed tsk_a" in b for b in system_bodies) + assert any("tsk_b ready" in b for b in system_bodies) + assert any("tsk_c ready" in b for b in system_bodies) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("dependent_status", ["closed", "cancelled"]) +async def test_on_event_closed_no_ready_when_dependent_not_open( + tmp_path, dependent_status +): + """If tsk_b is already closed/cancelled, no ⚡ ready emits even though + its sole blocker tsk_a just closed.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + + async def _get_task(task_id): + if task_id == "tsk_a": + return {"id": "tsk_a", "title": "A", "status": "closed"} + if task_id == "tsk_b": + return {"id": "tsk_b", "title": "B", "status": dependent_status} + return None + + bridge._task_store.get_task = AsyncMock(side_effect=_get_task) + + async def _list_rels(task_id, direction="from"): + if task_id == "tsk_a" and direction == "from": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + if task_id == "tsk_b" and direction == "to": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + + await bridge.on_event( + "prj_1", {"kind": "task.closed", "payload": {"id": "tsk_a"}} + ) + + bodies = [ + c.kwargs["content"] for c in bridge._msg_store.send_message.await_args_list + ] + # closed message posted, but no ready + assert any("closed tsk_a" in b for b in bodies) + assert not any("⚡" in b for b in bodies) + + +@pytest.mark.asyncio +async def test_on_event_closed_ready_synthesis_handles_missing_dependent(tmp_path): + """If get_task returns None for the dependent referenced by an outbound + blocks edge, _announce_newly_ready must skip silently — closed message + still posts, no ready, no exception.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + + async def _get_task(task_id): + if task_id == "tsk_a": + return {"id": "tsk_a", "title": "A", "status": "closed"} + # tsk_b lookup returns None (orphan edge) + return None + + bridge._task_store.get_task = AsyncMock(side_effect=_get_task) + + async def _list_rels(task_id, direction="from"): + if task_id == "tsk_a" and direction == "from": + return [ + {"from_task_id": "tsk_a", "to_task_id": "tsk_b", "kind": "blocks"} + ] + return [] + + bridge._task_store.list_relationships = AsyncMock(side_effect=_list_rels) + + # No exception should escape + await bridge.on_event( + "prj_1", {"kind": "task.closed", "payload": {"id": "tsk_a"}} + ) + + bodies = [ + c.kwargs["content"] for c in bridge._msg_store.send_message.await_args_list + ] + assert any("closed tsk_a" in b for b in bodies) + assert not any("ready" in b for b in bodies) + + +def _a2a_ch(): + return { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + + +def _msg(content: str, **kw): + base = { + "id": "msg_1", + "channel_id": "ch_1", + "author_id": "alice", + "author_type": "agent", + "content": content, + "content_type": "text", + } + base.update(kw) + return base + + +@pytest.mark.asyncio +async def test_on_chat_message_skips_system_content_type(tmp_path): + """Bridge must not loop on its own system messages.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + await bridge.on_chat_message( + "prj_1", "ch_1", _msg("/claim tsk_abc", content_type="system") + ) + assert bridge._task_store.claim_task.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_chat_message_skips_non_a2a_channel(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_other", + "name": "general", + "type": "topic", + "settings": {}, + } + ] + ) + await bridge.on_chat_message("prj_1", "ch_other", _msg("/claim tsk_abc")) + assert bridge._task_store.claim_task.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_chat_message_claim_calls_task_store(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + await bridge.on_chat_message("prj_1", "ch_1", _msg("/claim tsk_abc")) + bridge._task_store.claim_task.assert_awaited_once_with("tsk_abc", "alice") + + +@pytest.mark.asyncio +async def test_on_chat_message_release_calls_task_store(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + await bridge.on_chat_message("prj_1", "ch_1", _msg("/release tsk_abc")) + bridge._task_store.release_task.assert_awaited_once_with("tsk_abc", "alice") + + +@pytest.mark.asyncio +async def test_on_chat_message_close_with_note_calls_task_store(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + await bridge.on_chat_message("prj_1", "ch_1", _msg("/close tsk_abc shipped")) + bridge._task_store.close_task.assert_awaited_once_with( + "tsk_abc", closed_by="alice", reason="shipped" + ) + + +@pytest.mark.asyncio +async def test_on_chat_message_multiple_verbs_processed_in_order(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + await bridge.on_chat_message( + "prj_1", "ch_1", _msg("/claim tsk_a\n/close tsk_b done") + ) + bridge._task_store.claim_task.assert_awaited_once_with("tsk_a", "alice") + bridge._task_store.close_task.assert_awaited_once_with( + "tsk_b", closed_by="alice", reason="done" + ) + + +@pytest.mark.asyncio +async def test_on_chat_message_verb_failure_is_silent(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.claim_task = AsyncMock(side_effect=ValueError("nope")) + # Should not raise + await bridge.on_chat_message("prj_1", "ch_1", _msg("/claim tsk_abc")) + + +@pytest.mark.asyncio +async def test_on_event_closed_ready_synthesis_failure_does_not_break_close_message( + tmp_path, +): + """If list_relationships raises while announcing newly-ready dependents, + the closed message must already have been posted and no exception + escapes on_event.""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock( + return_value=[ + { + "id": "ch_1", + "name": "a2a", + "type": "group", + "settings": {"kind": "a2a"}, + } + ] + ) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_a", "title": "A", "status": "closed"} + ) + bridge._task_store.list_relationships = AsyncMock( + side_effect=RuntimeError("boom") + ) + + # No exception escapes on_event + await bridge.on_event( + "prj_1", {"kind": "task.closed", "payload": {"id": "tsk_a"}} + ) + + # Closed message was posted before _announce_newly_ready ran + calls = bridge._msg_store.send_message.await_args_list + assert len(calls) == 1 + assert "closed tsk_a" in calls[0].kwargs["content"] + + +@pytest.mark.asyncio +async def test_on_chat_message_mention_attaches_comment(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_abc", "project_id": "prj_1"} + ) + await bridge.on_chat_message( + "prj_1", "ch_1", _msg("seeing tsk_abc explode under load") + ) + bridge._task_store.add_comment.assert_awaited_once() + kwargs = bridge._task_store.add_comment.await_args.kwargs + assert kwargs["task_id"] == "tsk_abc" + assert kwargs["author_id"] == "alice" + assert "explode under load" in kwargs["body"] + + +@pytest.mark.asyncio +async def test_on_chat_message_mention_dedupes_per_message_task(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_abc", "project_id": "prj_1"} + ) + msg = _msg("ping tsk_abc again", id="msg_dup") + await bridge.on_chat_message("prj_1", "ch_1", msg) + await bridge.on_chat_message("prj_1", "ch_1", msg) + assert bridge._task_store.add_comment.await_count == 1 + + +@pytest.mark.asyncio +async def test_on_chat_message_mention_skips_unknown_task(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock(return_value=None) # task not found + await bridge.on_chat_message("prj_1", "ch_1", _msg("ghost tsk_zzz")) + assert bridge._task_store.add_comment.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_chat_message_mention_skips_cross_project_task(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_abc", "project_id": "prj_OTHER"} + ) + await bridge.on_chat_message("prj_1", "ch_1", _msg("tsk_abc here")) + assert bridge._task_store.add_comment.await_count == 0 + + +@pytest.mark.asyncio +async def test_on_chat_message_verb_alone_does_not_double_attach(tmp_path): + """`/claim tsk_abc` should hit claim_task once and NOT also create a + comment for the same id (the verb is the action; commenting on top + would be noise).""" + bridge = _make_bridge(tmp_path) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_abc", "project_id": "prj_1"} + ) + await bridge.on_chat_message("prj_1", "ch_1", _msg("/claim tsk_abc")) + bridge._task_store.claim_task.assert_awaited_once() + bridge._task_store.add_comment.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_start_subscribes_to_broker_for_active_projects(tmp_path): + bridge = _make_bridge(tmp_path) + bridge._project_store.list_projects = AsyncMock( + return_value=[ + {"id": "prj_1", "slug": "a"}, + {"id": "prj_2", "slug": "b"}, + ] + ) + await bridge.start() + await bridge.backfill_active() + # backfill_active should have subscribed each active project to the broker + assert bridge._broker.subscribe.await_count == 2 + await bridge.stop() + + +@pytest.mark.asyncio +async def test_broker_event_triggers_on_event(tmp_path): + bridge = _make_bridge(tmp_path) + queue: asyncio.Queue = asyncio.Queue() + bridge._broker.subscribe = AsyncMock(return_value=queue) + bridge._project_store.list_projects = AsyncMock( + return_value=[{"id": "prj_1", "slug": "a"}] + ) + bridge._channel_store.list_channels = AsyncMock(return_value=[_a2a_ch()]) + bridge._task_store.get_task = AsyncMock( + return_value={"id": "tsk_x", "title": "X", "status": "claimed"} + ) + + await bridge.start() + await bridge.backfill_active() + + # Simulate an event arriving on the broker queue + from tinyagentos.projects.events import ProjectEvent + queue.put_nowait( + ProjectEvent(kind="task.claimed", payload={"id": "tsk_x", "claimed_by": "bob"}) + ) + await asyncio.sleep(0.1) + await bridge.stop() + + # The subscriber loop should have called on_event, which posts a system msg + assert bridge._msg_store.send_message.await_count >= 1 diff --git a/tests/projects/test_beads_format.py b/tests/projects/test_beads_format.py new file mode 100644 index 00000000..3cd524b7 --- /dev/null +++ b/tests/projects/test_beads_format.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +from tinyagentos.projects.beads_format import ( + compute_ready, + task_to_jsonl_dict, +) + + +def _task(**kw) -> dict: + base = { + "id": "tsk_a3f8c2", + "project_id": "prj_1", + "parent_task_id": None, + "title": "T", + "body": "", + "status": "open", + "priority": 1, + "labels": ["x"], + "assignee_id": None, + "claimed_by": None, + "claimed_at": None, + "closed_at": None, + "closed_by": None, + "close_reason": None, + "created_by": "u1", + "created_at": 1000.0, + "updated_at": 1000.0, + } + base.update(kw) + return base + + +def test_task_to_jsonl_dict_no_relationships_no_assignee(): + t = _task(id="tsk_a", title="Hello") + out = task_to_jsonl_dict(t, outbound_relationships=[], ready=True) + assert out["id"] == "tsk_a" + assert out["title"] == "Hello" + assert out["status"] == "open" + assert out["priority"] == "p2" # priority int 1 -> "p2" + assert out["labels"] == ["x"] + assert out["assignee_ids"] == [] + assert out["parent_id"] is None + assert out["deps"] == [] + assert out["ready"] is True + + +def test_task_to_jsonl_dict_with_assignee_and_parent(): + t = _task(id="tsk_b", assignee_id="agent_alice", parent_task_id="tsk_root") + out = task_to_jsonl_dict(t, outbound_relationships=[], ready=False) + assert out["assignee_ids"] == ["agent_alice"] + assert out["parent_id"] == "tsk_root" + + +def test_task_to_jsonl_dict_with_relationships_preserves_order(): + t = _task(id="tsk_c") + rels = [ + {"from_task_id": "tsk_c", "to_task_id": "tsk_x", "kind": "blocks"}, + {"from_task_id": "tsk_c", "to_task_id": "tsk_y", "kind": "relates_to"}, + {"from_task_id": "tsk_c", "to_task_id": "tsk_z", "kind": "blocks"}, + ] + out = task_to_jsonl_dict(t, outbound_relationships=rels, ready=True) + assert out["deps"] == [ + {"task_id": "tsk_x", "kind": "blocks"}, + {"task_id": "tsk_y", "kind": "relates_to"}, + {"task_id": "tsk_z", "kind": "blocks"}, + ] + + +def test_task_to_jsonl_dict_priority_clamped_to_p3(): + # priority ints map: 0→p3 (lowest), 1→p2, 2→p1, 3+→p0 (highest) + assert task_to_jsonl_dict(_task(priority=0), [], False)["priority"] == "p3" + assert task_to_jsonl_dict(_task(priority=1), [], False)["priority"] == "p2" + assert task_to_jsonl_dict(_task(priority=2), [], False)["priority"] == "p1" + assert task_to_jsonl_dict(_task(priority=3), [], False)["priority"] == "p0" + assert task_to_jsonl_dict(_task(priority=99), [], False)["priority"] == "p0" + + +def test_compute_ready_open_no_blockers_is_ready(): + assert compute_ready(_task(status="open"), incoming_blocker_statuses=[]) is True + + +def test_compute_ready_open_with_open_blocker_not_ready(): + assert compute_ready(_task(status="open"), incoming_blocker_statuses=["open"]) is False + + +def test_compute_ready_open_with_only_closed_blockers_is_ready(): + assert ( + compute_ready(_task(status="open"), incoming_blocker_statuses=["closed", "closed"]) + is True + ) + + +def test_compute_ready_closed_task_never_ready(): + assert compute_ready(_task(status="closed"), incoming_blocker_statuses=[]) is False + + +def test_compute_ready_claimed_task_not_ready(): + assert compute_ready(_task(status="claimed"), incoming_blocker_statuses=[]) is False + + +from tinyagentos.projects.beads_format import ( + format_claimed, + format_closed, + format_ready, + format_released, +) + + +def test_format_claimed(): + assert format_claimed("alice", "tsk_abc", "Wire OAuth") == ( + '🤚 alice claimed tsk_abc — "Wire OAuth"' + ) + + +def test_format_released(): + assert format_released("alice", "tsk_abc", "Wire OAuth") == ( + '↩️ alice released tsk_abc — "Wire OAuth"' + ) + + +def test_format_closed_without_note(): + assert format_closed("alice", "tsk_abc", "Wire OAuth", note=None) == ( + '✅ alice closed tsk_abc — "Wire OAuth"' + ) + + +def test_format_closed_with_note(): + assert format_closed("alice", "tsk_abc", "Wire OAuth", note="ship it") == ( + '✅ alice closed tsk_abc — "Wire OAuth"\nship it' + ) + + +def test_format_closed_strips_blank_note(): + assert format_closed("alice", "tsk_abc", "T", note=" ") == ( + '✅ alice closed tsk_abc — "T"' + ) + + +def test_format_ready_with_labels(): + assert format_ready("tsk_abc", "Wire OAuth", labels=["auth", "ui"]) == ( + '⚡ tsk_abc ready — "Wire OAuth" — auth, ui' + ) + + +def test_format_ready_no_labels(): + assert format_ready("tsk_abc", "Wire OAuth", labels=[]) == ( + '⚡ tsk_abc ready — "Wire OAuth"' + ) + + +from tinyagentos.projects.beads_format import parse_verbs, scan_task_ids + + +def test_parse_verbs_simple(): + assert parse_verbs("/claim tsk_abc") == [("claim", "tsk_abc", None)] + + +def test_parse_verbs_release(): + assert parse_verbs("/release tsk_abc") == [("release", "tsk_abc", None)] + + +def test_parse_verbs_close_with_note(): + assert parse_verbs("/close tsk_abc done shipping") == [ + ("close", "tsk_abc", "done shipping") + ] + + +def test_parse_verbs_close_without_note(): + assert parse_verbs("/close tsk_abc") == [("close", "tsk_abc", None)] + + +def test_parse_verbs_multiple_lines(): + body = "/claim tsk_a\n/close tsk_b done\nstray" + assert parse_verbs(body) == [ + ("claim", "tsk_a", None), + ("close", "tsk_b", "done"), + ] + + +def test_parse_verbs_indented_line_not_matched(): + assert parse_verbs(" /claim tsk_abc") == [] + + +def test_parse_verbs_unknown_verb_not_matched(): + assert parse_verbs("/foo tsk_abc") == [] + + +def test_parse_verbs_invalid_task_id_not_matched(): + assert parse_verbs("/claim notavalid") == [] + assert parse_verbs("/claim tsk_") == [] # must have at least one hex char + + +def test_parse_verbs_empty_body(): + assert parse_verbs("") == [] + + +def test_scan_task_ids_finds_all(): + assert scan_task_ids("see tsk_abc and tsk_def for context") == [ + "tsk_abc", + "tsk_def", + ] + + +def test_scan_task_ids_dedupes_preserve_order(): + assert scan_task_ids("tsk_abc tsk_def tsk_abc") == ["tsk_abc", "tsk_def"] + + +def test_scan_task_ids_word_boundary(): + # xtsk_abc is not a match (no leading word boundary) + assert scan_task_ids("xtsk_abc tsk_def") == ["tsk_def"] + + +def test_scan_task_ids_none_in_body(): + assert scan_task_ids("nothing here") == [] diff --git a/tests/projects/test_routes_beads.py b/tests/projects/test_routes_beads.py new file mode 100644 index 00000000..1884f575 --- /dev/null +++ b/tests/projects/test_routes_beads.py @@ -0,0 +1,236 @@ +"""Integration tests for the Beads bridge wired into the FastAPI app. + +Uses the existing 'client' fixture which builds a real app via create_app +with a tmp data dir. Bridge is exercised end-to-end: route hooks, chat +hooks, broker subscription, JSONL writes. +""" +from __future__ import annotations + +import asyncio +import json +from pathlib import Path + +import pytest +from httpx import ASGITransport, AsyncClient + + +@pytest.mark.asyncio +async def test_app_boots_with_beads_bridge(app): + """The bridge must be attached to app.state on lifespan startup.""" + async with app.router.lifespan_context(app): + bridge = app.state.beads_bridge + assert bridge is not None + # data_root should be data_dir/projects + assert bridge._data_root.name == "projects" + + +def _auth_client(app): + """Return a session-cookie-authenticated AsyncClient for the given app.""" + app.state.auth.setup_user("admin", "Test Admin", "", "testpass") + record = app.state.auth.find_user("admin") + uid = record["id"] if record else "" + token = app.state.auth.create_session(user_id=uid, long_lived=True) + return AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + cookies={"taos_session": token}, + ) + + +@pytest.mark.asyncio +async def test_create_task_marks_project_dirty(app): + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-marks-dirty"}, + ) + assert r.status_code == 200, r.text + project = r.json() + + bridge = app.state.beads_bridge + bridge._dirty.clear() + + r = await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "T1"}, + ) + assert r.status_code == 200, r.text + + assert project["id"] in bridge._dirty + + +@pytest.mark.asyncio +async def test_export_endpoint_writes_jsonl(app): + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-export"}, + ) + project = r.json() + await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "Hello world"}, + ) + r = await c.post(f"/api/projects/{project['id']}/beads/export") + assert r.status_code == 200, r.text + body = r.json() + assert body["path"].endswith("/.beads/tasks.jsonl") + p = Path(body["path"]) + assert p.exists() + line = p.read_text().strip() + assert json.loads(line)["title"] == "Hello world" + + +@pytest.mark.asyncio +async def test_a2a_claim_verb_via_rest_post_claims_task(app): + """Send '/claim tsk_' as a non-system message into the project's + A2A channel via REST. Bridge should claim the task without invoking + agent dispatch.""" + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-verb"}, + ) + project = r.json() + + # Create a task to claim + r = await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "claimable"}, + ) + task = r.json() + + # Find the project's A2A channel + r = await c.get(f"/api/chat/channels?project_id={project['id']}") + channels = r.json()["channels"] + a2a_list = [c2 for c2 in channels if (c2.get("settings") or {}).get("kind") == "a2a"] + assert a2a_list, "Expected A2A channel to be created for project" + a2a = a2a_list[0] + + # Send the verb as an "agent" non-system message + r = await c.post( + "/api/chat/messages", + json={ + "channel_id": a2a["id"], + "author_id": "alice", + "author_type": "agent", + "content": f"/claim {task['id']}", + "content_type": "text", + }, + ) + assert r.status_code == 200, r.text + + # Allow a tick for the bridge to process the hook + await asyncio.sleep(0.1) + + # Task should now be claimed by alice + r = await c.get( + f"/api/projects/{project['id']}/tasks/{task['id']}" + ) + assert r.json()["status"] == "claimed" + assert r.json()["claimed_by"] == "alice" + + +@pytest.mark.asyncio +async def test_a2a_mention_attaches_comment(app): + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-mention"}, + ) + project = r.json() + r = await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "mentioned"}, + ) + task = r.json() + r = await c.get(f"/api/chat/channels?project_id={project['id']}") + channels = r.json()["channels"] + a2a_list = [c2 for c2 in channels if (c2.get("settings") or {}).get("kind") == "a2a"] + assert a2a_list, "Expected A2A channel to be created for project" + a2a = a2a_list[0] + + await c.post( + "/api/chat/messages", + json={ + "channel_id": a2a["id"], + "author_id": "bob", + "author_type": "agent", + "content": f"chasing {task['id']} in prod", + "content_type": "text", + }, + ) + await asyncio.sleep(0.1) + + r = await c.get( + f"/api/projects/{project['id']}/tasks/{task['id']}/comments" + ) + items = r.json()["items"] + assert any(c2["author_id"] == "bob" and "prod" in c2["body"] for c2 in items) + + +@pytest.mark.asyncio +async def test_bridge_render_failure_does_not_break_routes(app, monkeypatch): + """If _render_jsonl raises, the route still returns 200 and the + project is re-marked dirty for retry.""" + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-fail"}, + ) + assert r.status_code == 200, r.text + project = r.json() + + bridge = app.state.beads_bridge + calls = {"n": 0} + real_render = bridge._render_jsonl + + async def flaky_render(project_id): + calls["n"] += 1 + if calls["n"] == 1: + raise RuntimeError("simulated render failure") + return await real_render(project_id) + + monkeypatch.setattr(bridge, "_render_jsonl", flaky_render) + + r = await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "First"}, + ) + assert r.status_code == 200, r.text + + # Wait for first render attempt (fails) + re-mark dirty + retry + await asyncio.sleep(0.6) + + beads_file = ( + Path(app.state.projects_root) + / project["slug"] + / ".beads" + / "tasks.jsonl" + ) + assert beads_file.exists() + + +@pytest.mark.asyncio +async def test_bridge_none_does_not_break_route(app, monkeypatch): + """If beads_bridge is None (e.g. construction failed at boot), routes + are unaffected and tasks still create successfully.""" + async with app.router.lifespan_context(app): + async with _auth_client(app) as c: + monkeypatch.setattr(app.state, "beads_bridge", None) + r = await c.post( + "/api/projects", + json={"name": "Demo", "slug": "demo-none"}, + ) + assert r.status_code == 200, r.text + project = r.json() + r = await c.post( + f"/api/projects/{project['id']}/tasks", + json={"title": "T"}, + ) + assert r.status_code == 200, r.text diff --git a/tinyagentos/app.py b/tinyagentos/app.py index ae955926..0f3e9685 100644 --- a/tinyagentos/app.py +++ b/tinyagentos/app.py @@ -614,6 +614,28 @@ async def _reload_llm_proxy_on_catalog_change() -> None: except Exception: logger.exception("a2a backfill failed") + # Beads bridge: project task graph ↔ A2A coordination channel. + # See docs/superpowers/specs/2026-04-27-projects-beads-bridge-design.md. + # Construction failure must not break boot — log and continue + # without a bridge; routes already null-check app.state.beads_bridge. + try: + from tinyagentos.projects.beads_bridge import BeadsBridge + beads_bridge = BeadsBridge( + project_store=project_store, + task_store=project_task_store, + channel_store=chat_channels, + msg_store=chat_messages, + broker=project_event_broker, + data_root=projects_root, + ) + await beads_bridge.start() + await beads_bridge.backfill_active() + app.state.beads_bridge = beads_bridge + logger.info("beads bridge ready") + except Exception: + logger.exception("beads bridge failed to start — continuing without") + app.state.beads_bridge = None + yield # NOTE: controller restart/shutdown does NOT touch agent containers — # agents and LiteLLM keep running independently, so there's nothing to @@ -647,6 +669,12 @@ async def _reload_llm_proxy_on_catalog_change() -> None: await user_memory.close() await desktop_settings.close() await canvas_store.close() + try: + bb = getattr(app.state, "beads_bridge", None) + if bb is not None: + await bb.stop() + except Exception: + logger.exception("beads bridge stop failed") await project_task_store.close() await project_store.close() await chat_channels.close() @@ -718,6 +746,7 @@ async def _reload_llm_proxy_on_catalog_change() -> None: app.state.project_store = project_store app.state.project_task_store = project_task_store app.state.project_event_broker = project_event_broker + app.state.beads_bridge = None projects_root.mkdir(parents=True, exist_ok=True) app.state.projects_root = projects_root app.state.chat_hub = chat_hub diff --git a/tinyagentos/projects/beads_bridge.py b/tinyagentos/projects/beads_bridge.py new file mode 100644 index 00000000..d24ee287 --- /dev/null +++ b/tinyagentos/projects/beads_bridge.py @@ -0,0 +1,435 @@ +"""Beads bridge: project task graph ↔ A2A coordination channel. + +See docs/superpowers/specs/2026-04-27-projects-beads-bridge-design.md. + +The bridge has three input edges and one output edge. Inputs: + - mark_dirty(project_id) from route hooks after task/relationship mutations + - on_chat_message(...) from chat send hooks + - on_event(project_id, evt) from a broker subscription + +Output: a single async writer task drains the dirty set on debounce and +writes data/projects//.beads/tasks.jsonl. Per-project asyncio.Lock +serializes renders for the same project so concurrent ticks can't race. + +Failure isolation rule: every public entry point catches and logs; +nothing the bridge does can break a route or boot. Mirror of +tinyagentos/projects/a2a.py. +""" +from __future__ import annotations + +import asyncio +import logging +import os +from collections import defaultdict, deque +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +# Shutdown drain budget — how long stop() waits for the final tick. +_STOP_DRAIN_TIMEOUT = 2.0 + + +class BeadsBridge: + def __init__( + self, + *, + project_store, + task_store, + channel_store, + msg_store, + broker, + data_root: Path, + debounce_seconds: float = 0.2, + ) -> None: + self._project_store = project_store + self._task_store = task_store + self._channel_store = channel_store + self._msg_store = msg_store + self._broker = broker + self._data_root = Path(data_root) + self._debounce = float(debounce_seconds) + + self._dirty: set[str] = set() + self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock) + self._writer_task: asyncio.Task | None = None + self._broker_tasks: dict[str, asyncio.Task] = {} + self._broker_queues: dict[str, Any] = {} # project_id -> Queue + self._stopped = asyncio.Event() + # Bounded dedupe of (message_id, task_id) pairs already attached + # as comments. FIFO eviction once cap is hit; collisions after + # eviction merely create one duplicate comment row. + self._seen_comments_set: set[tuple[str, str]] = set() + self._seen_comments_order: deque[tuple[str, str]] = deque(maxlen=1024) + + async def start(self) -> None: + if self._writer_task is not None: + return + self._stopped.clear() + self._writer_task = asyncio.create_task( + self._writer_loop(), name="beads-bridge-writer" + ) + + async def stop(self) -> None: + if self._writer_task is None: + return + self._stopped.set() + try: + await asyncio.wait_for(self._writer_task, timeout=_STOP_DRAIN_TIMEOUT) + except asyncio.TimeoutError: + self._writer_task.cancel() + try: + await self._writer_task + except (asyncio.CancelledError, Exception): + pass + finally: + self._writer_task = None + # Cancel any per-project broker subscriber tasks + for t in self._broker_tasks.values(): + t.cancel() + for t in self._broker_tasks.values(): + try: + await t + except (asyncio.CancelledError, Exception): + pass + self._broker_tasks.clear() + self._broker_queues.clear() + + def mark_dirty(self, project_id: str) -> None: + if not project_id: + return + self._dirty.add(project_id) + + async def backfill_active(self) -> int: + """Mark every active project dirty and subscribe to broker. Called at boot.""" + try: + projects = await self._project_store.list_projects(status="active") + except Exception: + logger.exception("beads bridge: backfill list_projects failed") + return 0 + count = 0 + for p in projects: + self.mark_dirty(p["id"]) + await self._ensure_subscribed(p["id"]) + count += 1 + return count + + async def _ensure_subscribed(self, project_id: str) -> None: + if project_id in self._broker_tasks: + return + try: + queue = await self._broker.subscribe(project_id) + except Exception: + logger.exception( + "beads bridge: broker subscribe failed for %s", project_id + ) + return + self._broker_queues[project_id] = queue + self._broker_tasks[project_id] = asyncio.create_task( + self._broker_loop(project_id, queue), + name=f"beads-bridge-broker:{project_id}", + ) + + async def _broker_loop(self, project_id: str, queue: Any) -> None: + try: + while not self._stopped.is_set(): + try: + ev = await asyncio.wait_for(queue.get(), timeout=1.0) + except asyncio.TimeoutError: + continue + # ProjectEvent dataclass has .kind and .payload + event = {"kind": ev.kind, "payload": ev.payload} + await self.on_event(project_id, event) + except asyncio.CancelledError: + raise + except Exception: + logger.exception( + "beads bridge: broker loop crashed for %s", project_id + ) + finally: + try: + await self._broker.unsubscribe(project_id, queue) + except Exception: + pass + + async def export_now(self, project_id: str) -> Path | None: + """Synchronous render-and-write. Returns the file path, or None + if the project doesn't exist.""" + project = await self._project_store.get_project(project_id) + if project is None: + return None + async with self._locks[project_id]: + await self._render_jsonl(project_id) + return self._data_root / project["slug"] / ".beads" / "tasks.jsonl" + + async def _writer_loop(self) -> None: + while not self._stopped.is_set(): + try: + await asyncio.sleep(self._debounce) + if not self._dirty: + continue + # Snapshot and clear; a fresh mutation during render will + # re-add the project. + pending = list(self._dirty) + self._dirty.clear() + for project_id in pending: + try: + async with self._locks[project_id]: + await self._render_jsonl(project_id) + except Exception: + logger.exception( + "beads bridge: render failed for %s", project_id + ) + # Re-mark so the next tick retries. + self._dirty.add(project_id) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("beads bridge: writer loop iteration crashed") + + async def _render_jsonl(self, project_id: str) -> None: + project = await self._project_store.get_project(project_id) + if project is None: + return + slug = project["slug"] + beads_dir = self._data_root / slug / ".beads" + beads_dir.mkdir(parents=True, exist_ok=True) + target = beads_dir / "tasks.jsonl" + tmp = beads_dir / f"tasks.jsonl.{os.getpid()}.tmp" + + tasks = await self._task_store.list_tasks(project_id=project_id) + from tinyagentos.projects.beads_format import ( + compute_ready, + task_to_jsonl_dict, + ) + import json + + lines: list[str] = [] + for t in tasks: + outbound = await self._task_store.list_relationships( + t["id"], direction="from" + ) + incoming = await self._task_store.list_relationships( + t["id"], direction="to" + ) + incoming_blocker_statuses: list[str] = [] + for rel in incoming: + if rel.get("kind") != "blocks": + continue + src = await self._task_store.get_task(rel["from_task_id"]) + if src is not None: + incoming_blocker_statuses.append(src.get("status", "open")) + ready = compute_ready(t, incoming_blocker_statuses) + lines.append( + json.dumps(task_to_jsonl_dict(t, outbound, ready), separators=(",", ":")) + ) + + tmp.write_text("\n".join(lines) + ("\n" if lines else "")) + os.replace(tmp, target) + + async def _find_a2a_channel(self, project_id: str) -> dict | None: + """Resolve the project's A2A channel. None if missing/archived.""" + try: + channels = await self._channel_store.list_channels( + project_id=project_id, archived=False, + ) + except Exception: + logger.exception("beads bridge: list_channels failed for %s", project_id) + return None + for ch in channels: + if ( + ch.get("name") == "a2a" + and ch.get("type") == "group" + and (ch.get("settings") or {}).get("kind") == "a2a" + ): + return ch + return None + + async def _post_system(self, channel_id: str, body: str) -> None: + try: + await self._msg_store.send_message( + channel_id=channel_id, + author_id="bridge", + author_type="system", + content=body, + content_type="system", + state="complete", + ) + except Exception: + logger.exception("beads bridge: send_message failed for %s", channel_id) + + async def on_event(self, project_id: str, event: dict) -> None: + try: + kind = event.get("kind") + if kind not in ("task.claimed", "task.released", "task.closed"): + return + payload = event.get("payload") or {} + tsk_id = payload.get("id") + if not tsk_id: + return + channel = await self._find_a2a_channel(project_id) + if channel is None: + return + task = await self._task_store.get_task(tsk_id) + if task is None: + return + title = task.get("title", "") + from tinyagentos.projects.beads_format import ( + format_claimed, + format_closed, + format_released, + ) + if kind == "task.claimed": + actor = payload.get("claimed_by") or task.get("claimed_by") or "agent" + await self._post_system( + channel["id"], format_claimed(actor, tsk_id, title) + ) + elif kind == "task.released": + # The release event payload doesn't include releaser_id; + # use the actor we last knew about, or fall back to "agent". + actor = task.get("claimed_by") or "agent" + await self._post_system( + channel["id"], format_released(actor, tsk_id, title) + ) + elif kind == "task.closed": + actor = payload.get("closed_by") or task.get("closed_by") or "agent" + note = task.get("close_reason") + await self._post_system( + channel["id"], format_closed(actor, tsk_id, title, note) + ) + await self._announce_newly_ready(channel["id"], project_id, tsk_id) + except Exception: + logger.exception( + "beads bridge: on_event crashed for %s/%s", project_id, event + ) + + async def _announce_newly_ready( + self, channel_id: str, project_id: str, closed_task_id: str + ) -> None: + from tinyagentos.projects.beads_format import format_ready + try: + outbound = await self._task_store.list_relationships( + closed_task_id, direction="from" + ) + except Exception: + logger.exception( + "beads bridge: list_relationships failed for %s", closed_task_id + ) + return + for rel in outbound: + if rel.get("kind") != "blocks": + continue + dependent_id = rel["to_task_id"] + dep = await self._task_store.get_task(dependent_id) + if dep is None or dep.get("status") != "open": + continue + # Check every other blocker on this dependent is also closed. + other_blockers = await self._task_store.list_relationships( + dependent_id, direction="to" + ) + still_blocked = False + for other in other_blockers: + if other.get("kind") != "blocks": + continue + if other["from_task_id"] == closed_task_id: + continue + src = await self._task_store.get_task(other["from_task_id"]) + if src is not None and src.get("status") not in ("closed", "cancelled"): + still_blocked = True + break + if still_blocked: + continue + await self._post_system( + channel_id, + format_ready(dependent_id, dep.get("title", ""), list(dep.get("labels") or [])), + ) + + async def on_chat_message( + self, project_id: str, channel_id: str, message: dict + ) -> None: + """Chat send hook. Filters non-A2A channels and our own system + messages, then dispatches verbs and attaches mention comments.""" + try: + if message.get("content_type") == "system": + return + channel = await self._find_a2a_channel(project_id) + if channel is None or channel["id"] != channel_id: + return + body = message.get("content") or "" + author = message.get("author_id") or "agent" + verb_ids = await self._dispatch_verbs(body, author) + await self._attach_mentions( + project_id=project_id, + message_id=message.get("id") or "", + author=author, + body=body, + verb_ids=verb_ids, + ) + except Exception: + logger.exception( + "beads bridge: on_chat_message crashed for %s/%s", + project_id, channel_id, + ) + + async def _dispatch_verbs(self, body: str, author: str) -> set[str]: + from tinyagentos.projects.beads_format import parse_verbs + acted: set[str] = set() + for verb, tsk_id, note in parse_verbs(body): + acted.add(tsk_id) + try: + if verb == "claim": + await self._task_store.claim_task(tsk_id, author) + elif verb == "release": + await self._task_store.release_task(tsk_id, author) + elif verb == "close": + await self._task_store.close_task( + tsk_id, closed_by=author, reason=note + ) + except Exception: + logger.info( + "beads bridge: verb /%s %s by %s failed", + verb, tsk_id, author, exc_info=True, + ) + return acted + + async def _attach_mentions( + self, + *, + project_id: str, + message_id: str, + author: str, + body: str, + verb_ids: set[str], + ) -> None: + from tinyagentos.projects.beads_format import scan_task_ids + for tsk_id in scan_task_ids(body): + if tsk_id in verb_ids: + continue + key = (message_id, tsk_id) + if key in self._seen_comments_set: + continue + try: + task = await self._task_store.get_task(tsk_id) + except Exception: + logger.exception( + "beads bridge: get_task failed for %s", tsk_id, + ) + continue + if task is None or task.get("project_id") != project_id: + continue + try: + await self._task_store.add_comment( + task_id=tsk_id, author_id=author, body=body, + ) + except Exception: + logger.info( + "beads bridge: add_comment failed for %s", tsk_id, + exc_info=True, + ) + continue + # Record only after successful attach. Bounded FIFO eviction. + if len(self._seen_comments_order) == self._seen_comments_order.maxlen: + evicted = self._seen_comments_order[0] + self._seen_comments_set.discard(evicted) + self._seen_comments_order.append(key) + self._seen_comments_set.add(key) diff --git a/tinyagentos/projects/beads_format.py b/tinyagentos/projects/beads_format.py new file mode 100644 index 00000000..1b55ad61 --- /dev/null +++ b/tinyagentos/projects/beads_format.py @@ -0,0 +1,109 @@ +"""Pure helpers for the Beads bridge. + +No IO. No imports from other tinyagentos modules. Easy to test. +""" +from __future__ import annotations + +import re +from datetime import datetime, timezone +from typing import Literal + +_VERB_RE = re.compile( + r"^/(claim|release|close)[ \t]+(tsk[-_][a-z0-9]+)(?:[ \t]+(.+))?$", + flags=re.MULTILINE, +) +_TASK_ID_RE = re.compile(r"\btsk[-_][a-z0-9]+\b") +_PRIORITY_MAP = {0: "p3", 1: "p2", 2: "p1"} + + +def _isoformat(ts: float | None) -> str | None: + if ts is None: + return None + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def task_to_jsonl_dict( + task: dict, outbound_relationships: list[dict], ready: bool +) -> dict: + """Map a ProjectTaskStore row + its outbound relationships to the + JSONL schema described in the spec §4.2.""" + pri_int = int(task.get("priority", 0)) + priority = _PRIORITY_MAP.get(pri_int, "p0") + assignee_id = task.get("assignee_id") + return { + "id": task["id"], + "title": task.get("title", ""), + "description": task.get("body", ""), + "status": task.get("status", "open"), + "priority": priority, + "labels": list(task.get("labels") or []), + "assignee_ids": [assignee_id] if assignee_id else [], + "parent_id": task.get("parent_task_id"), + "deps": [ + {"task_id": r["to_task_id"], "kind": r["kind"]} + for r in outbound_relationships + ], + "ready": bool(ready), + "created_at": _isoformat(task.get("created_at")), + "updated_at": _isoformat(task.get("updated_at")), + } + + +def compute_ready(task: dict, incoming_blocker_statuses: list[str]) -> bool: + """`ready` iff status is open AND every incoming `blocks` edge points + from a task whose status is `closed` (or `cancelled`).""" + if task.get("status") != "open": + return False + return all(s in ("closed", "cancelled") for s in incoming_blocker_statuses) + + +def format_claimed(agent: str, tsk_id: str, title: str) -> str: + return f'🤚 {agent} claimed {tsk_id} — "{title}"' + + +def format_released(agent: str, tsk_id: str, title: str) -> str: + return f'↩️ {agent} released {tsk_id} — "{title}"' + + +def format_closed(agent: str, tsk_id: str, title: str, note: str | None) -> str: + head = f'✅ {agent} closed {tsk_id} — "{title}"' + if note and note.strip(): + return f"{head}\n{note.strip()}" + return head + + +def format_ready(tsk_id: str, title: str, labels: list[str]) -> str: + head = f'⚡ {tsk_id} ready — "{title}"' + if labels: + return f"{head} — {', '.join(labels)}" + return head + + +Verb = Literal["claim", "release", "close"] + + +def parse_verbs(body: str) -> list[tuple[Verb, str, str | None]]: + """Find lines matching `^/(claim|release|close) tsk[-_][ note]$`. + + Returns tuples in document order. + """ + out: list[tuple[Verb, str, str | None]] = [] + for m in _VERB_RE.finditer(body or ""): + verb = m.group(1) + tsk = m.group(2) + note = m.group(3) + out.append((verb, tsk, note if note else None)) # type: ignore[arg-type] + return out + + +def scan_task_ids(body: str) -> list[str]: + """Find all `\\btsk[-_][a-z0-9]+\\b` ids, deduped, order preserved.""" + seen: set[str] = set() + out: list[str] = [] + for m in _TASK_ID_RE.finditer(body or ""): + tid = m.group(0) + if tid in seen: + continue + seen.add(tid) + out.append(tid) + return out diff --git a/tinyagentos/routes/chat.py b/tinyagentos/routes/chat.py index 452d8b74..c3be385a 100644 --- a/tinyagentos/routes/chat.py +++ b/tinyagentos/routes/chat.py @@ -18,6 +18,17 @@ router = APIRouter() logger = logging.getLogger(__name__) +_background_tasks: set[asyncio.Task] = set() + + +def _spawn_background(coro) -> asyncio.Task: + """Schedule a fire-and-forget coroutine, retaining a reference so it + cannot be GC'd before completion (RUF006).""" + task = asyncio.create_task(coro) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + return task + async def _capture_user_memory( user_memory, @@ -47,6 +58,20 @@ async def _capture_user_memory( logger.debug(f"user memory capture failed: {e}") +async def _beads_on_chat_message(app, channel: dict, message: dict) -> None: + """Best-effort hand-off to the Beads bridge. Never raises.""" + bridge = getattr(app.state, "beads_bridge", None) + if bridge is None: + return + project_id = channel.get("project_id") + if not project_id: + return + try: + await bridge.on_chat_message(project_id, channel["id"], message) + except Exception: + logger.warning("beads on_chat_message failed", exc_info=True) + + @router.get("/api/docs/chat-guide") async def get_chat_guide(): from pathlib import Path as _Path @@ -100,10 +125,12 @@ async def chat_ws(websocket: WebSocket): await hub.broadcast(data["channel_id"], {"type": "message", "seq": hub.next_seq(), **message}) router_svc = getattr(websocket.app.state, "agent_chat_router", None) - if router_svc is not None: - channel = await ch_store.get_channel(data["channel_id"]) - if channel is not None: - router_svc.dispatch(message, channel) + if _ws_channel is not None: + if router_svc is not None: + router_svc.dispatch(message, _ws_channel) + _spawn_background( + _beads_on_chat_message(websocket.app, _ws_channel, message) + ) # Capture user message into user memory (async, non-blocking) user_memory = getattr(websocket.app.state, "user_memory", None) @@ -237,7 +264,8 @@ async def post_message(request: Request): stripped = content.lstrip() if stripped.startswith("/"): channel = await ch_store.get_channel(channel_id) - if channel and channel.get("type") != "dm": + _is_a2a = ((channel or {}).get("settings") or {}).get("kind") == "a2a" + if channel and channel.get("type") != "dm" and not _is_a2a: from tinyagentos.chat.mentions import parse_mentions members = list(channel.get("members") or []) mentions = parse_mentions(content, members) @@ -333,10 +361,12 @@ async def post_message(request: Request): pass # Never block chat for archive failures router_svc = getattr(request.app.state, "agent_chat_router", None) - if router_svc is not None: - channel = await ch_store.get_channel(channel_id) - if channel is not None: - router_svc.dispatch(message, channel) + if _http_channel is not None: + if router_svc is not None: + router_svc.dispatch(message, _http_channel) + _spawn_background( + _beads_on_chat_message(request.app, _http_channel, message) + ) return message diff --git a/tinyagentos/routes/projects.py b/tinyagentos/routes/projects.py index c22057e2..acf13ed4 100644 --- a/tinyagentos/routes/projects.py +++ b/tinyagentos/routes/projects.py @@ -60,6 +60,19 @@ def _mirror(request: Request, project: dict) -> None: ) +def _beads_mark_dirty(request: Request, project_id: str) -> None: + """Best-effort hand-off to the Beads bridge. Never raises.""" + bridge = getattr(request.app.state, "beads_bridge", None) + if bridge is None: + return + try: + bridge.mark_dirty(project_id) + except Exception: + logger.warning( + "beads mark_dirty failed for project %s", project_id, exc_info=True + ) + + @router.post("/api/projects") async def create_project(payload: CreateProjectIn, request: Request): store = request.app.state.project_store @@ -312,6 +325,7 @@ async def create_task(project_id: str, payload: CreateTaskIn, request: Request): parent_task_id=payload.parent_task_id, created_by=_user_id(request), ) + _beads_mark_dirty(request, project_id) await pstore.log_activity(project_id, _user_id(request), "task.created", {"task_id": t["id"], "title": t["title"]}) return t @@ -360,6 +374,7 @@ async def update_task(project_id: str, task_id: str, payload: UpdateTaskIn, requ cur = await store.get_task(cur["parent_task_id"]) await store.update_task(task_id, **payload.model_dump(exclude_none=True)) + _beads_mark_dirty(request, project_id) return await store.get_task(task_id) @@ -372,6 +387,7 @@ async def claim_task(project_id: str, task_id: str, payload: ClaimIn, request: R ok = await store.claim_task(task_id, payload.claimer_id) if not ok: return JSONResponse({"error": "already claimed"}, status_code=409) + _beads_mark_dirty(request, project_id) pstore = request.app.state.project_store await pstore.log_activity(project_id, payload.claimer_id, "task.claimed", {"task_id": task_id}) return await store.get_task(task_id) @@ -386,6 +402,7 @@ async def release_task(project_id: str, task_id: str, payload: ReleaseIn, reques ok = await store.release_task(task_id, payload.releaser_id) if not ok: return JSONResponse({"error": "not claimed by releaser"}, status_code=409) + _beads_mark_dirty(request, project_id) return await store.get_task(task_id) @@ -398,6 +415,7 @@ async def close_task(project_id: str, task_id: str, payload: CloseIn, request: R ok = await store.close_task(task_id, closed_by=payload.closed_by, reason=payload.reason) if not ok: return JSONResponse({"error": "cannot close"}, status_code=409) + _beads_mark_dirty(request, project_id) pstore = request.app.state.project_store await pstore.log_activity(project_id, payload.closed_by, "task.closed", {"task_id": task_id}) project = await pstore.get_project(project_id) @@ -439,6 +457,7 @@ async def add_relationship(project_id: str, task_id: str, payload: AddRelIn, req ) except ValueError as e: return JSONResponse({"error": str(e)}, status_code=400) + _beads_mark_dirty(request, project_id) return rel @@ -531,3 +550,19 @@ async def event_stream(): media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) + + +@router.post("/api/projects/{project_id}/beads/export") +async def beads_export(project_id: str, request: Request): + """Force a synchronous render of the project's .beads/tasks.jsonl + snapshot. Returns 503 when the bridge isn't running.""" + bridge = getattr(request.app.state, "beads_bridge", None) + if bridge is None: + return JSONResponse({"error": "beads bridge not running"}, status_code=503) + pstore = request.app.state.project_store + if await pstore.get_project(project_id) is None: + return JSONResponse({"error": "not found"}, status_code=404) + path = await bridge.export_now(project_id) + if path is None: + return JSONResponse({"error": "export failed"}, status_code=500) + return {"path": str(path)}