diff --git a/README.md b/README.md index 71fa6f31f..191ee96d6 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,135 @@ And force a re-index: Meadow.Data.Indexer.reindex_all() ``` +### AI Agent Plans + +Meadow supports AI agent-generated plans for batch modifications to works. The system uses a two-table structure that allows agents to propose work-specific changes based on high-level prompts. + +#### Data Model + +**Plans** - High-level task definitions +- `prompt`: Natural language instruction (e.g., "Add a date_created EDTF string for the work based on the work's existing description, creator, and temporal subjects") +- `query`: OpenSearch query string identifying target works + - Collection query: `"collection.id:abc-123"` + - Specific works: `"id:(work-id-1 OR work-id-2 OR work-id-3)"` +- `status`: `:pending`, `:approved`, `:rejected`, `:executed`, or `:error` + +**PlanChanges** - Work-specific modifications +- `plan_id`: Foreign key to parent plan +- `work_id`: Specific work being modified +- `add`: Map of values to append to existing work data +- `delete`: Map of values to remove from existing work data +- `replace`: Map of values to fully replace in work data +- `status`: Individual approval/rejection tracking + +Each PlanChange must specify at least one operation (`add`, `delete`, or `replace`). + +#### PlanChange payloads + +- `add` merges values into existing metadata. For lists (like subjects or notes) the values are appended when they are not already present. Scalar fields (e.g., `title`) are merged according to the context (`:append` for `add`, `:replace` for `replace`). +- `delete` removes the provided values verbatim. For controlled vocabularies this means the JSON structure must match what is stored in the database (role/term maps). The planner normalizes structs and string-keyed maps automatically when executing changes. +- `replace` overwrites existing values for the provided keys. Use this when the existing content should be replaced entirely instead of appended or removed. + +Controlled metadata entries (subjects, creators, contributors, etc.) follow the shape below. For subjects you must supply both the `role` (with at least `id`/`scheme`) and the `term.id`; extra fields such as `label` or `variants` are ignored during execution but can be included when working with structs in IEx: + +```elixir +%{ + descriptive_metadata: %{ + subject: [ + %{ + role: %{id: "TOPICAL", scheme: "subject_role"}, + term: %{ + id: "http://id.loc.gov/authorities/subjects/sh85141086", + label: "Universities and colleges", + variants: ["Colleges", "Higher education institutions"] + } + } + ] + } +} +``` + +When constructing PlanChanges you can mix-and-match operations as needed. For example, to remove an outdated subject and add a new one in a single change: + +```elixir +delete: %{ + descriptive_metadata: %{ + subject: [ + %{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "mock1:result2"}} + ] + } +}, +add: %{ + descriptive_metadata: %{ + subject: [ + %{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "mock1:result5"}} + ] + } +} +``` + +#### Example Workflows + +**Adding new metadata:** +```elixir +# 1. Create a plan with a query - PlanChanges are auto-generated for matching works +{:ok, plan} = Meadow.Data.Planner.create_plan(%{ + prompt: "Add a date_created EDTF string for the work based on the work's existing description, creator, and temporal subjects", + query: "collection.id:abc-123" +}) + +# 2. Agent updates each auto-generated PlanChange with work-specific values +changes = Meadow.Data.Planner.list_plan_changes(plan.id) + +change_a = Enum.at(changes, 0) +{:ok, updated_change_a} = Meadow.Data.Planner.update_plan_change(change_a, %{ + add: %{descriptive_metadata: %{date_created: ["1896-11-10"]}} +}) + +change_b = Enum.at(changes, 1) +{:ok, updated_change_b} = Meadow.Data.Planner.update_plan_change(change_b, %{ + add: %{descriptive_metadata: %{date_created: ["1923-05"]}} +}) +``` + +**Removing unwanted values:** +```elixir +# Remove extraneous subject headings +{:ok, change} = Meadow.Data.Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: "work-id", + delete: %{ + descriptive_metadata: %{ + subject: [ + %{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "http://example.org/photograph"}}, + %{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "http://example.org/image"}} + ] + } + } +}) +``` + +**Replacing existing values:** +```elixir +# Replace the title +{:ok, change} = Meadow.Data.Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: "work-id", + replace: %{descriptive_metadata: %{title: "New Title"}} +}) +``` + +**Reviewing and executing:** +```elixir +# 3. User reviews and approves +{:ok, _} = Meadow.Data.Planner.approve_plan(plan, "user@example.com") +{:ok, _} = Meadow.Data.Planner.approve_plan_change(change_a, "user@example.com") +{:ok, _} = Meadow.Data.Planner.approve_plan_change(change_b, "user@example.com") + +# 4. Execute approved changes +{:ok, executed_plan} = Meadow.Data.Planner.execute_plan(plan) +``` + ### Doing development on the Meadow Pipeline lambdas In the AWS developer environment, the lambdas associated with the pipeline are shared amongst developers. In order to do development and see whether it's working you can override the configuration to use your local files instead of the deployed lambdas. Example below (you don't have to override them all. Just the ones you need). diff --git a/app/lib/meadow/data/planner.ex b/app/lib/meadow/data/planner.ex new file mode 100644 index 000000000..322e8c7c9 --- /dev/null +++ b/app/lib/meadow/data/planner.ex @@ -0,0 +1,1142 @@ +defmodule Meadow.Data.Planner do + @moduledoc """ + The Planner context for managing AI agent plans and their proposed changes. + + ## Workflow Overview + + 1. **Create Plan**: Create a plan with a prompt and query. PlanChanges are automatically + populated from the query results. Supports simple query strings or JSON queries. + ``` + # Simple query string + {:ok, plan} = create_plan(%{ + prompt: "Add date_created EDTF strings based on work metadata", + query: "collection.id:abc-123" + }) + + # Or with JSON query + {:ok, plan} = create_plan(%{ + prompt: "Add date_created EDTF strings based on work metadata", + query: ~s({"query": {"match": {"collection.id": "abc-123"}}}) + }) + # Plan now has PlanChanges for each work in the query results + ``` + + 2. **Update Changes**: Agent updates the changeset for each PlanChange + ``` + changes = list_plan_changes(plan.id) + Enum.each(changes, fn change -> + case update_plan_change(change, %{ + add: %{ + descriptive_metadata: %{ + subject: [ + %{ + role: %{id: "TOPICAL", scheme: "subject_role"}, + term: %{id: "http://id.loc.gov/authorities/subjects/sh85141086"} + } + ] + } + } + }) do + {:ok, _updated_change} -> :ok + {:error, message} -> Logger.error("Error: " <> message) + end + end) + ``` + + 3. **Review**: User reviews and approves/rejects plan and individual changes + ``` + approve_plan(plan, "user-netid") + approve_plan_change(change, "user-netid") + ``` + + 4. **Execute**: Apply approved changes to works + ``` + execute_plan(plan) + ``` + """ + import Ecto.Query, warn: false + alias Meadow.Data.Schemas.{Plan, PlanChange} + alias Meadow.Data.Schemas.Work + alias Meadow.Data.Works + alias Meadow.Repo + alias Meadow.Utils.{Atoms, ChangesetErrors, StructMap} + + @doc """ + Returns the list of plans. + + ## Examples + + iex> list_plans() + [%Plan{}, ...] + """ + def list_plans do + Repo.all(Plan) + end + + @doc """ + Returns a list of plans matching the given criteria. + + ## Example Criteria + + [{:limit, 15}, {:status, :pending}, {:user, "user-netid"}] + + ## Examples + + iex> list_plans([status: :pending]) + [%Plan{status: :pending}, ...] + """ + def list_plans(criteria) do + criteria + |> plan_query() + |> Repo.all() + end + + @doc """ + Returns a composable query matching the given criteria. + + ## Examples + + iex> plan_query([status: :pending]) |> Repo.all() + [%Plan{status: :pending}, ...] + """ + def plan_query(criteria) do + query = from(Plan) + + Enum.reduce(criteria, query, fn + {:limit, limit}, query -> + from(p in query, limit: ^limit) + + {:status, status}, query -> + from(p in query, where: p.status == ^status) + + {:user, user}, query -> + from(p in query, where: p.user == ^user) + + {:order, order}, query -> + from(p in query, order_by: [{^order, :inserted_at}]) + end) + end + + @doc """ + Gets a single plan. + + Raises `Ecto.NoResultsError` if the Plan does not exist. + + ## Examples + + iex> get_plan!("123") + %Plan{} + + iex> get_plan!("456") + ** (Ecto.NoResultsError) + """ + def get_plan!(id, opts \\ []) do + query = from(p in Plan, where: p.id == ^id) + + query = + if opts[:preload_changes] do + from(p in query, preload: :plan_changes) + else + query + end + + Repo.one!(query) + end + + @doc """ + Gets a single plan. + + Returns `nil` if the Plan does not exist. + + ## Examples + + iex> get_plan("123") + %Plan{} + + iex> get_plan("456") + nil + """ + def get_plan(id, opts \\ []) do + query = from(p in Plan, where: p.id == ^id) + + query = + if opts[:preload_changes] do + from(p in query, preload: :plan_changes) + else + query + end + + Repo.one(query) + end + + @doc """ + Gets pending plans. + + ## Examples + + iex> get_pending_plans() + [%Plan{status: :pending}, ...] + """ + def get_pending_plans(opts \\ []) do + query = from(p in Plan, where: p.status == :pending, order_by: [asc: :inserted_at]) + + query = + if opts[:preload_changes] do + from(p in query, preload: :plan_changes) + else + query + end + + Repo.all(query) + end + + @doc """ + Gets approved plans ready for execution. + + ## Examples + + iex> get_approved_plans() + [%Plan{status: :approved}, ...] + """ + def get_approved_plans(opts \\ []) do + query = from(p in Plan, where: p.status == :approved, order_by: [asc: :inserted_at]) + + query = + if opts[:preload_changes] do + from(p in query, preload: :plan_changes) + else + query + end + + Repo.all(query) + end + + @doc """ + Creates a plan and automatically populates PlanChanges from the query results. + + ## Examples + + # With OpenSearch query string + iex> create_plan(%{ + ...> prompt: "Translate titles to Spanish", + ...> query: "collection.id:abc-123" + ...> }) + {:ok, %Plan{}} + + # With specific work IDs + iex> create_plan(%{ + ...> prompt: "Look up LCNAF contributors", + ...> query: "id:(work-1 OR work-2 OR work-3)" + ...> }) + {:ok, %Plan{}} + + iex> create_plan(%{prompt: nil}) + {:error, %Ecto.Changeset{}} + """ + def create_plan(attrs \\ %{}) do + changeset = Plan.changeset(%Plan{}, attrs) + + changeset + |> validate_and_create_plan(attrs) + end + + defp validate_and_create_plan(%Ecto.Changeset{valid?: false} = changeset, _attrs) do + {:error, changeset} + end + + defp validate_and_create_plan(changeset, %{query: query} = _attrs) when is_binary(query) do + Repo.transaction(fn -> + plan = Repo.insert!(changeset) + populate_plan_changes(plan, query) + end) + end + + defp validate_and_create_plan(changeset, _attrs) do + Repo.transaction(fn -> + Repo.insert!(changeset) + end) + end + + @doc """ + Same as create_plan/1 but raises on error. + """ + def create_plan!(attrs \\ %{}) do + case create_plan(attrs) do + {:ok, plan} -> plan + {:error, changeset} -> raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset + end + end + + @doc """ + Updates a plan. + + ## Examples + + iex> update_plan(plan, %{notes: "Updated notes"}) + {:ok, %Plan{}} + + iex> update_plan(plan, %{status: :invalid}) + {:error, %Ecto.Changeset{}} + """ + def update_plan(%Plan{} = plan, attrs) do + plan + |> Plan.changeset(attrs) + |> Repo.update() + end + + @doc """ + Approves a plan and optionally all its pending changes. + + ## Examples + + iex> approve_plan(plan, "user-netid") + {:ok, %Plan{status: :approved, user: "user-netid"}} + + iex> approve_plan(plan, "user-netid", approve_changes: true) + {:ok, %Plan{status: :approved, user: "user-netid"}} + """ + def approve_plan(%Plan{} = plan, user \\ nil, opts \\ []) do + result = + plan + |> Plan.approve(user) + |> Repo.update() + + if opts[:approve_changes] do + case result do + {:ok, approved_plan} -> + # Also approve all pending changes + from(c in PlanChange, + where: c.plan_id == ^approved_plan.id and c.status == :pending, + update: [set: [status: :approved, user: ^user]] + ) + |> Repo.update_all([]) + + {:ok, approved_plan} + + error -> error + end + else + result + end + end + + @doc """ + Rejects a plan. + + ## Examples + + iex> reject_plan(plan, "Changes not needed") + {:ok, %Plan{status: :rejected, notes: "Changes not needed"}} + """ + def reject_plan(%Plan{} = plan, notes \\ nil) do + plan + |> Plan.reject(notes) + |> Repo.update() + end + + @doc """ + Marks a plan as executed. + + ## Examples + + iex> mark_plan_executed(plan) + {:ok, %Plan{status: :executed}} + """ + def mark_plan_executed(%Plan{} = plan) do + plan + |> Plan.mark_executed() + |> Repo.update() + end + + @doc """ + Marks a plan as failed with an error. + + ## Examples + + iex> mark_plan_error(plan, "Database connection failed") + {:ok, %Plan{status: :error, error: "Database connection failed"}} + """ + def mark_plan_error(%Plan{} = plan, error) do + plan + |> Plan.mark_error(error) + |> Repo.update() + end + + @doc """ + Deletes a plan and all associated changes. + + ## Examples + + iex> delete_plan(plan) + {:ok, %Plan{}} + + iex> delete_plan(plan) + {:error, %Ecto.Changeset{}} + """ + def delete_plan(%Plan{} = plan) do + Repo.delete(plan) + end + + @doc """ + Returns an `%Ecto.Changeset{}` for tracking plan changes. + + ## Examples + + iex> change_plan(plan) + %Ecto.Changeset{data: %Plan{}} + """ + def change_plan(%Plan{} = plan, attrs \\ %{}) do + Plan.changeset(plan, attrs) + end + + # ========== PlanChange Functions ========== + + @doc """ + Returns all changes for a plan. + + ## Examples + + iex> list_plan_changes(plan_id) + [%PlanChange{}, ...] + + iex> list_plan_changes(plan) + [%PlanChange{}, ...] + """ + def list_plan_changes(%Plan{id: plan_id}), do: list_plan_changes(plan_id) + + def list_plan_changes(plan_id) when is_binary(plan_id) do + from(c in PlanChange, where: c.plan_id == ^plan_id, order_by: [asc: :inserted_at]) + |> Repo.all() + end + + @doc """ + Returns changes for a plan matching the given criteria. + + ## Example Criteria + + [{:status, :pending}, {:work_id, "work-123"}] + + ## Examples + + iex> list_plan_changes(plan_id, [status: :pending]) + [%PlanChange{status: :pending}, ...] + + iex> list_plan_changes(plan, [status: :pending]) + [%PlanChange{status: :pending}, ...] + """ + def list_plan_changes(%Plan{id: plan_id}, criteria), do: list_plan_changes(plan_id, criteria) + + def list_plan_changes(plan_id, criteria) when is_binary(plan_id) do + criteria + |> Keyword.put(:plan_id, plan_id) + |> plan_change_query() + |> Repo.all() + end + + @doc """ + Returns a composable query matching the given criteria. + """ + def plan_change_query(criteria) do + query = from(PlanChange) + + Enum.reduce(criteria, query, fn + {:plan_id, plan_id}, query -> + from(c in query, where: c.plan_id == ^plan_id) + + {:work_id, work_id}, query -> + from(c in query, where: c.work_id == ^work_id) + + {:status, status}, query -> + from(c in query, where: c.status == ^status) + + {:user, user}, query -> + from(c in query, where: c.user == ^user) + + {:order, order}, query -> + from(c in query, order_by: [{^order, :inserted_at}]) + end) + end + + @doc """ + Gets a single plan change. + + Raises `Ecto.NoResultsError` if the PlanChange does not exist. + + ## Examples + + iex> get_plan_change!("123") + %PlanChange{} + """ + def get_plan_change!(id) do + Repo.get!(PlanChange, id) + end + + @doc """ + Gets a single plan change. + + Returns `nil` if the PlanChange does not exist. + + ## Examples + + iex> get_plan_change("123") + %PlanChange{} + + iex> get_plan_change("456") + nil + """ + def get_plan_change(id) do + Repo.get(PlanChange, id) + end + + def get_plan_changes_by_work(%Plan{id: plan_id}, work_id), + do: get_plan_changes_by_work(plan_id, work_id) + + def get_plan_changes_by_work(plan_id, %Work{id: work_id}), + do: get_plan_changes_by_work(plan_id, work_id) + + def get_plan_changes_by_work(plan_id, work_id) + when is_binary(plan_id) and is_binary(work_id) do + from(c in PlanChange, where: c.plan_id == ^plan_id and c.work_id == ^work_id) + |> Repo.all() + end + + @doc """ + Creates a plan change. + + ## Examples + + iex> create_plan_change(%{ + ...> plan_id: plan.id, + ...> work_id: "work-123", + ...> add: %{ + ...> descriptive_metadata: %{ + ...> subject: [ + ...> %{ + ...> role: %{id: "TOPICAL", scheme: "subject_role"}, + ...> term: %{id: "http://id.loc.gov/authorities/subjects/sh85141086"} + ...> } + ...> ] + ...> } + ...> } + ...> }) + {:ok, %PlanChange{}} + + iex> create_plan_change(%{work_id: nil}) + {:error, "can't be blank, can't be blank"} + """ + def create_plan_change(attrs \\ %{}) do + changeset = PlanChange.changeset(%PlanChange{}, attrs) + + case Repo.insert(changeset) do + {:ok, plan_change} -> + {:ok, plan_change} + + {:error, changeset} -> + error_message = + changeset + |> ChangesetErrors.humanize_errors() + |> Enum.map_join(", ", fn {_field, error} -> error end) + + {:error, error_message} + end + end + + @doc """ + Same as create_plan_change/1 but raises on error. + """ + def create_plan_change!(attrs \\ %{}) do + %PlanChange{} + |> PlanChange.changeset(attrs) + |> Repo.insert!() + end + + @doc """ + Creates multiple plan changes at once. + + ## Examples + + iex> create_plan_changes([ + ...> %{ + ...> plan_id: plan.id, + ...> work_id: "work-1", + ...> add: %{ + ...> descriptive_metadata: %{ + ...> subject: [%{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "http://..."}}] + ...> } + ...> } + ...> }, + ...> %{ + ...> plan_id: plan.id, + ...> work_id: "work-2", + ...> add: %{ + ...> descriptive_metadata: %{ + ...> subject: [%{role: %{id: "TOPICAL", scheme: "subject_role"}, term: %{id: "http://..."}}] + ...> } + ...> } + ...> } + ...> ]) + {:ok, [%PlanChange{}, %PlanChange{}]} + """ + def create_plan_changes(changes_attrs) do + Repo.transaction(fn -> + Enum.map(changes_attrs, fn attrs -> + create_plan_change!(attrs) + end) + end) + end + + @doc """ + Updates a plan change. + + ## Examples + + iex> update_plan_change(change, %{notes: "Reviewed"}) + {:ok, %PlanChange{}} + + iex> update_plan_change(change, %{plan_id: "invalid-uuid"}) + {:error, "invalid-uuid is invalid"} + """ + def update_plan_change(%PlanChange{} = change, attrs) do + changeset = PlanChange.changeset(change, attrs) + + case Repo.update(changeset) do + {:ok, updated_change} -> + {:ok, updated_change} + + {:error, changeset} -> + error_message = + changeset + |> ChangesetErrors.humanize_errors() + |> Enum.map_join(", ", fn {_field, error} -> error end) + + {:error, error_message} + end + end + + @doc """ + Approves a plan change. + + ## Examples + + iex> approve_plan_change(change, "user-netid") + {:ok, %PlanChange{status: :approved}} + """ + def approve_plan_change(%PlanChange{} = change, user \\ nil) do + change + |> PlanChange.approve(user) + |> Repo.update() + end + + @doc """ + Rejects a plan change. + + ## Examples + + iex> reject_plan_change(change, "Translation incorrect") + {:ok, %PlanChange{status: :rejected}} + """ + def reject_plan_change(%PlanChange{} = change, notes \\ nil) do + change + |> PlanChange.reject(notes) + |> Repo.update() + end + + @doc """ + Marks a plan change as executed. + + ## Examples + + iex> mark_plan_change_executed(change) + {:ok, %PlanChange{status: :executed}} + """ + def mark_plan_change_executed(%PlanChange{} = change) do + change + |> PlanChange.mark_executed() + |> Repo.update() + end + + @doc """ + Marks a plan change as failed with an error. + + ## Examples + + iex> mark_plan_change_error(change, "Work not found") + {:ok, %PlanChange{status: :error}} + """ + def mark_plan_change_error(%PlanChange{} = change, error) do + change + |> PlanChange.mark_error(error) + |> Repo.update() + end + + @doc """ + Deletes a plan change. + + ## Examples + + iex> delete_plan_change(change) + {:ok, %PlanChange{}} + """ + def delete_plan_change(%PlanChange{} = change) do + Repo.delete(change) + end + + @doc """ + Returns an `%Ecto.Changeset{}` for tracking plan change modifications. + + ## Examples + + iex> change_plan_change(change) + %Ecto.Changeset{data: %PlanChange{}} + """ + def change_plan_change(%PlanChange{} = change, attrs \\ %{}) do + PlanChange.changeset(change, attrs) + end + + @doc """ + Executes a plan by applying all approved changes to their respective works. + + Returns {:ok, plan} if all changes were applied successfully. + Returns {:error, reason} if execution failed. + + ## Examples + + iex> execute_plan(plan) + {:ok, %Plan{status: :executed}} + + iex> execute_plan(plan_with_no_approved_changes) + {:error, "No approved changes to execute"} + """ + def execute_plan(%Plan{status: :approved} = plan) do + approved_changes = load_approved_changes(plan) + + approved_changes + |> validate_has_changes() + |> execute_changes_transaction(plan) + |> handle_execution_result(plan) + end + + def execute_plan(%Plan{}) do + {:error, "Plan must be approved before execution"} + end + + defp load_approved_changes(plan) do + from(c in PlanChange, + where: c.plan_id == ^plan.id and c.status == :approved, + order_by: [asc: :inserted_at] + ) + |> Repo.all() + end + + defp validate_has_changes([]), do: {:error, "No approved changes to execute"} + defp validate_has_changes(changes), do: {:ok, changes} + + defp execute_changes_transaction({:error, _} = error, _plan), do: error + + defp execute_changes_transaction({:ok, approved_changes}, plan) do + Repo.transaction( + fn -> + Enum.each(approved_changes, &execute_single_change/1) + + mark_plan_executed(plan) + |> unwrap_or_rollback() + end, + timeout: :infinity + ) + end + + defp execute_single_change(change) do + apply_change_to_work(change) + |> handle_change_result(change) + end + + defp handle_change_result({:ok, _work}, change) do + mark_plan_change_executed(change) + |> unwrap_or_rollback() + end + + defp handle_change_result({:error, reason}, _change) do + Repo.rollback(reason) + end + + defp unwrap_or_rollback({:ok, result}), do: result + defp unwrap_or_rollback({:error, reason}), do: Repo.rollback(reason) + + defp handle_execution_result({:ok, executed_plan}, _plan), do: {:ok, executed_plan} + + defp handle_execution_result({:error, "No approved changes to execute"} = error, _plan) do + error + end + + defp handle_execution_result({:error, reason}, plan) do + mark_plan_error(plan, inspect(reason)) + end + + @doc """ + Executes a single plan change by applying the changeset to the work. + + ## Examples + + iex> execute_plan_change(change) + {:ok, %PlanChange{status: :executed}} + """ + def execute_plan_change(%PlanChange{} = change) do + case apply_change_to_work(change) do + {:ok, _work} -> + mark_plan_change_executed(change) + + {:error, reason} -> + mark_plan_change_error(change, inspect(reason)) + end + end + + defp populate_plan_changes(plan, query) do + try do + query + |> normalize_query() + |> fetch_work_ids_from_query() + |> create_plan_changes_for_works(plan.id) + rescue + e -> + # Log the error but don't fail plan creation + require Logger + Logger.warning("Failed to auto-populate plan changes: #{Exception.message(e)}") + :ok + end + + plan + end + + defp normalize_query(query) do + # Try to decode as JSON first + case Jason.decode(query) do + {:ok, _} -> + # Already valid JSON + query + + {:error, _} -> + # Simple query string - convert to OpenSearch query_string query + %{ + "query" => %{ + "query_string" => %{ + "query" => query + } + } + } + |> Jason.encode!() + end + end + + defp fetch_work_ids_from_query(query) do + alias Meadow.Search.Config, as: SearchConfig + alias Meadow.Search.HTTP + + # The query should be a JSON string (like in Batches) + # Convert it to a map, ensure _source is empty, then back to JSON + query_body = + query + |> Jason.decode!() + |> Map.put("_source", "") + |> Jason.encode!() + + HTTP.post!([SearchConfig.alias_for(Work, 2), "_search?scroll=10m"], query_body) + |> Map.get(:body) + |> collect_work_ids([]) + end + + defp collect_work_ids(%{"hits" => %{"hits" => []}}, acc), do: acc + + defp collect_work_ids(%{"_scroll_id" => scroll_id, "hits" => hits}, acc) do + alias Meadow.Search.HTTP + + work_ids = + hits + |> Map.get("hits") + |> Enum.map(&Map.get(&1, "_id")) + + HTTP.post!("/_search/scroll", %{scroll: "1m", scroll_id: scroll_id}) + |> Map.get(:body) + |> collect_work_ids(acc ++ work_ids) + end + + defp create_plan_changes_for_works([], plan_id) do + require Logger + Logger.debug("No works found in OpenSearch results for plan #{plan_id}") + :ok + end + + defp create_plan_changes_for_works(work_ids, plan_id) do + require Logger + Logger.debug("Found #{length(work_ids)} works in OpenSearch results for plan #{plan_id}") + + # Validate work IDs exist in database + valid_work_ids = + from(w in Work, where: w.id in ^work_ids, select: w.id) + |> Repo.all() + + Logger.debug("Validated #{length(valid_work_ids)} work IDs exist in database") + + # Create PlanChange records with empty add/delete/replace maps + entries = + Enum.map(valid_work_ids, fn work_id -> + Logger.debug("Creating PlanChange for work #{work_id} in plan #{plan_id}") + + %{ + plan_id: plan_id, + work_id: work_id, + add: %{}, + status: :pending, + inserted_at: DateTime.utc_now(), + updated_at: DateTime.utc_now() + } + end) + + {count, _} = Repo.insert_all(PlanChange, entries) + Logger.debug("Created #{count} PlanChanges for plan #{plan_id}") + end + + defp apply_change_to_work(%PlanChange{work_id: work_id} = plan_change) do + case Repo.get(Work, work_id) do + nil -> + {:error, "Work not found"} + + work -> + apply_operations_to_work(work, plan_change) + end + end + + defp apply_operations_to_work(work, %PlanChange{delete: delete, add: add, replace: replace}) do + delete = if is_nil(delete), do: %{}, else: delete + add = if is_nil(add), do: %{}, else: add + replace = if is_nil(replace), do: %{}, else: replace + + Repo.transaction(fn -> + # Apply controlled field changes (delete/add operations) + apply_controlled_field_operations(work, delete, add) + + # Apply uncontrolled field changes (add/replace operations) + apply_uncontrolled_field_operations(work, add, replace) + + # Reload the work to get the updated state + Repo.get!(Work, work.id) + end) + end + + @controlled_fields ~w(contributor creator genre language location style_period subject technique)a + + defp apply_controlled_field_operations(work, delete, add) do + @controlled_fields + |> Enum.each(fn field -> + delete_values = controlled_field_values(delete, field) + add_values = controlled_field_values(add, field) + + unless is_nil(delete_values) and is_nil(add_values) do + apply_controlled_field_operation( + work.id, + field, + prepare_controlled_field_list(delete_values), + prepare_controlled_field_list(add_values) + ) + end + end) + end + + defp apply_controlled_field_operation(work_id, field, delete_values, add_values) do + require Logger + Logger.debug("Applying controlled field operation for #{field}") + + from(w in Work, where: w.id == ^work_id) + |> Works.replace_controlled_value( + :descriptive_metadata, + to_string(field), + delete_values, + add_values + ) + |> Repo.update_all([]) + end + + defp prepare_controlled_field_list(nil), do: [] + defp prepare_controlled_field_list([]), do: [] + + defp prepare_controlled_field_list(data) when is_list(data) do + Enum.map(data, &normalize_controlled_field_entry/1) + end + + defp prepare_controlled_field_list(data), do: prepare_controlled_field_list([data]) + + defp controlled_field_values(data, field) when is_map(data) do + metadata = Map.get(data, :descriptive_metadata) || Map.get(data, "descriptive_metadata") + + case metadata do + nil -> nil + %{} = meta -> Map.get(meta, field) || Map.get(meta, Atom.to_string(field)) + end + end + + defp controlled_field_values(_data, _field), do: nil + + defp normalize_controlled_field_entry(entry) do + entry + |> StructMap.deep_struct_to_map() + |> Atoms.atomize() + |> Map.put_new(:role, nil) + |> normalize_role() + |> normalize_term() + end + + defp normalize_role(%{role: nil} = entry), do: entry + + defp normalize_role(%{role: role} = entry) when is_map(role) do + normalized_role = + role + |> StructMap.deep_struct_to_map() + |> Map.take([:id, :scheme]) + |> Enum.reject(fn {_key, value} -> is_nil(value) end) + |> Enum.into(%{}) + + Map.put(entry, :role, if(map_size(normalized_role) == 0, do: nil, else: normalized_role)) + end + + defp normalize_role(entry), do: Map.put(entry, :role, nil) + + defp normalize_term(%{term: %{} = term} = entry) do + normalized_term = + term + |> StructMap.deep_struct_to_map() + |> Map.take([:id]) + + Map.put(entry, :term, if(map_size(normalized_term) == 0, do: nil, else: normalized_term)) + end + + defp normalize_term(entry), do: entry + + defp apply_uncontrolled_field_operations(work, add, replace) do + # Extract collection_id and top-level fields from replace + collection_id = Map.get(replace, :collection_id, :not_present) + visibility = Map.get(replace, :visibility, :not_present) + published = Map.get(replace, :published, :not_present) + + work.id + |> update_top_level_field(:collection_id, collection_id) + |> update_top_level_field(:visibility, visibility) + |> update_top_level_field(:published, published) + |> merge_uncontrolled_metadata(add, :append) + |> merge_uncontrolled_metadata(replace, :replace) + end + + defp update_top_level_field(work_id, _field, :not_present), do: work_id + + defp update_top_level_field(work_id, field, value) do + require Logger + Logger.debug("Updating #{field} to #{inspect(value)}") + + update_args = Keyword.new([{field, value}, {:updated_at, DateTime.utc_now()}]) + + from(w in Work, where: w.id == ^work_id, update: [set: ^update_args]) + |> Repo.update_all([]) + + work_id + end + + defp merge_uncontrolled_metadata(work_id, new_values, _mode) when map_size(new_values) == 0 do + work_id + end + + defp merge_uncontrolled_metadata(work_id, new_values, mode) do + descriptive_metadata = + new_values + |> metadata_section(:descriptive_metadata) + |> Atoms.atomize() + + mergeable_descriptive_metadata = + descriptive_metadata + |> Enum.filter(fn {key, _} -> key not in @controlled_fields end) + |> Enum.into(%{}) + |> humanize_date_created() + + mergeable_administrative_metadata = + new_values + |> metadata_section(:administrative_metadata) + |> Atoms.atomize() + |> Enum.into(%{}) + + if map_size(mergeable_descriptive_metadata) + map_size(mergeable_administrative_metadata) > 0 do + from(w in Work, where: w.id == ^work_id) + |> Works.merge_metadata_values( + :descriptive_metadata, + mergeable_descriptive_metadata, + mode + ) + |> Works.merge_metadata_values( + :administrative_metadata, + mergeable_administrative_metadata, + mode + ) + |> Works.merge_updated_at() + |> Repo.update_all([]) + end + + work_id + end + + defp metadata_section(map, key) when is_map(map) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) || %{} + end + + defp metadata_section(_map, _key), do: %{} + + defp humanize_date_created(descriptive_metadata) do + case Map.fetch(descriptive_metadata, :date_created) do + {:ok, date_created} -> + put_humanized_dates(descriptive_metadata, :date_created, date_created) + + :error -> + case Map.fetch(descriptive_metadata, "date_created") do + {:ok, date_created} -> + put_humanized_dates(descriptive_metadata, "date_created", date_created) + + :error -> + descriptive_metadata + end + end + end + + defp put_humanized_dates(descriptive_metadata, key, date_created) do + dates = + date_created + |> Enum.map(&coerce_date_entry/1) + |> Enum.reject(&is_nil/1) + + Map.put(descriptive_metadata, key, dates) + end + + defp humanize_edtf(nil), do: nil + + defp humanize_edtf(edtf) when is_binary(edtf) do + case EDTF.humanize(edtf) do + {:error, error} -> raise error + result -> %{edtf: edtf, humanized: result} + end + end + + defp coerce_date_entry(entry) when is_binary(entry), do: humanize_edtf(entry) + + defp coerce_date_entry(entry) when is_map(entry) do + entry + |> Map.new(fn {key, value} -> {Atoms.atomize(key), value} end) + |> normalize_date_entry() + end + + defp coerce_date_entry(_entry), do: humanize_edtf(nil) + + defp normalize_date_entry(%{edtf: nil}), do: humanize_edtf(nil) + + defp normalize_date_entry(%{edtf: edtf, humanized: humanized}) + when not is_nil(edtf) and not is_nil(humanized) do + %{edtf: edtf, humanized: humanized} + end + + defp normalize_date_entry(%{edtf: edtf}) when not is_nil(edtf), do: humanize_edtf(edtf) + + defp normalize_date_entry(_entry), do: humanize_edtf(nil) +end diff --git a/app/lib/meadow/data/schemas/agent_plan.ex b/app/lib/meadow/data/schemas/agent_plan.ex new file mode 100644 index 000000000..8f1be4457 --- /dev/null +++ b/app/lib/meadow/data/schemas/agent_plan.ex @@ -0,0 +1,80 @@ +defmodule Meadow.Data.Schemas.AgentPlan do + @moduledoc """ + AgentPlan stores structured plans created by AI agents for modifying Meadow data. + """ + use Ecto.Schema + import Ecto.Changeset + + @statuses [:pending, :approved, :rejected, :executed, :error] + + @primary_key {:id, Ecto.UUID, autogenerate: false, read_after_writes: true} + @timestamps_opts [type: :utc_datetime_usec] + schema "agent_plans" do + field :query, :string + field :changeset, :map + field :status, Ecto.Enum, values: @statuses, default: :pending + field :user, :string + field :notes, :string + field :executed_at, :utc_datetime_usec + field :error, :string + timestamps() + end + + @doc false + def changeset(agent_plan, attrs) do + agent_plan + |> cast(attrs, [:query, :changeset, :status, :user, :notes, :executed_at, :error]) + |> validate_required([:query, :changeset]) + |> validate_inclusion(:status, @statuses) + |> validate_changeset_format() + end + + @doc """ + Transition plan to approved status + """ + def approve(agent_plan, user \\ nil) do + agent_plan + |> cast(%{status: :approved, user: user}, [:status, :user]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Transition plan to rejected status + """ + def reject(agent_plan, notes \\ nil) do + agent_plan + |> cast(%{status: :rejected, notes: notes}, [:status, :notes]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark plan as executed + """ + def mark_executed(agent_plan) do + agent_plan + |> cast(%{status: :executed, executed_at: DateTime.utc_now()}, [:status, :executed_at]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark plan as failed with error + """ + def mark_error(agent_plan, error) do + agent_plan + |> cast(%{status: :error, error: error}, [:status, :error]) + |> validate_inclusion(:status, @statuses) + end + + defp validate_changeset_format(changeset) do + case get_field(changeset, :changeset) do + nil -> + changeset + + cs when is_map(cs) -> + changeset + + _ -> + add_error(changeset, :changeset, "must be a map/object") + end + end +end diff --git a/app/lib/meadow/data/schemas/controlled_metadata_entry.ex b/app/lib/meadow/data/schemas/controlled_metadata_entry.ex index 083334bbd..5bf769cdd 100644 --- a/app/lib/meadow/data/schemas/controlled_metadata_entry.ex +++ b/app/lib/meadow/data/schemas/controlled_metadata_entry.ex @@ -3,6 +3,7 @@ defmodule Meadow.Data.Schemas.ControlledMetadataEntry do Schema for Controlled Entry with Role qualifier """ + @derive {Jason.Encoder, only: [:role, :term]} import Ecto.Changeset use Ecto.Schema alias Meadow.Data.Types diff --git a/app/lib/meadow/data/schemas/plan.ex b/app/lib/meadow/data/schemas/plan.ex new file mode 100644 index 000000000..369f5e95d --- /dev/null +++ b/app/lib/meadow/data/schemas/plan.ex @@ -0,0 +1,123 @@ +defmodule Meadow.Data.Schemas.Plan do + @moduledoc """ + Plan stores high-level AI agent tasks for modifying Meadow data. + + A Plan represents the overall task given to an AI agent (e.g., "Translate titles to Spanish" + or "Look up and assign LCNAF contributors"), while individual work-specific changes are stored + in associated PlanChange records. + + ## Example Workflow + + 1. User provides a prompt: "Translate the titles to Spanish in the alternate_title field" + 2. Agent processes each work, using tools (translation APIs, etc.) to generate specific changes + 3. System creates: + - One Plan record with the prompt and work selection criteria + - Multiple PlanChange records, one per work, with work-specific modifications + + ## Fields + + - `prompt` - The natural language instruction given to the agent + Example: "Add a date_created EDTF string for the work based on the work's existing description, creator, and temporal subjects" + + - `query` - OpenSearch query string identifying works + Example: `"collection.id:abc-123"` or `"id:(73293ebf-288b-4d4f-8843-488391796fea OR 2a27f163-c7fd-437c-8d4d-c2dbce72c884)"` + + - `status` - Current state of the plan: + - `:pending` - Plan created, awaiting review + - `:approved` - Plan approved for execution + - `:rejected` - Plan rejected, will not be executed + - `:executed` - All approved changes have been applied + - `:error` - Execution failed + + - `user` - User who approved/executed the plan + - `notes` - Optional notes about approval/rejection + - `executed_at` - When the plan was executed + - `error` - Error message if execution failed + """ + use Ecto.Schema + import Ecto.Changeset + + alias Meadow.Data.Schemas.PlanChange + + @statuses [:pending, :approved, :rejected, :executed, :error] + + @primary_key {:id, Ecto.UUID, autogenerate: false, read_after_writes: true} + @timestamps_opts [type: :utc_datetime_usec] + schema "plans" do + field :prompt, :string + field :query, :string + field :status, Ecto.Enum, values: @statuses, default: :pending + field :user, :string + field :notes, :string + field :executed_at, :utc_datetime_usec + field :error, :string + + has_many :plan_changes, PlanChange, foreign_key: :plan_id + + timestamps() + end + + @doc false + def changeset(plan, attrs) do + plan + |> cast(attrs, [:prompt, :query, :status, :user, :notes, :executed_at, :error]) + |> validate_required([:prompt]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Transition plan to approved status + + ## Example + + iex> plan |> Plan.approve("user@example.com") |> Repo.update() + {:ok, %Plan{status: :approved, user: "user@example.com"}} + """ + def approve(plan, user \\ nil) do + plan + |> cast(%{status: :approved, user: user}, [:status, :user]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Transition plan to rejected status + + ## Example + + iex> plan |> Plan.reject("Changes not needed") |> Repo.update() + {:ok, %Plan{status: :rejected, notes: "Changes not needed"}} + """ + def reject(plan, notes \\ nil) do + plan + |> cast(%{status: :rejected, notes: notes}, [:status, :notes]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark plan as executed + + ## Example + + iex> plan |> Plan.mark_executed() |> Repo.update() + {:ok, %Plan{status: :executed, executed_at: ~U[2025-10-01 12:00:00.000000Z]}} + """ + def mark_executed(plan) do + plan + |> cast(%{status: :executed, executed_at: DateTime.utc_now()}, [:status, :executed_at]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark plan as failed with error + + ## Example + + iex> plan |> Plan.mark_error("Database connection failed") |> Repo.update() + {:ok, %Plan{status: :error, error: "Database connection failed"}} + """ + def mark_error(plan, error) do + plan + |> cast(%{status: :error, error: error}, [:status, :error]) + |> validate_inclusion(:status, @statuses) + end +end diff --git a/app/lib/meadow/data/schemas/plan_change.ex b/app/lib/meadow/data/schemas/plan_change.ex new file mode 100644 index 000000000..e3c6fb1e4 --- /dev/null +++ b/app/lib/meadow/data/schemas/plan_change.ex @@ -0,0 +1,174 @@ +defmodule Meadow.Data.Schemas.PlanChange do + @moduledoc """ + PlanChange stores individual work-specific modifications proposed by an AI agent. + + Each PlanChange represents a single work's proposed modifications as part of a larger Plan. + The agent examines each work in context and generates tailored changes. + + ## Example Scenarios + + ### Scenario 1: Adding EDTF Date Strings + Plan prompt: "Add a date_created EDTF string for the work based on the work's existing description, creator, and temporal subjects" + + Generated PlanChanges: + - Work A (description mentions "November 10, 1896"): + `add: %{descriptive_metadata: %{date_created: ["1896-11-10"]}}` + - Work B (temporal subject shows "1920s"): + `add: %{descriptive_metadata: %{date_created: ["192X"]}}` + + ### Scenario 2: Looking Up and Assigning Contributors + Plan prompt: "Look up LCNAF names from description and assign as contributors with MARC relators" + + Generated PlanChanges: + - Work A (description mentions "photographed by Ansel Adams"): + `add: %{descriptive_metadata: %{contributor: [%{term: "Adams, Ansel, 1902-1984", role: %{id: "pht", scheme: "marc_relator"}}]}}` + - Work B (description mentions "interviewed by Studs Terkel"): + `add: %{descriptive_metadata: %{contributor: [%{term: "Terkel, Studs, 1912-2008", role: %{id: "ivr", scheme: "marc_relator"}}]}}` + + ### Scenario 3: Removing Extraneous Subject Headings + Plan prompt: "Remove extraneous subject headings like 'Photograph' and 'Image'" + + Generated PlanChanges: + - Work A has generic subjects to remove: + `delete: %{descriptive_metadata: %{subject: [%{term: "Photograph"}, %{term: "Image"}]}}` + + ## Fields + + - `plan_id` - Foreign key to the parent Plan + - `work_id` - The specific work being modified + - `add` - Map of values to append to existing work data + - `delete` - Map of values to remove from existing work data + - `replace` - Map of values to fully replace in work data + + - `status` - Current state of this change: + - `:pending` - Change proposed, awaiting review + - `:approved` - Change approved for execution + - `:rejected` - Change rejected, will not be executed + - `:executed` - Change has been applied to the work + - `:error` - Execution failed for this change + + - `user` - User who approved/rejected this specific change + - `notes` - Optional notes about this change (e.g., reason for rejection) + - `executed_at` - When this change was applied + - `error` - Error message if this change failed to apply + """ + use Ecto.Schema + import Ecto.Changeset + + alias Meadow.Data.Schemas.Plan + + @statuses [:pending, :approved, :rejected, :executed, :error] + + @primary_key {:id, Ecto.UUID, autogenerate: false, read_after_writes: true} + @timestamps_opts [type: :utc_datetime_usec] + schema "plan_changes" do + field :plan_id, Ecto.UUID + field :work_id, Ecto.UUID + field :add, :map + field :delete, :map + field :replace, :map + field :status, Ecto.Enum, values: @statuses, default: :pending + field :user, :string + field :notes, :string + field :executed_at, :utc_datetime_usec + field :error, :string + + belongs_to :plan, Plan, foreign_key: :plan_id, references: :id, define_field: false + + timestamps() + end + + @doc false + def changeset(plan_change, attrs) do + plan_change + |> cast(attrs, [:plan_id, :work_id, :add, :delete, :replace, :status, :user, :notes, :executed_at, :error]) + |> validate_required([:work_id]) + |> validate_at_least_one_operation() + |> validate_inclusion(:status, @statuses) + |> validate_map_format(:add) + |> validate_map_format(:delete) + |> validate_map_format(:replace) + |> foreign_key_constraint(:plan_id) + end + + @doc """ + Transition change to approved status + + ## Example + + iex> change |> PlanChange.approve("user@example.com") |> Repo.update() + {:ok, %PlanChange{status: :approved, user: "user@example.com"}} + """ + def approve(plan_change, user \\ nil) do + plan_change + |> cast(%{status: :approved, user: user}, [:status, :user]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Transition change to rejected status + + ## Example + + iex> change |> PlanChange.reject("Translation is incorrect") |> Repo.update() + {:ok, %PlanChange{status: :rejected, notes: "Translation is incorrect"}} + """ + def reject(plan_change, notes \\ nil) do + plan_change + |> cast(%{status: :rejected, notes: notes}, [:status, :notes]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark change as executed + + ## Example + + iex> change |> PlanChange.mark_executed() |> Repo.update() + {:ok, %PlanChange{status: :executed, executed_at: ~U[2025-10-01 12:00:00.000000Z]}} + """ + def mark_executed(plan_change) do + plan_change + |> cast(%{status: :executed, executed_at: DateTime.utc_now()}, [:status, :executed_at]) + |> validate_inclusion(:status, @statuses) + end + + @doc """ + Mark change as failed with error + + ## Example + + iex> change |> PlanChange.mark_error("Work not found") |> Repo.update() + {:ok, %PlanChange{status: :error, error: "Work not found"}} + """ + def mark_error(plan_change, error) do + plan_change + |> cast(%{status: :error, error: error}, [:status, :error]) + |> validate_inclusion(:status, @statuses) + end + + defp validate_at_least_one_operation(changeset) do + add = get_field(changeset, :add) + delete = get_field(changeset, :delete) + replace = get_field(changeset, :replace) + + if is_nil(add) and is_nil(delete) and is_nil(replace) do + add_error(changeset, :add, "at least one of add, delete, or replace must be specified") + else + changeset + end + end + + defp validate_map_format(changeset, field) do + case get_field(changeset, field) do + nil -> + changeset + + value when is_map(value) -> + changeset + + _ -> + add_error(changeset, field, "must be a map/object") + end + end +end diff --git a/app/priv/repo/migrations/20251001031320_create_agent_plans.exs b/app/priv/repo/migrations/20251001031320_create_agent_plans.exs new file mode 100644 index 000000000..7bcb6ece4 --- /dev/null +++ b/app/priv/repo/migrations/20251001031320_create_agent_plans.exs @@ -0,0 +1,20 @@ +defmodule Meadow.Repo.Migrations.CreateAgentPlans do + use Ecto.Migration + + def change do + create table(:agent_plans) do + add(:query, :text, null: false) + add(:changeset, :jsonb, null: false) + add(:status, :string, null: false, default: "pending") + add(:user, :string) + add(:notes, :text) + add(:executed_at, :utc_datetime_usec) + add(:error, :text) + timestamps() + end + + create(index(:agent_plans, [:status])) + create(index(:agent_plans, [:user])) + create(index(:agent_plans, [:inserted_at])) + end +end diff --git a/app/priv/repo/migrations/20251001120000_create_plans_and_plan_changes.exs b/app/priv/repo/migrations/20251001120000_create_plans_and_plan_changes.exs new file mode 100644 index 000000000..41b7dfed0 --- /dev/null +++ b/app/priv/repo/migrations/20251001120000_create_plans_and_plan_changes.exs @@ -0,0 +1,43 @@ +defmodule Meadow.Repo.Migrations.CreatePlansAndPlanChanges do + use Ecto.Migration + + def change do + create table(:plans, primary_key: false) do + add :id, :uuid, primary_key: true, default: fragment("gen_random_uuid()") + add :prompt, :text, null: false + add :query, :text + add :status, :string, null: false, default: "pending" + add :user, :string + add :notes, :text + add :executed_at, :utc_datetime_usec + add :error, :text + timestamps(type: :utc_datetime_usec) + end + + create table(:plan_changes, primary_key: false) do + add :id, :uuid, primary_key: true, default: fragment("gen_random_uuid()") + add :plan_id, references(:plans, type: :uuid, on_delete: :delete_all), null: false + add :work_id, :uuid, null: false + add :changeset, :jsonb + add :add, :jsonb + add :delete, :jsonb + add :replace, :jsonb + add :status, :string, null: false, default: "pending" + add :user, :string + add :notes, :text + add :executed_at, :utc_datetime_usec + add :error, :text + timestamps(type: :utc_datetime_usec) + end + + create index(:plans, [:status]) + create index(:plans, [:user]) + create index(:plans, [:inserted_at]) + + create index(:plan_changes, [:plan_id]) + create index(:plan_changes, [:work_id]) + create index(:plan_changes, [:status]) + create index(:plan_changes, [:user]) + create index(:plan_changes, [:inserted_at]) + end +end diff --git a/app/test/meadow/data/planner_test.exs b/app/test/meadow/data/planner_test.exs new file mode 100644 index 000000000..4f73a09bb --- /dev/null +++ b/app/test/meadow/data/planner_test.exs @@ -0,0 +1,834 @@ +defmodule Meadow.Data.PlannerTest do + use Meadow.AuthorityCase + use Meadow.DataCase + + alias Meadow.Data.Planner + alias Meadow.Data.Schemas.{Plan, PlanChange} + + @valid_plan_attrs %{ + prompt: "Translate titles to Spanish in alternate_title field", + query: "collection.id:abc-123" + } + + @invalid_plan_attrs %{prompt: nil} + + setup do + {:ok, plan} = Planner.create_plan(@valid_plan_attrs) + work = work_fixture() + {:ok, plan: plan, work: work} + end + + describe "list_plans/0" do + test "returns all plans" do + assert length(Planner.list_plans()) >= 1 + end + + test "returns empty list when no plans exist" do + Repo.delete_all(Plan) + assert Planner.list_plans() == [] + end + end + + describe "list_plans/1" do + test "filters plans by status" do + {:ok, pending_plan} = Planner.create_plan(@valid_plan_attrs) + {:ok, approved_plan} = Planner.create_plan(@valid_plan_attrs) + Planner.approve_plan(approved_plan) + + pending_plans = Planner.list_plans(status: :pending) + pending_ids = Enum.map(pending_plans, & &1.id) + assert pending_plan.id in pending_ids + refute approved_plan.id in pending_ids + end + + test "limits results" do + Planner.create_plan(@valid_plan_attrs) + Planner.create_plan(@valid_plan_attrs) + Planner.create_plan(@valid_plan_attrs) + + plans = Planner.list_plans(limit: 2) + assert length(plans) == 2 + end + + test "filters by user" do + attrs_with_user = Map.put(@valid_plan_attrs, :user, "test_user@example.com") + {:ok, user_plan} = Planner.create_plan(attrs_with_user) + {:ok, _other_plan} = Planner.create_plan(@valid_plan_attrs) + + user_plans = Planner.list_plans(user: "test_user@example.com") + assert length(user_plans) == 1 + assert hd(user_plans).id == user_plan.id + end + + test "orders results" do + # Clear existing plans to ensure clean test + Repo.delete_all(Plan) + + {:ok, plan1} = Planner.create_plan(@valid_plan_attrs) + {:ok, plan2} = Planner.create_plan(@valid_plan_attrs) + + plans_asc = Planner.list_plans(order: :asc) + assert hd(plans_asc).id == plan1.id + + plans_desc = Planner.list_plans(order: :desc) + assert hd(plans_desc).id == plan2.id + end + end + + describe "get_plan!/2" do + test "returns the plan with given id", %{plan: plan} do + assert %Plan{} = retrieved_plan = Planner.get_plan!(plan.id) + assert retrieved_plan.id == plan.id + end + + test "raises if plan doesn't exist" do + assert_raise Ecto.NoResultsError, fn -> + Planner.get_plan!(Ecto.UUID.generate()) + end + end + + test "preloads changes when requested", %{plan: plan, work: work} do + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + add: %{descriptive_metadata: %{title: "Updated"}} + }) + + retrieved_plan = Planner.get_plan!(plan.id, preload_changes: true) + assert length(retrieved_plan.plan_changes) == 1 + end + end + + describe "get_plan/2" do + test "returns the plan with given id", %{plan: plan} do + assert %Plan{} = retrieved_plan = Planner.get_plan(plan.id) + assert retrieved_plan.id == plan.id + end + + test "returns nil if plan doesn't exist" do + assert Planner.get_plan(Ecto.UUID.generate()) == nil + end + end + + describe "get_pending_plans/1" do + test "returns only pending plans" do + {:ok, pending1} = Planner.create_plan(@valid_plan_attrs) + {:ok, pending2} = Planner.create_plan(@valid_plan_attrs) + {:ok, approved} = Planner.create_plan(@valid_plan_attrs) + Planner.approve_plan(approved) + + pending_plans = Planner.get_pending_plans() + pending_ids = Enum.map(pending_plans, & &1.id) + assert pending1.id in pending_ids + assert pending2.id in pending_ids + refute approved.id in pending_ids + end + + test "returns empty list when no pending plans" do + Repo.delete_all(Plan) + {:ok, plan} = Planner.create_plan(@valid_plan_attrs) + Planner.approve_plan(plan) + + assert Planner.get_pending_plans() == [] + end + end + + describe "get_approved_plans/1" do + test "returns only approved plans" do + {:ok, pending} = Planner.create_plan(@valid_plan_attrs) + {:ok, approved1} = Planner.create_plan(@valid_plan_attrs) + {:ok, approved2} = Planner.create_plan(@valid_plan_attrs) + Planner.approve_plan(approved1) + Planner.approve_plan(approved2) + + approved_plans = Planner.get_approved_plans() + approved_ids = Enum.map(approved_plans, & &1.id) + assert approved1.id in approved_ids + assert approved2.id in approved_ids + refute pending.id in approved_ids + end + + test "returns empty list when no approved plans" do + Repo.delete_all(Plan) + Planner.create_plan(@valid_plan_attrs) + + assert Planner.get_approved_plans() == [] + end + end + + describe "create_plan/1" do + test "with valid data creates a plan" do + assert {:ok, %Plan{} = plan} = Planner.create_plan(@valid_plan_attrs) + assert plan.prompt == @valid_plan_attrs.prompt + assert plan.query == @valid_plan_attrs.query + assert plan.status == :pending + end + + test "with invalid data returns error changeset" do + assert {:error, %Ecto.Changeset{}} = Planner.create_plan(@invalid_plan_attrs) + end + + test "with user creates plan with user field" do + attrs = Map.put(@valid_plan_attrs, :user, "test_user@example.com") + assert {:ok, %Plan{} = plan} = Planner.create_plan(attrs) + assert plan.user == "test_user@example.com" + end + end + + describe "create_plan!/1" do + test "with valid data creates a plan" do + assert %Plan{} = plan = Planner.create_plan!(@valid_plan_attrs) + assert plan.prompt == @valid_plan_attrs.prompt + end + + test "with invalid data raises error" do + assert_raise Ecto.InvalidChangesetError, fn -> + Planner.create_plan!(@invalid_plan_attrs) + end + end + end + + describe "update_plan/2" do + test "with valid data updates the plan", %{plan: plan} do + update_attrs = %{notes: "Updated notes"} + + assert {:ok, %Plan{} = updated_plan} = Planner.update_plan(plan, update_attrs) + assert updated_plan.notes == "Updated notes" + end + + test "with invalid data returns error changeset", %{plan: plan} do + assert {:error, %Ecto.Changeset{}} = Planner.update_plan(plan, %{status: :invalid}) + end + end + + describe "approve_plan/2" do + test "transitions plan to approved status", %{plan: plan} do + assert {:ok, %Plan{} = approved_plan} = Planner.approve_plan(plan) + assert approved_plan.status == :approved + end + + test "associates user with approval", %{plan: plan} do + assert {:ok, %Plan{} = approved_plan} = Planner.approve_plan(plan, "user@example.com") + assert approved_plan.status == :approved + assert approved_plan.user == "user@example.com" + end + end + + describe "reject_plan/2" do + test "transitions plan to rejected status", %{plan: plan} do + assert {:ok, %Plan{} = rejected_plan} = Planner.reject_plan(plan) + assert rejected_plan.status == :rejected + end + + test "adds notes to rejection", %{plan: plan} do + assert {:ok, %Plan{} = rejected_plan} = Planner.reject_plan(plan, "Not appropriate") + assert rejected_plan.status == :rejected + assert rejected_plan.notes == "Not appropriate" + end + end + + describe "mark_plan_executed/1" do + test "transitions plan to executed status with timestamp", %{plan: plan} do + assert {:ok, %Plan{} = executed_plan} = Planner.mark_plan_executed(plan) + assert executed_plan.status == :executed + assert executed_plan.executed_at != nil + end + end + + describe "mark_plan_error/2" do + test "transitions plan to error status with error message", %{plan: plan} do + error_message = "Failed to execute plan" + + assert {:ok, %Plan{} = error_plan} = Planner.mark_plan_error(plan, error_message) + assert error_plan.status == :error + assert error_plan.error == error_message + end + end + + describe "delete_plan/1" do + test "deletes the plan", %{plan: plan} do + assert {:ok, %Plan{}} = Planner.delete_plan(plan) + assert_raise Ecto.NoResultsError, fn -> Planner.get_plan!(plan.id) end + end + + test "deletes associated changes", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + add: %{descriptive_metadata: %{title: "Updated"}} + }) + + Planner.delete_plan(plan) + assert Planner.get_plan_change(change.id) == nil + end + end + + describe "change_plan/2" do + test "returns a plan changeset", %{plan: plan} do + assert %Ecto.Changeset{} = Planner.change_plan(plan) + end + + test "returns changeset with changes", %{plan: plan} do + changeset = Planner.change_plan(plan, %{notes: "New notes"}) + assert changeset.changes.notes == "New notes" + end + end + + # ========== PlanChange Tests ========== + + describe "list_plan_changes/1" do + test "returns all changes for a plan", %{plan: plan, work: work} do + {:ok, _} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + {:ok, _} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + changes = Planner.list_plan_changes(plan.id) + assert length(changes) == 2 + end + + test "returns empty list when no changes", %{plan: plan} do + assert Planner.list_plan_changes(plan.id) == [] + end + end + + describe "list_plan_changes/2" do + test "filters by status", %{plan: plan, work: work} do + {:ok, pending} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + {:ok, approved} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + Planner.approve_plan_change(approved) + + pending_changes = Planner.list_plan_changes(plan.id, status: :pending) + assert length(pending_changes) == 1 + assert hd(pending_changes).id == pending.id + end + + test "filters by work_id", %{plan: plan} do + work1 = work_fixture() + work2 = work_fixture() + + {:ok, change1} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work1.id, add: %{}}) + + {:ok, _change2} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work2.id, add: %{}}) + + work1_changes = Planner.list_plan_changes(plan.id, work_id: work1.id) + assert length(work1_changes) == 1 + assert hd(work1_changes).id == change1.id + end + end + + describe "get_plan_change!/1" do + test "returns the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert %PlanChange{} = retrieved = Planner.get_plan_change!(change.id) + assert retrieved.id == change.id + end + + test "raises if change doesn't exist" do + assert_raise Ecto.NoResultsError, fn -> + Planner.get_plan_change!(Ecto.UUID.generate()) + end + end + end + + describe "get_plan_change/1" do + test "returns the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert %PlanChange{} = retrieved = Planner.get_plan_change(change.id) + assert retrieved.id == change.id + end + + test "returns nil if change doesn't exist" do + assert Planner.get_plan_change(Ecto.UUID.generate()) == nil + end + end + + describe "create_plan_change/1" do + test "with valid add data creates a change", %{plan: plan, work: work} do + attrs = %{ + plan_id: plan.id, + work_id: work.id, + add: %{descriptive_metadata: %{alternate_title: ["El Gato"]}} + } + + assert {:ok, %PlanChange{} = change} = Planner.create_plan_change(attrs) + assert change.work_id == work.id + assert change.add.descriptive_metadata.alternate_title == ["El Gato"] + end + + test "with valid delete data creates a change", %{plan: plan, work: work} do + attrs = %{ + plan_id: plan.id, + work_id: work.id, + delete: %{ + descriptive_metadata: %{ + subject: [ + %{ + role: %{id: "TOPICAL", scheme: "subject_role"}, + term: %{id: "http://id.loc.gov/authorities/subjects/sh85101196"} + } + ] + } + } + } + + assert {:ok, %PlanChange{} = change} = Planner.create_plan_change(attrs) + assert change.work_id == work.id + assert [subj] = change.delete.descriptive_metadata.subject + assert subj.role.id == "TOPICAL" + assert subj.term.id == "http://id.loc.gov/authorities/subjects/sh85101196" + end + + test "with valid replace data creates a change", %{plan: plan, work: work} do + attrs = %{ + plan_id: plan.id, + work_id: work.id, + replace: %{descriptive_metadata: %{title: "New Title"}} + } + + assert {:ok, %PlanChange{} = change} = Planner.create_plan_change(attrs) + assert change.work_id == work.id + assert change.replace.descriptive_metadata.title == "New Title" + end + + test "without required work_id returns error", %{plan: plan} do + attrs = %{plan_id: plan.id, add: %{}} + assert {:error, error_message} = Planner.create_plan_change(attrs) + assert error_message =~ "can't be blank" + end + + test "without any operation returns error", %{plan: plan, work: work} do + attrs = %{plan_id: plan.id, work_id: work.id} + assert {:error, error_message} = Planner.create_plan_change(attrs) + assert error_message =~ "at least one of add, delete, or replace must be specified" + end + + test "with invalid plan_id returns humanized error", %{work: work} do + invalid_plan_id = "nonexistent-plan-id" + + attrs = %{ + plan_id: invalid_plan_id, + work_id: work.id, + add: %{descriptive_metadata: %{alternate_title: ["El Gato"]}} + } + + assert {:error, error_message} = Planner.create_plan_change(attrs) + assert error_message == "#{invalid_plan_id} is invalid" + end + end + + describe "create_plan_changes/1" do + test "creates multiple changes at once", %{plan: plan} do + work1 = work_fixture() + work2 = work_fixture() + + changes_attrs = [ + %{plan_id: plan.id, work_id: work1.id, add: %{descriptive_metadata: %{title: "Updated 1"}}}, + %{plan_id: plan.id, work_id: work2.id, add: %{descriptive_metadata: %{title: "Updated 2"}}} + ] + + assert {:ok, changes} = Planner.create_plan_changes(changes_attrs) + assert length(changes) == 2 + end + + test "rolls back on error", %{plan: plan} do + work1 = work_fixture() + + changes_attrs = [ + %{plan_id: plan.id, work_id: work1.id, add: %{descriptive_metadata: %{title: "Updated"}}}, + %{plan_id: plan.id, work_id: nil, add: %{}} + ] + + # The transaction will raise an error and rollback + assert_raise Ecto.InvalidChangesetError, fn -> + Planner.create_plan_changes(changes_attrs) + end + + # Verify no changes were persisted + assert Planner.list_plan_changes(plan.id) == [] + end + end + + describe "update_plan_change/2" do + test "updates the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, updated} = Planner.update_plan_change(change, %{notes: "Reviewed"}) + assert updated.notes == "Reviewed" + end + + test "with invalid plan_id returns humanized error", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + invalid_plan_id = "invalid-uuid" + + assert {:error, error_message} = + Planner.update_plan_change(change, %{plan_id: invalid_plan_id}) + + assert error_message == "#{invalid_plan_id} is invalid" + end + end + + describe "approve_plan_change/2" do + test "approves the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, approved} = Planner.approve_plan_change(change, "user@example.com") + assert approved.status == :approved + assert approved.user == "user@example.com" + end + end + + describe "reject_plan_change/2" do + test "rejects the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, rejected} = Planner.reject_plan_change(change, "Not accurate") + assert rejected.status == :rejected + assert rejected.notes == "Not accurate" + end + end + + describe "mark_plan_change_executed/1" do + test "marks change as executed", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, executed} = Planner.mark_plan_change_executed(change) + assert executed.status == :executed + assert executed.executed_at + end + end + + describe "mark_plan_change_error/2" do + test "marks change as error", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, error_change} = Planner.mark_plan_change_error(change, "Work not found") + assert error_change.status == :error + assert error_change.error == "Work not found" + end + end + + describe "delete_plan_change/1" do + test "deletes the change", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + + assert {:ok, %PlanChange{}} = Planner.delete_plan_change(change) + assert Planner.get_plan_change(change.id) == nil + end + end + + describe "execute_plan/1" do + test "executes all approved changes with replace operation", %{plan: plan} do + work1 = work_fixture() + work2 = work_fixture() + + {:ok, change1} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work1.id, + replace: %{descriptive_metadata: %{title: "Updated 1"}} + }) + + {:ok, change2} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work2.id, + replace: %{descriptive_metadata: %{title: "Updated 2"}} + }) + + Planner.approve_plan_change(change1) + Planner.approve_plan_change(change2) + {:ok, plan} = Planner.approve_plan(plan) + + assert {:ok, executed_plan} = Planner.execute_plan(plan) + assert executed_plan.status == :executed + + # Verify the plan changes were marked as executed + assert Planner.list_plan_changes(plan.id, status: :executed) |> length() == 2 + end + + test "returns error when plan is not approved", %{plan: plan, work: work} do + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + replace: %{descriptive_metadata: %{title: "Updated"}} + }) + + Planner.approve_plan_change(change) + + # Plan is still pending, even though changes are approved + assert {:error, "Plan must be approved before execution"} = Planner.execute_plan(plan) + end + + test "returns error when no approved changes", %{plan: plan, work: work} do + Planner.create_plan_change(%{plan_id: plan.id, work_id: work.id, add: %{}}) + {:ok, plan} = Planner.approve_plan(plan) + + assert {:error, "No approved changes to execute"} = Planner.execute_plan(plan) + end + end + + describe "execute_plan_change/1" do + test "applies replace change to work", %{plan: plan} do + work = work_fixture() + + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + replace: %{descriptive_metadata: %{title: "New Title"}}, + status: :approved + }) + + assert {:ok, executed_change} = Planner.execute_plan_change(change) + assert executed_change.status == :executed + + updated_work = Repo.get!(Meadow.Data.Schemas.Work, work.id) + assert updated_work.descriptive_metadata.title == "New Title" + end + + test "applies delete change to controlled field", %{plan: plan} do + # Create a work with subjects + work = + work_fixture(%{ + descriptive_metadata: %{ + title: "Test Work", + subject: [ + %{term: "mock1:result1", role: %{id: "TOPICAL", scheme: "subject_role"}}, + %{term: "mock1:result2", role: %{id: "GEOGRAPHICAL", scheme: "subject_role"}} + ] + } + }) + + # Reload to get the full structure as saved in DB + work = Repo.get!(Meadow.Data.Schemas.Work, work.id) + [subj_to_delete | _] = work.descriptive_metadata.subject + + # Create a plan change to delete one subject + # Convert the struct to match what's stored in JSONB by round-tripping through JSON + subj_as_map = + subj_to_delete + |> Jason.encode!() + |> Jason.decode!(keys: :atoms) + + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + delete: %{ + descriptive_metadata: %{ + subject: [subj_as_map] + } + }, + status: :approved + }) + + # Execute the change + assert {:ok, executed_change} = Planner.execute_plan_change(change) + assert executed_change.status == :executed + + # Verify the subject was deleted + updated_work = Repo.get!(Meadow.Data.Schemas.Work, work.id) + + assert length(updated_work.descriptive_metadata.subject) == 1 + + [remaining_subject] = updated_work.descriptive_metadata.subject + assert remaining_subject.term.id == "mock1:result2" + assert remaining_subject.role.id == "GEOGRAPHICAL" + end + + test "applies add change to uncontrolled date_created field", %{plan: plan} do + work = work_fixture() + + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work.id, + add: %{descriptive_metadata: %{date_created: ["1896-11-10"]}}, + status: :approved + }) + + assert {:ok, executed_change} = Planner.execute_plan_change(change) + assert executed_change.status == :executed + + updated_work = Repo.get!(Meadow.Data.Schemas.Work, work.id) + assert [%{edtf: "1896-11-10", humanized: humanized}] = + updated_work.descriptive_metadata.date_created + + assert humanized != nil + end + + test "marks error when work not found", %{plan: plan} do + fake_work_id = Ecto.UUID.generate() + + {:ok, change} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: fake_work_id, + add: %{}, + status: :approved + }) + + assert {:ok, error_change} = Planner.execute_plan_change(change) + assert error_change.status == :error + assert error_change.error == "\"Work not found\"" + end + end + + describe "full workflow integration" do + test "Spanish translation workflow" do + # 1. Create plan + {:ok, plan} = + Planner.create_plan(%{ + prompt: "Translate titles to Spanish in alternate_title field", + query: "collection.id:abc-123" + }) + + # 2. Create work-specific changes + work_a = work_fixture(%{descriptive_metadata: %{title: "The Cat"}}) + work_b = work_fixture(%{descriptive_metadata: %{title: "The House"}}) + + {:ok, change_a} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work_a.id, + add: %{descriptive_metadata: %{alternate_title: ["El Gato"]}} + }) + + {:ok, change_b} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work_b.id, + add: %{descriptive_metadata: %{alternate_title: ["La Casa"]}} + }) + + # 3. User reviews and approves + {:ok, plan} = Planner.approve_plan(plan, "curator@example.com") + {:ok, _} = Planner.approve_plan_change(change_a, "curator@example.com") + {:ok, _} = Planner.approve_plan_change(change_b, "curator@example.com") + + # 4. Execute plan + assert {:ok, executed_plan} = Planner.execute_plan(plan) + assert executed_plan.status == :executed + + # Verify all changes were applied + assert Planner.list_plan_changes(plan.id, status: :executed) |> length() == 2 + end + + test "Remove extraneous subjects workflow" do + # 1. Create plan to remove extraneous subjects + {:ok, plan} = + Planner.create_plan(%{ + prompt: "Remove extraneous subject headings like 'Photograph' and 'Image'", + query: "collection.id:test-collection" + }) + + # 2. Create works with both good and extraneous subjects + work_a = + work_fixture(%{ + descriptive_metadata: %{ + title: "Photo of Building", + subject: [ + %{term: "mock1:result1", role: %{id: "TOPICAL", scheme: "subject_role"}}, + %{term: "mock1:result2", role: %{id: "TOPICAL", scheme: "subject_role"}}, + %{term: "mock2:result3", role: %{id: "TOPICAL", scheme: "subject_role"}} + ] + } + }) + + work_b = + work_fixture(%{ + descriptive_metadata: %{ + title: "Image of Person", + subject: [ + %{term: "mock1:result1", role: %{id: "TOPICAL", scheme: "subject_role"}}, + %{term: "mock1:result2", role: %{id: "TOPICAL", scheme: "subject_role"}} + ] + } + }) + + # 3. Reload works to get full DB structure + work_a = Repo.get!(Meadow.Data.Schemas.Work, work_a.id) + work_b = Repo.get!(Meadow.Data.Schemas.Work, work_b.id) + + # Find the subjects to delete by ID and convert to maps matching JSONB storage + [_keep, subj_a_2, subj_a_3] = work_a.descriptive_metadata.subject + [_keep_b, subj_b_2] = work_b.descriptive_metadata.subject + + subj_a_2_map = subj_a_2 |> Jason.encode!() |> Jason.decode!(keys: :atoms) + subj_a_3_map = subj_a_3 |> Jason.encode!() |> Jason.decode!(keys: :atoms) + subj_b_2_map = subj_b_2 |> Jason.encode!() |> Jason.decode!(keys: :atoms) + + # 4. Agent creates deletion changes for extraneous subjects + # Work A: Delete "Second Result" (mock1:result2) and "Third Result" (mock2:result3) + {:ok, change_a} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work_a.id, + delete: %{ + descriptive_metadata: %{ + subject: [subj_a_2_map, subj_a_3_map] + } + } + }) + + # Work B: Delete "Second Result" (mock1:result2) + {:ok, change_b} = + Planner.create_plan_change(%{ + plan_id: plan.id, + work_id: work_b.id, + delete: %{ + descriptive_metadata: %{ + subject: [subj_b_2_map] + } + } + }) + + # 5. User reviews and approves + {:ok, plan} = Planner.approve_plan(plan, "curator@example.com") + {:ok, _} = Planner.approve_plan_change(change_a, "curator@example.com") + {:ok, _} = Planner.approve_plan_change(change_b, "curator@example.com") + + # 6. Execute plan + assert {:ok, executed_plan} = Planner.execute_plan(plan) + assert executed_plan.status == :executed + + # 7. Verify changes were applied + updated_work_a = Repo.get!(Meadow.Data.Schemas.Work, work_a.id) + assert length(updated_work_a.descriptive_metadata.subject) == 1 + [remaining_a] = updated_work_a.descriptive_metadata.subject + assert remaining_a.term.id == "mock1:result1" + assert remaining_a.term.label == "First Result" + + updated_work_b = Repo.get!(Meadow.Data.Schemas.Work, work_b.id) + assert length(updated_work_b.descriptive_metadata.subject) == 1 + [remaining_b] = updated_work_b.descriptive_metadata.subject + assert remaining_b.term.id == "mock1:result1" + assert remaining_b.term.label == "First Result" + + # Verify all changes were executed + assert Planner.list_plan_changes(plan.id, status: :executed) |> length() == 2 + end + end +end diff --git a/app/test/meadow/data/schemas/plan_change_test.exs b/app/test/meadow/data/schemas/plan_change_test.exs new file mode 100644 index 000000000..f65667f00 --- /dev/null +++ b/app/test/meadow/data/schemas/plan_change_test.exs @@ -0,0 +1,336 @@ +defmodule Meadow.Data.Schemas.PlanChangeTest do + use Meadow.DataCase + alias Meadow.Data.Schemas.{Plan, PlanChange} + + setup do + {:ok, plan} = + %Plan{} + |> Plan.changeset(%{ + prompt: "Translate titles to Spanish", + query: "collection.id:abc-123" + }) + |> Repo.insert() + + work = work_fixture() + {:ok, plan: plan, work: work} + end + + @valid_attrs %{ + work_id: nil, + add: %{ + descriptive_metadata: %{ + subject: [ + %{ + role: %{id: "TOPICAL", scheme: "subject_role"}, + term: %{id: "http://id.loc.gov/authorities/subjects/sh85141086"} + } + ] + } + } + } + + @invalid_attrs %{work_id: nil, add: nil, delete: nil, replace: nil} + + describe "changeset/2" do + test "with valid attributes", %{plan: plan, work: work} do + attrs = Map.merge(@valid_attrs, %{plan_id: plan.id, work_id: work.id}) + changeset = PlanChange.changeset(%PlanChange{}, attrs) + assert changeset.valid? + end + + test "without required work_id", %{plan: plan} do + attrs = Map.merge(@invalid_attrs, %{plan_id: plan.id}) + changeset = PlanChange.changeset(%PlanChange{}, attrs) + refute changeset.valid? + assert "can't be blank" in errors_on(changeset).work_id + end + + test "without any operation", %{plan: plan, work: work} do + attrs = %{plan_id: plan.id, work_id: work.id} + changeset = PlanChange.changeset(%PlanChange{}, attrs) + refute changeset.valid? + assert "at least one of add, delete, or replace must be specified" in errors_on(changeset).add + end + + test "validates add is a map", %{plan: plan, work: work} do + attrs = %{plan_id: plan.id, work_id: work.id, add: "not a map"} + changeset = PlanChange.changeset(%PlanChange{}, attrs) + refute changeset.valid? + assert "is invalid" in errors_on(changeset).add + end + + test "validates status is in allowed values", %{plan: plan, work: work} do + attrs = Map.merge(@valid_attrs, %{plan_id: plan.id, work_id: work.id, status: :invalid}) + changeset = PlanChange.changeset(%PlanChange{}, attrs) + refute changeset.valid? + end + + test "allows valid status values", %{plan: plan, work: work} do + for status <- [:pending, :approved, :rejected, :executed, :error] do + attrs = Map.merge(@valid_attrs, %{plan_id: plan.id, work_id: work.id, status: status}) + changeset = PlanChange.changeset(%PlanChange{}, attrs) + assert changeset.valid? + end + end + end + + describe "approve/2" do + test "transitions to approved status", %{plan: plan, work: work} do + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: @valid_attrs.add + }) + |> Repo.insert() + + changeset = PlanChange.approve(change, "user@example.com") + + assert changeset.valid? + assert get_change(changeset, :status) == :approved + assert get_change(changeset, :user) == "user@example.com" + end + end + + describe "reject/2" do + test "transitions to rejected status with notes", %{plan: plan, work: work} do + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: @valid_attrs.add + }) + |> Repo.insert() + + changeset = PlanChange.reject(change, "Translation is incorrect") + + assert changeset.valid? + assert get_change(changeset, :status) == :rejected + assert get_change(changeset, :notes) == "Translation is incorrect" + end + end + + describe "mark_executed/1" do + test "transitions to executed status with timestamp", %{plan: plan, work: work} do + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: @valid_attrs.add, + status: :approved + }) + |> Repo.insert() + + changeset = PlanChange.mark_executed(change) + + assert changeset.valid? + assert get_change(changeset, :status) == :executed + assert get_change(changeset, :executed_at) + end + end + + describe "mark_error/2" do + test "transitions to error status with error message", %{plan: plan, work: work} do + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: @valid_attrs.add, + status: :approved + }) + |> Repo.insert() + + changeset = PlanChange.mark_error(change, "Work not found") + + assert changeset.valid? + assert get_change(changeset, :status) == :error + assert get_change(changeset, :error) == "Work not found" + end + end + + describe "integration scenarios" do + test "Spanish translation for work A", %{plan: plan} do + work_a = work_fixture(%{descriptive_metadata: %{title: "The Cat"}}) + + # Agent creates change with Spanish translation + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work_a.id, + add: %{ + descriptive_metadata: %{alternate_title: ["El Gato"]} + } + }) + |> Repo.insert() + + assert change.status == :pending + assert change.add.descriptive_metadata.alternate_title == ["El Gato"] + + # User approves + {:ok, approved} = + change + |> PlanChange.approve("user@example.com") + |> Repo.update() + + assert approved.status == :approved + assert approved.user == "user@example.com" + + # System executes + {:ok, executed} = + approved + |> PlanChange.mark_executed() + |> Repo.update() + + assert executed.status == :executed + assert executed.executed_at + end + + test "LCNAF contributor assignment", %{plan: plan} do + work = work_fixture(%{descriptive_metadata: %{title: "Ansel Adams Photography"}}) + + # Agent creates change with LOC authority + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: %{ + descriptive_metadata: %{ + contributor: [ + %{ + role: %{id: "pht", scheme: "marc_relator"}, + term: %{ + id: "http://id.loc.gov/authorities/names/n79127000", + label: "Adams, Ansel, 1902-1984" + } + } + ] + } + } + }) + |> Repo.insert() + + assert [contrib] = change.add.descriptive_metadata.contributor + assert contrib.role.id == "pht" + assert contrib.term.id == "http://id.loc.gov/authorities/names/n79127000" + assert contrib.term.label == "Adams, Ansel, 1902-1984" + + # User approves and executes + {:ok, approved} = + change + |> PlanChange.approve("curator@example.com") + |> Repo.update() + + {:ok, executed} = + approved + |> PlanChange.mark_executed() + |> Repo.update() + + assert executed.status == :executed + end + + test "rejected change workflow", %{plan: plan, work: work} do + # Agent creates change + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work.id, + add: %{ + descriptive_metadata: %{alternate_title: ["Wrong Translation"]} + } + }) + |> Repo.insert() + + # User rejects with notes + {:ok, rejected} = + change + |> PlanChange.reject("This translation is inaccurate") + |> Repo.update() + + assert rejected.status == :rejected + assert rejected.notes == "This translation is inaccurate" + end + + test "execution error workflow", %{plan: plan} do + # Create change for non-existent work + fake_work_id = Ecto.UUID.generate() + + {:ok, change} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: fake_work_id, + add: @valid_attrs.add, + status: :approved + }) + |> Repo.insert() + + # Mark as error when execution fails + {:ok, error_change} = + change + |> PlanChange.mark_error("Work #{fake_work_id} not found") + |> Repo.update() + + assert error_change.status == :error + assert error_change.error =~ "not found" + end + + test "multiple changes for the same plan", %{plan: plan} do + work_a = work_fixture(%{descriptive_metadata: %{title: "The Cat"}}) + work_b = work_fixture(%{descriptive_metadata: %{title: "The House"}}) + work_c = work_fixture(%{descriptive_metadata: %{title: "The Dogs"}}) + + # Agent creates multiple changes + {:ok, change_a} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work_a.id, + add: %{descriptive_metadata: %{alternate_title: ["El Gato"]}} + }) + |> Repo.insert() + + {:ok, change_b} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work_b.id, + add: %{descriptive_metadata: %{alternate_title: ["La Casa"]}} + }) + |> Repo.insert() + + {:ok, change_c} = + %PlanChange{} + |> PlanChange.changeset(%{ + plan_id: plan.id, + work_id: work_c.id, + add: %{descriptive_metadata: %{alternate_title: ["Los Perros"]}} + }) + |> Repo.insert() + + # Verify all changes belong to the same plan + changes = Repo.all(from c in PlanChange, where: c.plan_id == ^plan.id) + assert length(changes) == 3 + assert Enum.all?(changes, &(&1.plan_id == plan.id)) + + # User can approve/reject individually + {:ok, _} = PlanChange.approve(change_a, "user@example.com") |> Repo.update() + {:ok, _} = PlanChange.approve(change_b, "user@example.com") |> Repo.update() + {:ok, _} = PlanChange.reject(change_c, "Needs review") |> Repo.update() + + approved_count = + Repo.aggregate( + from(c in PlanChange, where: c.plan_id == ^plan.id and c.status == :approved), + :count + ) + + assert approved_count == 2 + end + end +end diff --git a/app/test/meadow/data/schemas/plan_test.exs b/app/test/meadow/data/schemas/plan_test.exs new file mode 100644 index 000000000..fa3216d32 --- /dev/null +++ b/app/test/meadow/data/schemas/plan_test.exs @@ -0,0 +1,181 @@ +defmodule Meadow.Data.Schemas.PlanTest do + use Meadow.DataCase + alias Meadow.Data.Schemas.Plan + + @valid_attrs %{ + prompt: "Translate titles to Spanish in alternate_title field", + query: "collection.id:abc-123" + } + + @invalid_attrs %{prompt: nil} + + describe "changeset/2" do + test "with valid attributes" do + changeset = Plan.changeset(%Plan{}, @valid_attrs) + assert changeset.valid? + end + + test "without required prompt" do + changeset = Plan.changeset(%Plan{}, @invalid_attrs) + refute changeset.valid? + assert "can't be blank" in errors_on(changeset).prompt + end + + test "validates status is in allowed values" do + changeset = Plan.changeset(%Plan{}, Map.put(@valid_attrs, :status, :invalid_status)) + refute changeset.valid? + end + + test "allows valid status values" do + for status <- [:pending, :approved, :rejected, :executed, :error] do + changeset = Plan.changeset(%Plan{}, Map.put(@valid_attrs, :status, status)) + assert changeset.valid? + end + end + + test "query is optional" do + changeset = Plan.changeset(%Plan{}, Map.delete(@valid_attrs, :query)) + assert changeset.valid? + end + end + + describe "approve/2" do + test "transitions to approved status" do + plan = %Plan{status: :pending} + changeset = Plan.approve(plan, "user@example.com") + + assert changeset.valid? + assert get_change(changeset, :status) == :approved + assert get_change(changeset, :user) == "user@example.com" + end + + test "works without user" do + plan = %Plan{status: :pending} + changeset = Plan.approve(plan) + + assert changeset.valid? + assert get_change(changeset, :status) == :approved + end + end + + describe "reject/2" do + test "transitions to rejected status with notes" do + plan = %Plan{status: :pending} + changeset = Plan.reject(plan, "Changes not needed") + + assert changeset.valid? + assert get_change(changeset, :status) == :rejected + assert get_change(changeset, :notes) == "Changes not needed" + end + + test "works without notes" do + plan = %Plan{status: :pending} + changeset = Plan.reject(plan) + + assert changeset.valid? + assert get_change(changeset, :status) == :rejected + end + end + + describe "mark_executed/1" do + test "transitions to executed status with timestamp" do + plan = %Plan{status: :approved} + changeset = Plan.mark_executed(plan) + + assert changeset.valid? + assert get_change(changeset, :status) == :executed + assert get_change(changeset, :executed_at) + end + end + + describe "mark_error/2" do + test "transitions to error status with error message" do + plan = %Plan{status: :approved} + changeset = Plan.mark_error(plan, "Database connection failed") + + assert changeset.valid? + assert get_change(changeset, :status) == :error + assert get_change(changeset, :error) == "Database connection failed" + end + end + + describe "integration scenarios" do + test "Spanish translation workflow" do + # Agent creates plan with prompt and query + {:ok, plan} = + %Plan{} + |> Plan.changeset(%{ + prompt: "Translate titles to Spanish in alternate_title field", + query: "collection.id:abc-123" + }) + |> Repo.insert() + + assert plan.status == :pending + assert plan.prompt == "Translate titles to Spanish in alternate_title field" + + # User approves the plan + {:ok, approved_plan} = + plan + |> Plan.approve("user@example.com") + |> Repo.update() + + assert approved_plan.status == :approved + assert approved_plan.user == "user@example.com" + + # System marks as executed + {:ok, executed_plan} = + approved_plan + |> Plan.mark_executed() + |> Repo.update() + + assert executed_plan.status == :executed + assert executed_plan.executed_at + end + + test "LCNAF lookup workflow with error" do + # Agent creates plan + {:ok, plan} = + %Plan{} + |> Plan.changeset(%{ + prompt: "Look up LCNAF names from description and assign as contributors", + query: "id:(work-1 OR work-2)" + }) + |> Repo.insert() + + # User approves + {:ok, approved_plan} = + plan + |> Plan.approve("admin@example.com") + |> Repo.update() + + # Execution encounters error + {:ok, error_plan} = + approved_plan + |> Plan.mark_error("LOC API unavailable") + |> Repo.update() + + assert error_plan.status == :error + assert error_plan.error == "LOC API unavailable" + end + + test "rejection workflow" do + # Agent creates plan + {:ok, plan} = + %Plan{} + |> Plan.changeset(%{ + prompt: "Delete all works in collection", + query: "collection.id:xyz" + }) + |> Repo.insert() + + # User rejects with notes + {:ok, rejected_plan} = + plan + |> Plan.reject("This would delete important works") + |> Repo.update() + + assert rejected_plan.status == :rejected + assert rejected_plan.notes == "This would delete important works" + end + end +end