From 4723ee16ab6c889a41f9766325f2933458ee353b Mon Sep 17 00:00:00 2001 From: Joe Averbukh Date: Tue, 11 Feb 2025 15:14:05 -0800 Subject: [PATCH] Use location-id for Storage (#817) --- client/sandbox/admin-sdk-express/src/index.ts | 30 ++- server/flags-config/instant.schema.ts | 1 + .../52_add_app_files_to_sweep.down.sql | 7 + .../52_add_app_files_to_sweep.up.sql | 57 +++++ server/src/instant/admin/routes.clj | 18 +- server/src/instant/db/instaql.clj | 27 ++- server/src/instant/db/transaction.clj | 15 +- server/src/instant/model/app_file.clj | 196 ++++++++++++------ server/src/instant/storage/coordinator.clj | 24 ++- server/src/instant/storage/s3.clj | 111 ++++++---- server/src/instant/system_catalog.clj | 7 +- server/src/instant/system_catalog_ops.clj | 42 +++- server/src/instant/util/s3.clj | 70 ++++++- server/src/instant/util/uuid.clj | 8 + 14 files changed, 458 insertions(+), 155 deletions(-) create mode 100644 server/resources/migrations/52_add_app_files_to_sweep.down.sql create mode 100644 server/resources/migrations/52_add_app_files_to_sweep.up.sql diff --git a/client/sandbox/admin-sdk-express/src/index.ts b/client/sandbox/admin-sdk-express/src/index.ts index e3a0649d7..117f0cfd3 100644 --- a/client/sandbox/admin-sdk-express/src/index.ts +++ b/client/sandbox/admin-sdk-express/src/index.ts @@ -218,14 +218,40 @@ async function testDeleteFileTransactFails() { } } -// testUploadFile("circle_blue.jpg", "circle_blue.jpg", "image/jpeg"); +async function testDeleteAllowedInTx( + src: string, + dest: string, + contentType?: string, +) { + const buffer = fs.readFileSync(path.join(__dirname, src)); + const { data } = await db.storage.uploadFile(dest, buffer, { + contentType: contentType, + }); + const fileId = data.id; + const q = { + $files: { + $: { + where: { id: fileId }, + }, + }, + }; + const before = await query(q); + console.log('Before', JSON.stringify(before, null, 2)); + + await transact(tx['$files'][fileId].delete()); + + const after = await query(q); + console.log('After', JSON.stringify(after, null, 2)); +} + +// testUploadFile('circle_blue.jpg', 'circle_blue.jpg', 'image/jpeg'); // testUploadFile("circle_blue.jpg", "circle_blue2.jpg", "image/jpeg"); // testQueryFiles() // testDeleteSingleFile("circle_blue.jpg"); // testDeleteBulkFile(["circle_blue.jpg", "circle_blue2.jpg"]); // testUpdateFileFails() // testMergeFileFails() -// testDeleteFileTransactFails() +// testDeleteAllowedInTx('circle_blue.jpg', 'circle_blue.jpg', 'image/jpeg'); /** * Legacy Storage API tests (deprecated Jan 2025) diff --git a/server/flags-config/instant.schema.ts b/server/flags-config/instant.schema.ts index da84396a4..14b9f52cf 100644 --- a/server/flags-config/instant.schema.ts +++ b/server/flags-config/instant.schema.ts @@ -54,6 +54,7 @@ const graph = i.graph( }), "storage-migration": i.entity({ "disableLegacy?": i.boolean(), + "useLocationId?": i.boolean(), }), }, // You can define links here. diff --git a/server/resources/migrations/52_add_app_files_to_sweep.down.sql b/server/resources/migrations/52_add_app_files_to_sweep.down.sql new file mode 100644 index 000000000..ca01461cf --- /dev/null +++ b/server/resources/migrations/52_add_app_files_to_sweep.down.sql @@ -0,0 +1,7 @@ +DROP TRIGGER insert_files_to_sweep_trigger ON triples; +DROP TRIGGER update_files_to_sweep_trigger ON triples; + +DROP FUNCTION create_file_to_sweep(); +DROP FUNCTION create_file_to_sweep_on_update(); + +DROP TABLE app_files_to_sweep; diff --git a/server/resources/migrations/52_add_app_files_to_sweep.up.sql b/server/resources/migrations/52_add_app_files_to_sweep.up.sql new file mode 100644 index 000000000..fca0fbd75 --- /dev/null +++ b/server/resources/migrations/52_add_app_files_to_sweep.up.sql @@ -0,0 +1,57 @@ +CREATE TABLE app_files_to_sweep ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + app_id uuid NOT NULL, + location_id text NOT NULL, + created_at timestamp NOT NULL DEFAULT NOW(), + UNIQUE (app_id, location_id) +); + +-- Whenever we delete file triples we want to ensure they are scheduled for +-- deletion in S3. +CREATE FUNCTION create_file_to_sweep() +RETURNS trigger AS $$ +BEGIN + -- This should match the attr_id for $files.location-id + IF OLD.attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' THEN + INSERT INTO app_files_to_sweep (app_id, location_id) + VALUES ( + OLD.app_id, + OLD.value #>> '{}' -- Extract from JSON + ) + ON CONFLICT DO NOTHING; + END IF; + RETURN OLD; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER insert_files_to_sweep_trigger + AFTER DELETE ON triples + FOR EACH ROW + EXECUTE FUNCTION create_file_to_sweep(); + +-- Whenever we upload a file with the same path we create a new location-id for +-- it. We add the old location-id to the sweep table to ensure that the old file +-- is cleaned up +CREATE FUNCTION create_file_to_sweep_on_update() +RETURNS trigger AS $$ +BEGIN + -- Check if we're updating from the file location attribute + IF OLD.attr_id = '96653230-13ff-ffff-2a34-b40fffffffff' THEN + -- Only schedule for deletion if the value actually changed + IF OLD.value != NEW.value THEN + INSERT INTO app_files_to_sweep (app_id, location_id) + VALUES ( + OLD.app_id, + OLD.value #>> '{}' -- Extract from JSON + ) + ON CONFLICT DO NOTHING; + END IF; + END IF; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER update_files_to_sweep_trigger + AFTER UPDATE ON triples + FOR EACH ROW + EXECUTE FUNCTION create_file_to_sweep_on_update(); diff --git a/server/src/instant/admin/routes.clj b/server/src/instant/admin/routes.clj index 4eb942af9..2a43e1c21 100644 --- a/server/src/instant/admin/routes.clj +++ b/server/src/instant/admin/routes.clj @@ -24,6 +24,7 @@ [instant.model.schema :as schema-model] [clojure.string :as string] [instant.storage.coordinator :as storage-coordinator] + [instant.storage.s3 :as instant-s3] [clojure.walk :as w]) (:import (java.util UUID))) @@ -453,19 +454,22 @@ ;; Legacy StorageFile format that was only used by the list() endpoint (defn legacy-storage-file-format - [app-id object-metadata] - {:key (str app-id "/" (:path object-metadata)) - :name (:path object-metadata) - :size (:content-length object-metadata) - :etag (:etag object-metadata) - :last_modified (:last-modified object-metadata)}) + [app-id file] + (let [object-key (if (instant-s3/migrating?) + (instant-s3/->path-object-key app-id (:path file)) + (instant-s3/->object-key app-id (:location-id file)))] + {:key object-key + :name (:path file) + :size (:size file) + :etag nil + :last_modified nil})) (defn files-get [req] (let [{app-id :app_id} (req->admin-token! req) res (query-post (assoc-in req [:body :query] {:$files {}})) files (get-in res [:body "$files"]) data (map (fn [item] - (->> (get item "metadata") + (->> item w/keywordize-keys (legacy-storage-file-format app-id))) files)] diff --git a/server/src/instant/db/instaql.clj b/server/src/instant/db/instaql.clj index 8c162e91e..2600d6cb9 100644 --- a/server/src/instant/db/instaql.clj +++ b/server/src/instant/db/instaql.clj @@ -5,7 +5,6 @@ [clojure.string :as string] [clojure.walk :as walk] [honey.sql :as hsql] - [instant.flags :as flags] [instant.data.constants :refer [zeneca-app-id]] [instant.data.resolvers :as resolvers] [instant.db.cel :as cel] @@ -881,15 +880,23 @@ (defn compute-$files-triples [{:keys [app-id]} join-rows] (when-let [[eid _ _ t] (ffirst join-rows)] - (when-let [path-value (some-> (find-row-by-ident-name join-rows $system-attrs "$files" "path") - first - (nth 2))] - (let [url-aid (attr-model/resolve-attr-id $system-attrs "$files" "url") - {:keys [disableLegacy?]} (flags/storage-migration) - url (if disableLegacy? - (instant-s3/create-signed-download-url! app-id path-value) - (instant-s3/create-legacy-signed-download-url! app-id path-value))] - [[[eid url-aid url t]]])))) + (let [path (some-> (find-row-by-ident-name + join-rows + $system-attrs + "$files" + "path") + first + (nth 2)) + location-id (some-> (find-row-by-ident-name + join-rows + $system-attrs + "$files" + "location-id") + first + (nth 2)) + url-aid (attr-model/resolve-attr-id $system-attrs "$files" "url") + url (instant-s3/create-signed-download-url! app-id path location-id)] + [[[eid url-aid url t]]]))) (def compute-triples-handler {"$files" compute-$files-triples}) diff --git a/server/src/instant/db/transaction.clj b/server/src/instant/db/transaction.clj index cfa0e603b..77508f619 100644 --- a/server/src/instant/db/transaction.clj +++ b/server/src/instant/db/transaction.clj @@ -136,18 +136,8 @@ [op t] [{:message (format "update or merge is not allowed on $files in transact.")}]))) -(defn prevent-$files-deletes! [op tx-steps] - (doseq [t tx-steps - :let [[_op _eid etype] t] - :when (= etype "$files")] - (ex/throw-validation-err! - :tx-step - [op t] - [{:message (format "delete is not allowed on $files in transact.")}]))) - (defn prevent-$files-updates - "With the exception of linking/unlinking, we prevent most updates unless it's - gone through an explicitly allowed code path" + "Files support delete, link/unlink, but not update or merge" [attrs grouped-tx-steps opts] (when (not (:allow-$files-update? opts)) (doseq [batch grouped-tx-steps @@ -156,9 +146,6 @@ (:add-triple :deep-merge-triple :retract-triple) (prevent-$files-add-retract! op attrs tx-steps) - :delete-entity - (prevent-$files-deletes! op tx-steps) - nil)))) (defn resolve-lookups diff --git a/server/src/instant/model/app_file.clj b/server/src/instant/model/app_file.clj index 13dfa1eb7..e6f201e73 100644 --- a/server/src/instant/model/app_file.clj +++ b/server/src/instant/model/app_file.clj @@ -12,24 +12,77 @@ (defn create! ([params] (create! (aurora/conn-pool :write) params)) - ([conn {:keys [app-id path metadata]}] + ([conn {:keys [app-id path location-id metadata]}] (update-op conn {:app-id app-id :etype etype} - (fn [{:keys [transact! get-entity-where resolve-id]}] - (let [{id :id} (or (get-entity-where {:path path}) - {:id (random-uuid)}) + (fn [{:keys [transact! resolve-id]}] + (let [lookup [(resolve-id :path) path] {:keys [size content-type content-disposition]} metadata] (transact! - [[:add-triple id (resolve-id :id) id] - [:add-triple id (resolve-id :path) path] - [:add-triple id (resolve-id :size) size] - [:add-triple id (resolve-id :content-type) content-type] - [:add-triple id (resolve-id :content-disposition) content-disposition] - [:add-triple id (resolve-id :key-version) 1]] + [[:add-triple lookup (resolve-id :id) lookup] + [:add-triple lookup (resolve-id :size) size] + [:add-triple lookup (resolve-id :content-type) content-type] + [:add-triple lookup (resolve-id :content-disposition) content-disposition] + [:add-triple lookup (resolve-id :location-id) location-id] + [:add-triple lookup (resolve-id :key-version) 1]] {:allow-$files-update? true}) - {:id id}))))) + {:id lookup}))))) + +(comment + (create! {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :path "circle_red.jpg" + :location-id "circle_red.jpg" + :metadata {:size 123 + :content-type "image/jpeg" + :content-disposition "inline"}}) + (create! {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :path "circle_blue.jpg" + :location-id "circle_blue.jpg" + :metadata {:size 123 + :content-type "image/jpeg" + :content-disposition "inline"}})) + +(defn bulk-add-locations! + ([params] (bulk-add-locations! (aurora/conn-pool :write) params)) + ([conn {:keys [app-id locations-map]}] + (update-op + conn + {:app-id app-id + :etype etype} + (fn [{:keys [transact! resolve-id]}] + (let [location-attr-id (resolve-id :location-id)] + (transact! + (map (fn [{:keys [id location-id]}] + [:add-triple id location-attr-id location-id]) + locations-map) + {:allow-$files-update? true})))))) + +(comment + (bulk-add-locations! + {:app-id #uuid "831355ee-6a59-4990-8ef3-9c9fe7c26031" + :locations-map [{:id #uuid "0036438b-e510-47bf-b62f-835a1cefb392" + :location-id "circle_red.jpg"} + {:id #uuid "007b2d37-3687-4641-a89f-ffd03876b34f" + :location-id "circle_blue.jpg"}]})) + +(defn add-location! + ([params] (add-location! (aurora/conn-pool :write) params)) + ([conn {:keys [app-id id location-id]}] + (update-op + conn + {:app-id app-id + :etype etype} + (fn [{:keys [transact! resolve-id]}] + (transact! + [[:add-triple id (resolve-id :location-id) location-id]] + {:allow-$files-update? true}))))) + +(comment + (add-location! {:app-id #uuid "831355ee-6a59-4990-8ef3-9c9fe7c26031" + :id #uuid "0036438b-e510-47bf-b62f-835a1cefb392" + :location-id "circle_red.jpg"})) (defn bulk-create! ([params] (bulk-create! (aurora/conn-pool :write) params)) @@ -39,31 +92,44 @@ {:app-id app-id :etype etype} (fn [{:keys [transact! resolve-id]}] - ;; Insert in chunks to avoid exceeding max prepared statement size - (doseq [chunk (partition-all 1000 data)] - (let [triples - (mapcat (fn [{:keys [file-id path metadata]}] - (let [{:keys [size content-type content-disposition]} - metadata] - [[:add-triple file-id (resolve-id :id) file-id] - [:add-triple file-id (resolve-id :path) path] - [:add-triple file-id (resolve-id :size) size] - [:add-triple file-id (resolve-id :content-type) content-type] - [:add-triple file-id (resolve-id :content-disposition) content-disposition] - [:add-triple file-id (resolve-id :key-version) 1]])) - chunk)] - (transact! triples {:allow-$files-update? true}))) - {:ids (map :file-id data)})))) - -(defn get-all-ids - ([params] (get-all-ids (aurora/conn-pool :read) params)) - ([conn {:keys [app-id]}] + (let [triples + (mapcat (fn [{:keys [file-id path location-id metadata]}] + (let [{:keys [size content-type content-disposition]} + metadata] + [[:add-triple file-id (resolve-id :id) file-id] + [:add-triple file-id (resolve-id :path) path] + [:add-triple file-id (resolve-id :size) size] + [:add-triple file-id (resolve-id :content-type) content-type] + [:add-triple file-id (resolve-id :content-disposition) content-disposition] + [:add-triple file-id (resolve-id :location-id) location-id] + [:add-triple file-id (resolve-id :key-version) 1]])) + data) + res (transact! triples {:allow-$files-update? true})] + (->> (get-in res [:results :add-triple]) + (map :entity_id) + set)))))) + +(defn get-where + ([params] (get-where (aurora/conn-pool :read) params)) + ([conn {:keys [app-id where]}] (query-op conn {:app-id app-id :etype etype} (fn [{:keys [get-entities-where]}] - (let [ents (get-entities-where {})] - {:ids (mapv :id ents)}))))) + (get-entities-where (or where {})))))) + +(comment + (get-where {:app-id #uuid "831355ee-6a59-4990-8ef3-9c9fe7c26031" + :where {:location-id {:$isNull true}}})) + +(defn get-by-id + ([params] (get-by-id (aurora/conn-pool :read) params)) + ([conn {:keys [app-id id]}] + (query-op conn + {:app-id app-id + :etype etype} + (fn [{:keys [get-entity]}] + (get-entity id))))) (defn get-by-path ([params] (get-by-path (aurora/conn-pool :read) params)) @@ -71,8 +137,12 @@ (query-op conn {:app-id app-id :etype etype} - (fn [{:keys [get-entity-where]}] - (get-entity-where {:path path}))))) + (fn [{:keys [get-entity resolve-id]}] + (get-entity [(resolve-id :path) path]))))) + +(comment + (get-by-path {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :path "circle_blue.jpg"})) (defn get-by-paths ([params] (get-by-paths (aurora/conn-pool :read) params)) @@ -81,16 +151,11 @@ {:app-id app-id :etype etype} (fn [{:keys [get-entities-where]}] - (->> (partition-all 1000 paths) - (mapcat #(get-entities-where {:path {:$in (vec %)}}))))))) + (get-entities-where {:path {:$in (vec paths)}}))))) -(defn delete-by-ids!* [transact! etype ids] - (let [res (transact! (mapv (fn [id] - [:delete-entity id etype]) - ids) - {:allow-$files-update? true})] - (->> (get-in res [:results :delete-entity]) - (map :triples/entity_id)))) +(comment + (get-by-paths {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :paths ["circle_blue.jpg" "circle_red.jpg"]})) (defn delete-by-ids! ([params] (delete-by-ids! (aurora/conn-pool :write) params)) @@ -99,16 +164,8 @@ conn {:app-id app-id :etype etype} - (fn [{:keys [transact!]}] - (let [deleted-ids (mapcat #(delete-by-ids!* transact! etype %) - (partition-all 1000 ids))] - {:ids deleted-ids}))))) - -(defn delete-by-paths! - ([params] (delete-by-paths! (aurora/conn-pool :write) params)) - ([conn {:keys [app-id paths]}] - (let [ents (get-by-paths conn {:app-id app-id :paths paths})] - (delete-by-ids! conn {:app-id app-id :ids (map :id ents)})))) + (fn [{:keys [delete-entities!]}] + (delete-entities! ids {:allow-$files-update? true}))))) (defn delete-by-path! ([params] (delete-by-path! (aurora/conn-pool :write) params)) @@ -117,14 +174,31 @@ conn {:app-id app-id :etype etype} - (fn [{:keys [transact! get-entity-where]}] - (let [ent (get-entity-where {:path path})] - (when (seq ent) - (-> (transact! [[:delete-entity (:id ent) etype]] - {:allow-$files-update? true}) - (get-in [:results :delete-entity]) - first - :triples/entity_id))))))) + (fn [{:keys [resolve-id delete-entity!]}] + (delete-entity! [(resolve-id :path) path] + {:allow-$files-update? true}))))) + +(comment + (delete-by-path! + {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :path "circle_red.jpg"})) + +(defn delete-by-paths! + ([params] (delete-by-paths! (aurora/conn-pool :write) params)) + ([conn {:keys [app-id paths]}] + (update-op + conn + {:app-id app-id + :etype etype} + (fn [{:keys [delete-entities! resolve-id]}] + (let [path-attr-id (resolve-id :path) + lookups (map #(vector path-attr-id %) paths)] + (delete-entities! lookups {:allow-$files-update? true})))))) + +(comment + (delete-by-paths! + {:app-id #uuid "2d960014-0690-4dc5-b13f-a3c202663241" + :paths ["circle_blue.jpg" "circle_red.jpg"]})) (defn get-all-apps-usage ([] (get-all-apps-usage (aurora/conn-pool :read))) diff --git a/server/src/instant/storage/coordinator.clj b/server/src/instant/storage/coordinator.clj index 6d7c68bd6..1241d82b5 100644 --- a/server/src/instant/storage/coordinator.clj +++ b/server/src/instant/storage/coordinator.clj @@ -42,16 +42,22 @@ (assert-storage-permission! "create" {:app-id app-id :path path :current-user current-user})) - (instant-s3/upload-file-to-s3 ctx file) - (let [metadata (instant-s3/get-object-metadata app-id path)] - (app-file-model/create! {:app-id app-id :path path :metadata metadata}))) + (let [location-id (str (random-uuid))] + (instant-s3/upload-file-to-s3 (assoc ctx :location-id location-id) file) + (app-file-model/create! + {:app-id app-id + :path path + :location-id location-id + :metadata (instant-s3/get-object-metadata app-id location-id)}))) (defn delete-files! "Deletes multiple files from both Instant and S3." [{:keys [app-id paths]}] (storage-beta/assert-storage-enabled! app-id) - (let [ids (app-file-model/delete-by-paths! {:app-id app-id :paths paths}) - _ (instant-s3/bulk-delete-files! app-id paths)] + (let [deleted (app-file-model/delete-by-paths! {:app-id app-id :paths paths}) + locations (mapv :location-id deleted) + ids (mapv :id deleted) + _ (instant-s3/bulk-delete-files! app-id paths locations)] {:ids ids})) (defn delete-file! @@ -61,8 +67,8 @@ (assert-storage-permission! "delete" {:app-id app-id :path path :current-user current-user})) - (let [id (app-file-model/delete-by-path! {:app-id app-id :path path}) - _ (instant-s3/delete-file! app-id path)] + (let [{:keys [id location-id]} (app-file-model/delete-by-path! {:app-id app-id :path path}) + _ (instant-s3/delete-file! app-id path location-id)] {:id id})) ;; Logic for legacy S3 upload/download URLs @@ -84,7 +90,6 @@ [{:keys [upload-id content-type]} file] (let [{app-id :app_id path :path expired-at :expired_at} (app-upload-url-model/consume! {:upload-id upload-id})] - (when (or (not expired-at) (.isBefore (.toInstant expired-at) (java.time.Instant/now))) (throw (ex/throw-validation-err! @@ -105,4 +110,5 @@ (assert-storage-permission! "view" {:app-id app-id :path path :current-user current-user})) - (instant-s3/create-signed-download-url! app-id path)) + (let [{:keys [location-id]} (app-file-model/get-by-path {:app-id app-id :path path})] + (instant-s3/create-signed-download-url! app-id path location-id))) diff --git a/server/src/instant/storage/s3.clj b/server/src/instant/storage/s3.clj index 6e679831b..41fa1dab2 100644 --- a/server/src/instant/storage/s3.clj +++ b/server/src/instant/storage/s3.clj @@ -3,10 +3,14 @@ [clojure.java.io :as io] [instant.util.s3 :as s3-util] [instant.flags :as flags]) - (:import [java.time Duration])) + (:import + [java.time Duration])) ;; Legacy S3 migration helpers ;; ------------------ +(defn migrating? [] + (-> (flags/storage-migration) :useLocationId? not)) + (defn ->legacy-object-key [app-id filename] (str app-id "/" filename)) @@ -16,13 +20,11 @@ (let [[_app-id & path] (string/split object-key #"/")] (string/join "/" path))) -;; S3 path manipulation -;; ---------------------- (defn filename->bin ^long [^String filename] (mod (Math/abs (.hashCode filename)) 10)) -(defn ->object-key +(defn ->path-object-key "We prefix objects with an app id and bin. Combined with a filename this gives us our key for each object." [app-id filename] @@ -32,6 +34,25 @@ filename)] (str app-id "/" bin "/" fname))) +(defn path-object-key->path + "Extract path from our S3 object keys" + [object-key] + (let [[_app-id _bin & path] (string/split object-key #"/")] + (string/join "/" path))) + +;; S3 path manipulation +;; ---------------------- +(defn location-id->bin + "We add a bin to the location id to scale S3 performance + See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html" + ^long [^String location-id] + (mod (Math/abs (.hashCode location-id)) 10)) + +(defn ->object-key + "Object keys have the shape of app-id/bin/location-id" + [app-id ^String location-id] + (str app-id "/" (location-id->bin location-id) "/" location-id)) + (defn object-key->app-id "Extract app-id from our S3 object keys" [object-key] @@ -42,68 +63,80 @@ [object-key] (second (string/split object-key #"/"))) -(defn object-key->path - "Extract path from our S3 object keys" +(defn object-key->location-id + "Extract location-id from our S3 object keys" [object-key] - (let [[_app-id _bin & path] (string/split object-key #"/")] - (string/join "/" path))) + (last (string/split object-key #"/"))) ;; Instant <> S3 integration ;; ---------------------- -(defn upload-file-to-s3 [{:keys [app-id path] :as ctx} file] +(defn upload-file-to-s3 [{:keys [app-id path location-id] :as ctx} file] (when (not (instance? java.io.InputStream file)) (throw (Exception. "Unsupported file format"))) - (let [migration? (-> (flags/storage-migration) :disableLegacy? not)] - (if migration? - (let [baos (java.io.ByteArrayOutputStream.) - _ (io/copy file baos) - bytes (.toByteArray baos) - ctx* (assoc ctx :object-key (->object-key app-id path)) - ctx-legacy* (assoc ctx :object-key (->legacy-object-key app-id path))] - (s3-util/upload-stream-to-s3 ctx-legacy* (io/input-stream bytes)) - (s3-util/upload-stream-to-s3 ctx* (io/input-stream bytes))) - (let [ctx* (assoc ctx :object-key (->object-key app-id path))] - (s3-util/upload-stream-to-s3 ctx* file))))) - -(defn format-object [{:keys [key object-metadata]}] + (if (migrating?) + (let [baos (java.io.ByteArrayOutputStream.) + _ (io/copy file baos) + bytes (.toByteArray baos) + ctx* (assoc ctx :object-key (->object-key app-id location-id)) + ctx-legacy* (assoc ctx :object-key (->path-object-key app-id path))] + (s3-util/upload-stream-to-s3 ctx-legacy* (io/input-stream bytes)) + (s3-util/upload-stream-to-s3 ctx* (io/input-stream bytes))) + (let [ctx* (assoc ctx :object-key (->object-key app-id location-id))] + (s3-util/upload-stream-to-s3 ctx* file)))) + +(defn format-object [{:keys [object-metadata]}] (-> object-metadata (select-keys [:content-disposition :content-type :content-length :etag]) (assoc :size (:content-length object-metadata) - :last-modified (-> object-metadata :last-modified .toEpochMilli) - :path (object-key->path key)))) + :last-modified (-> object-metadata :last-modified .toEpochMilli)))) (defn get-object-metadata - ([app-id path] (get-object-metadata s3-util/default-bucket app-id path)) - ([bucket-name app-id path] - (let [object-key (->object-key app-id path)] + ([app-id location-id] (get-object-metadata s3-util/default-bucket app-id location-id)) + ([bucket-name app-id location-id] + (let [object-key (->object-key app-id location-id)] (format-object (s3-util/head-object bucket-name object-key))))) -(defn delete-file! [app-id filename] - (let [object-key (->object-key app-id filename)] - (s3-util/delete-object object-key))) - -(defn bulk-delete-files! [app-id filenames] - (let [keys (mapv (fn [filename] (->object-key app-id filename)) filenames)] - (s3-util/delete-objects-paginated keys))) - -(defn create-legacy-signed-download-url! [app-id filename] +(defn delete-file! [app-id path location-id] + (when (migrating?) + (s3-util/delete-object (->path-object-key app-id path))) + (when location-id + (s3-util/delete-object (->object-key app-id location-id)))) + +(defn bulk-delete-files! [app-id paths location-ids] + (when (migrating?) + (let [path-keys (mapv + (fn [path] (->path-object-key app-id path)) + paths)] + (s3-util/delete-objects-paginated path-keys))) + (let [location-keys (mapv + (fn [location-id] (->object-key app-id location-id)) + location-ids)] + (s3-util/delete-objects-paginated location-keys))) + +(defn path-url [app-id filename] (let [duration (Duration/ofDays 7) - object-key (->legacy-object-key app-id filename)] + object-key (->path-object-key app-id filename)] (str (s3-util/generate-presigned-url {:method :get :bucket-name s3-util/default-bucket :key object-key :duration duration})))) -(defn create-signed-download-url! [app-id filename] +(defn location-id-url [app-id location-id] (let [duration (Duration/ofDays 7) - object-key (->object-key app-id filename)] + object-key (->object-key app-id location-id)] (str (s3-util/generate-presigned-url {:method :get :bucket-name s3-util/default-bucket :key object-key :duration duration})))) +(defn create-signed-download-url! [app-id path location-id] + (if (migrating?) + (path-url app-id path) + (when location-id + (location-id-url app-id location-id)))) + ;; S3 Usage Metrics ;; These functions calculate usage by talking to S3 directly. We can use these ;; for debugging whenever we suspect that our usage metrics based on triples diff --git a/server/src/instant/system_catalog.clj b/server/src/instant/system_catalog.clj index e6e5c7fbd..1c850a624 100644 --- a/server/src/instant/system_catalog.clj +++ b/server/src/instant/system_catalog.clj @@ -67,6 +67,7 @@ "size" "size" "content-type" "c-type" "content-disposition" "cdisp" + "location-id" "lid" "key-version" "kv"}) (def shortcodes-label (map-invert label-shortcodes)) @@ -275,7 +276,7 @@ :unique? true :index? true) (make-attr "$files" "path" - :unique? false + :unique? true :index? true :checked-data-type :string) (make-attr "$files" "size" @@ -290,6 +291,10 @@ :unique? false :index? true :checked-data-type :string) + (make-attr "$files" "location-id" + :unique? true + :index? true + :checked-data-type :string) (make-attr "$files" "key-version" :unique? false :index? false diff --git a/server/src/instant/system_catalog_ops.clj b/server/src/instant/system_catalog_ops.clj index 4cdc28d17..8895072f1 100644 --- a/server/src/instant/system_catalog_ops.clj +++ b/server/src/instant/system_catalog_ops.clj @@ -65,11 +65,12 @@ (defn delete-entity! "Deletes and returns the deleted entity (if it was deleted)." - [tx-conn attrs app-id etype lookup] + [tx-conn attrs app-id etype lookup opts] (some->> (tx/transact-without-tx-conn! tx-conn attrs app-id - [[:delete-entity lookup etype]]) + [[:delete-entity lookup etype]] + opts) :results :delete-entity seq @@ -79,6 +80,27 @@ :triples/created_at)) (triples->db-format app-id attrs etype))) +(defn delete-entities! + "Deletes and returns entities that were deleted." + [tx-conn attrs app-id etype lookups opts] + (some->> (tx/transact-without-tx-conn! tx-conn + attrs + app-id + (mapv (fn [lookup] + [:delete-entity lookup etype]) + lookups) + opts) + :results + :delete-entity + seq + (map (juxt :triples/entity_id + :triples/attr_id + :triples/value + :triples/created_at)) + (group-by first) + vals + (map #(triples->db-format app-id attrs etype %)))) + (defn collect-iql-result ([iql-res] (collect-iql-result {:symbol-values {} @@ -160,13 +182,23 @@ :transact! (fn ([tx-steps] - (tx/transact-without-tx-conn! tx-conn attrs app-id tx-steps)) + (tx/transact-without-tx-conn! tx-conn attrs app-id tx-steps {})) ([tx-steps opts] (tx/transact-without-tx-conn! tx-conn attrs app-id tx-steps opts))) :delete-entity! - (fn [lookup] - (delete-entity! tx-conn attrs app-id etype lookup)) + (fn + ([lookup] + (delete-entity! tx-conn attrs app-id etype lookup {})) + ([lookup opts] + (delete-entity! tx-conn attrs app-id etype lookup opts))) + + :delete-entities! + (fn + ([lookups] + (delete-entities! tx-conn attrs app-id etype lookups {})) + ([lookups opts] + (delete-entities! tx-conn attrs app-id etype lookups opts))) :get-entity (fn [eid] (get-entity tx-conn app-id attrs etype eid)) diff --git a/server/src/instant/util/s3.clj b/server/src/instant/util/s3.clj index f8669e651..971dabb6c 100644 --- a/server/src/instant/util/s3.clj +++ b/server/src/instant/util/s3.clj @@ -10,7 +10,9 @@ BlockingInputStreamAsyncRequestBody) (software.amazon.awssdk.services.s3 S3AsyncClient S3Client) - (software.amazon.awssdk.services.s3.model Delete + (software.amazon.awssdk.services.s3.model CopyObjectRequest + CopyObjectResponse + Delete DeleteObjectRequest DeleteObjectsRequest GetObjectRequest @@ -22,7 +24,8 @@ PutObjectRequest S3Object) (software.amazon.awssdk.services.s3.presigner S3Presigner) - (software.amazon.awssdk.services.s3.presigner.model GetObjectPresignRequest))) + (software.amazon.awssdk.services.s3.presigner.model GetObjectPresignRequest + PutObjectPresignRequest))) (set! *warn-on-reflection* true) @@ -55,17 +58,18 @@ (default-s3-client))))) (def presigner* (delay (-> (S3Presigner/builder) - (.s3Client @signer-s3-client*) - (.build)))) + (.s3Client @signer-s3-client*) + (.build)))) (defn presigner ^S3Presigner [] @presigner*) (defn list-objects ([opts] (list-objects default-bucket opts)) - ([bucket-name {:keys [continuation-token]}] + ([bucket-name {:keys [continuation-token prefix]}] (let [^ListObjectsV2Request req (cond-> (ListObjectsV2Request/builder) true (.bucket bucket-name) + prefix (.prefix prefix) continuation-token (.continuationToken continuation-token) true (.build)) ^ListObjectsV2Response resp (.listObjectsV2 (default-s3-client) req)] @@ -82,6 +86,19 @@ (.contents resp)) :next-continuation-token (.nextContinuationToken resp)}))) +(defn list-all-objects + ([opts] (list-all-objects default-bucket opts)) + ([bucket-name opts] + (loop [all-objects [] + continuation-token nil] + (let [page-opts (cond-> opts + continuation-token (assoc :continuation-token continuation-token)) + {:keys [object-summaries next-continuation-token truncated?]} + (list-objects bucket-name page-opts)] + (if truncated? + (recur (into all-objects object-summaries) next-continuation-token) + (into all-objects object-summaries)))))) + (defn head-object ([object-key] (head-object default-bucket object-key)) ([bucket-name object-key] @@ -95,9 +112,24 @@ :object-metadata {:content-disposition (.contentDisposition resp) :content-type (.contentType resp) :content-length (.contentLength resp) + :version-id (.versionId resp) :etag (.eTag resp) :last-modified (.lastModified resp)}}))) +(defn copy-object + [{:keys [source-bucket-name + destination-bucket-name + source-key + destination-key]}] + (let [^CopyObjectRequest req (-> (CopyObjectRequest/builder) + (.sourceBucket source-bucket-name) + (.sourceKey source-key) + (.destinationBucket destination-bucket-name) + (.destinationKey destination-key) + (.build)) + ^CopyObjectResponse resp (.copyObject (default-s3-client) req)] + resp)) + (defn delete-object ([object-key] (delete-object default-bucket object-key)) ([bucket-name object-key] @@ -134,10 +166,10 @@ (->> chunks (mapcat #(delete-objects bucket-name (vec %))))))) -(defn generate-presigned-url +(defn generate-presigned-url-get ([{:keys [method bucket-name key ^Duration duration]}] (assert (= :get method) - "presigned urls are only implemented for :get requests") + "get presigned urls are only implemented for :get requests") (let [^GetObjectRequest obj-request (-> (GetObjectRequest/builder) (.bucket bucket-name) (.key key) @@ -151,6 +183,30 @@ (.url) (.toExternalForm))))) +(defn generate-presigned-url-put + ([{:keys [method bucket-name key ^Duration duration]}] + (assert (= :put method) + "put presigned urls are only implemented for :put requests") + (let [^PutObjectRequest obj-request (-> (PutObjectRequest/builder) + (.bucket bucket-name) + (.key key) + (.build)) + ^PutObjectPresignRequest signer-request (-> (PutObjectPresignRequest/builder) + (.signatureDuration duration) + (.putObjectRequest obj-request) + (.build))] + (-> (presigner) + (.presignPutObject signer-request) + (.url) + (.toExternalForm))))) + +(defn generate-presigned-url + ([{:keys [method] :as opts}] + (case method + :get (generate-presigned-url-get opts) + :put (generate-presigned-url-put opts) + (throw (ex-info "Unsupported method for presigned url" {:method method}))))) + (defn- make-s3-put-opts [bucket-name {:keys [object-key content-type content-disposition]} file-opts] (merge diff --git a/server/src/instant/util/uuid.clj b/server/src/instant/util/uuid.clj index 83e53243b..628b35408 100644 --- a/server/src/instant/util/uuid.clj +++ b/server/src/instant/util/uuid.clj @@ -2,6 +2,7 @@ (:refer-clojure :exclude [parse-uuid]) (:import (java.util UUID) + (java.security MessageDigest) (java.nio ByteBuffer)) (:require [clojure.string :as string])) @@ -18,6 +19,13 @@ (string? x) (parse-uuid (string/trim x)) :else nil)) +(defn str->uuid + "Convert a string to a deterministic UUID using SHA-256" + [^String s] + (let [md (MessageDigest/getInstance "SHA-256") + bytes (.digest md (.getBytes s))] + (UUID/nameUUIDFromBytes bytes))) + (defn ->bytes "Converts a java.util.UUID into a byte array" ^bytes [^UUID uuid]