From 3be4c891fafa6ec61ce8747e7ab7ec9bbeb6cbd3 Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Mon, 23 Apr 2018 19:23:20 -0700 Subject: [PATCH 1/4] Implement delete-q! function to delete a single queue --- project.clj | 2 +- src/durable_queue.clj | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/project.clj b/project.clj index c049a7e..253abe3 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject factual/durable-queue "0.1.6" +(defproject factual/durable-queue "0.1.7-SNAPSHOT" :description "a in-process task-queue that is backed by disk." :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d468a38..ae424f0 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -432,6 +432,8 @@ (^:private mark-retry! [_ q-name]) (delete! [_] "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -489,9 +491,7 @@ (.mkdirs (io/file directory)) - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) queue-name->files (directory->queue-name->slab-files directory) ;; core state stores @@ -624,6 +624,18 @@ (unmap s) (delete-slab s))) + (delete-q! [this q-name] + (locking this + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + ;(let [reset-slab (create-new-slab q-name)] + (swap! queue-name->current-slab assoc q-name nil)))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab))) From 80346da39c2a0600fb7ab4f6ea30b50d24c65aba Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Mon, 23 Apr 2018 19:26:37 -0700 Subject: [PATCH 2/4] Rm dead code --- src/durable_queue.clj | 1 - 1 file changed, 1 deletion(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index ae424f0..7d317b6 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -633,7 +633,6 @@ (.clear (queue q-name)) (swap! queue-name->stats assoc q-name nil) (swap! queue-name->slabs assoc q-name nil) - ;(let [reset-slab (create-new-slab q-name)] (swap! queue-name->current-slab assoc q-name nil)))) (fsync [_] From 7e9701d70174c0027c17121e1ca4a117ed0f3ec8 Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Mon, 23 Apr 2018 19:31:54 -0700 Subject: [PATCH 3/4] Rm unnecessary lock --- src/durable_queue.clj | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 7d317b6..17f059b 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -625,15 +625,14 @@ (delete-slab s))) (delete-q! [this q-name] - (locking this - (let [q-name (munge (name q-name))] - (doseq [s (get @queue-name->slabs q-name)] - (unmap s) - (delete-slab s)) - (.clear (queue q-name)) - (swap! queue-name->stats assoc q-name nil) - (swap! queue-name->slabs assoc q-name nil) - (swap! queue-name->current-slab assoc q-name nil)))) + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] From f0f3c90ecee27352be9d41ad0aedb869c972bcd8 Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Wed, 25 Apr 2018 13:18:10 -0700 Subject: [PATCH 4/4] Drop slab data on initialization that does not fit when slab size > max-queue-size --- src/durable_queue.clj | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/durable_queue.clj b/src/durable_queue.clj index 17f059b..dd4d4bb 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -590,12 +590,13 @@ (swap! empty-slabs conj slab)) (do - (doseq [task tasks] - (status! task :incomplete) - (when-not (.offer q' task) - (throw - (IllegalArgumentException. - "'max-queue-size' insufficient to hold existing tasks.")))) + (loop [tasks tasks] + (when-let [task (first tasks)] + (status! task :incomplete) + ; Continue loading until data doesn't fit into the queue. + ; Drop data in slabs that overflows queue size. + (when (.offer q' task) + (recur (rest tasks))))) (unmap slab))))) (let [^AtomicLong counter (get-in @queue-name->stats [q :enqueued])]