|
| 1 | +# SPDX-License-Identifier: Apache-2.0 |
| 2 | + |
| 3 | +defmodule Carbonite.Migrations.V12 do |
| 4 | + @moduledoc false |
| 5 | + |
| 6 | + use Ecto.Migration |
| 7 | + use Carbonite.Migrations.Version |
| 8 | + alias Carbonite.Migrations.V11 |
| 9 | + |
| 10 | + @type prefix :: binary() |
| 11 | + |
| 12 | + defp create_capture_changes_procedure(prefix) do |
| 13 | + """ |
| 14 | + CREATE OR REPLACE FUNCTION #{prefix}.capture_changes() RETURNS TRIGGER AS |
| 15 | + $body$ |
| 16 | + DECLARE |
| 17 | + trigger_row RECORD; |
| 18 | + change_row #{prefix}.changes; |
| 19 | +
|
| 20 | + pk_source RECORD; |
| 21 | + pk_col VARCHAR; |
| 22 | + pk_col_val VARCHAR; |
| 23 | + BEGIN |
| 24 | + /* load trigger config */ |
| 25 | + WITH settings AS (SELECT NULLIF(current_setting('#{prefix}.override_mode', TRUE), '')::TEXT AS override_mode) |
| 26 | + SELECT |
| 27 | + primary_key_columns, |
| 28 | + excluded_columns, |
| 29 | + filtered_columns, |
| 30 | + CASE |
| 31 | + WHEN settings.override_mode = 'override' AND mode = 'ignore' THEN 'capture' |
| 32 | + WHEN settings.override_mode = 'override' AND mode = 'capture' THEN 'ignore' |
| 33 | + ELSE COALESCE(settings.override_mode, mode::text) |
| 34 | + END AS mode, |
| 35 | + store_changed_from |
| 36 | + INTO trigger_row |
| 37 | + FROM #{prefix}.triggers |
| 38 | + JOIN settings ON TRUE |
| 39 | + WHERE table_prefix = TG_TABLE_SCHEMA AND table_name = TG_TABLE_NAME; |
| 40 | +
|
| 41 | + IF (trigger_row IS NULL) THEN |
| 42 | + RAISE '(carbonite) % on table %.% but no trigger record in #{prefix}.triggers', |
| 43 | + TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME USING ERRCODE = 'no_data_found'; |
| 44 | + END IF; |
| 45 | +
|
| 46 | + /* skip if ignored */ |
| 47 | + IF (trigger_row.mode = 'ignore') THEN |
| 48 | + RETURN NULL; |
| 49 | + END IF; |
| 50 | +
|
| 51 | + /* instantiate change row */ |
| 52 | + change_row := ROW( |
| 53 | + NEXTVAL('#{prefix}.changes_id_seq'), |
| 54 | + pg_current_xact_id(), |
| 55 | + LOWER(TG_OP::TEXT), |
| 56 | + TG_TABLE_SCHEMA::TEXT, |
| 57 | + TG_TABLE_NAME::TEXT, |
| 58 | + NULL, |
| 59 | + NULL, |
| 60 | + '{}', |
| 61 | + NULL, |
| 62 | + NULL |
| 63 | + ); |
| 64 | +
|
| 65 | + /* collect table pk */ |
| 66 | + IF trigger_row.primary_key_columns != '{}' THEN |
| 67 | + IF (TG_OP IN ('INSERT', 'UPDATE')) THEN |
| 68 | + pk_source := NEW; |
| 69 | + ELSIF (TG_OP = 'DELETE') THEN |
| 70 | + pk_source := OLD; |
| 71 | + END IF; |
| 72 | +
|
| 73 | + change_row.table_pk = '{}'; |
| 74 | + FOREACH pk_col IN ARRAY trigger_row.primary_key_columns LOOP |
| 75 | + EXECUTE 'SELECT $1.' || quote_ident(pk_col) || '::TEXT' USING pk_source INTO pk_col_val; |
| 76 | + change_row.table_pk := change_row.table_pk || pk_col_val; |
| 77 | + END LOOP; |
| 78 | + END IF; |
| 79 | +
|
| 80 | + /* collect version data */ |
| 81 | + IF (TG_OP IN ('INSERT', 'UPDATE')) THEN |
| 82 | + SELECT to_jsonb(NEW.*) - trigger_row.excluded_columns |
| 83 | + INTO change_row.data; |
| 84 | + ELSIF (TG_OP = 'DELETE') THEN |
| 85 | + SELECT to_jsonb(OLD.*) - trigger_row.excluded_columns |
| 86 | + INTO change_row.data; |
| 87 | + END IF; |
| 88 | +
|
| 89 | + /* change tracking for UPDATEs */ |
| 90 | + IF (TG_OP = 'UPDATE') THEN |
| 91 | + change_row.changed_from = '{}'::JSONB; |
| 92 | +
|
| 93 | + SELECT jsonb_object_agg(before.key, before.value) |
| 94 | + FROM jsonb_each(to_jsonb(OLD.*) - trigger_row.excluded_columns) AS before |
| 95 | + WHERE (change_row.data->before.key)::JSONB != before.value |
| 96 | + INTO change_row.changed_from; |
| 97 | +
|
| 98 | + SELECT ARRAY(SELECT jsonb_object_keys(change_row.changed_from)) |
| 99 | + INTO change_row.changed; |
| 100 | +
|
| 101 | + /* skip persisting this update if nothing has changed */ |
| 102 | + IF change_row.changed = '{}' THEN |
| 103 | + RETURN NULL; |
| 104 | + END IF; |
| 105 | +
|
| 106 | + /* persisting the old data is opt-in, discard if not configured. */ |
| 107 | + IF trigger_row.store_changed_from IS FALSE THEN |
| 108 | + change_row.changed_from := NULL; |
| 109 | + END IF; |
| 110 | + END IF; |
| 111 | +
|
| 112 | + /* filtered columns */ |
| 113 | + SELECT #{prefix}.jsonb_redact_keys(change_row.data, trigger_row.filtered_columns) |
| 114 | + INTO change_row.data; |
| 115 | +
|
| 116 | + IF change_row.changed_from IS NOT NULL THEN |
| 117 | + SELECT #{prefix}.jsonb_redact_keys(change_row.changed_from, trigger_row.filtered_columns) |
| 118 | + INTO change_row.changed_from; |
| 119 | + END IF; |
| 120 | +
|
| 121 | + /* insert, fail gracefully unless transaction record present or NEXTVAL has never been called */ |
| 122 | + BEGIN |
| 123 | + change_row.transaction_id = CURRVAL('#{prefix}.transactions_id_seq'); |
| 124 | +
|
| 125 | + /* verify that xact_id matches */ |
| 126 | + IF NOT |
| 127 | + EXISTS( |
| 128 | + SELECT 1 FROM #{prefix}.transactions |
| 129 | + WHERE id = change_row.transaction_id AND xact_id = change_row.transaction_xact_id |
| 130 | + ) |
| 131 | + THEN |
| 132 | + RAISE USING ERRCODE = 'foreign_key_violation'; |
| 133 | + END IF; |
| 134 | +
|
| 135 | + INSERT INTO #{prefix}.changes VALUES (change_row.*); |
| 136 | + EXCEPTION WHEN foreign_key_violation OR object_not_in_prerequisite_state THEN |
| 137 | + RAISE '(carbonite) % on table %.% without prior INSERT into #{prefix}.transactions', |
| 138 | + TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME USING ERRCODE = 'foreign_key_violation'; |
| 139 | + END; |
| 140 | +
|
| 141 | + RETURN NULL; |
| 142 | + END; |
| 143 | + $body$ |
| 144 | + LANGUAGE plpgsql; |
| 145 | + """ |
| 146 | + |> squish_and_execute() |
| 147 | + |
| 148 | + :ok |
| 149 | + end |
| 150 | + |
| 151 | + @type up_option :: {:carbonite_prefix, prefix()} |
| 152 | + |
| 153 | + @impl true |
| 154 | + @spec up([up_option()]) :: :ok |
| 155 | + def up(opts) do |
| 156 | + prefix = Keyword.get(opts, :carbonite_prefix, default_prefix()) |
| 157 | + |
| 158 | + create_capture_changes_procedure(prefix) |
| 159 | + |
| 160 | + :ok |
| 161 | + end |
| 162 | + |
| 163 | + @type down_option :: {:carbonite_prefix, prefix()} |
| 164 | + |
| 165 | + @impl true |
| 166 | + @spec down([down_option()]) :: :ok |
| 167 | + def down(opts) do |
| 168 | + prefix = Keyword.get(opts, :carbonite_prefix, default_prefix()) |
| 169 | + |
| 170 | + V11.create_capture_changes_procedure(prefix) |
| 171 | + |
| 172 | + :ok |
| 173 | + end |
| 174 | +end |
0 commit comments