diff --git a/project.clj b/project.clj index c049a7e..253abe3 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject factual/durable-queue "0.1.6" +(defproject factual/durable-queue "0.1.7-SNAPSHOT" :description "a in-process task-queue that is backed by disk." :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d468a38..dd4d4bb 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -432,6 +432,8 @@ (^:private mark-retry! [_ q-name]) (delete! [_] "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -489,9 +491,7 @@ (.mkdirs (io/file directory)) - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) queue-name->files (directory->queue-name->slab-files directory) ;; core state stores @@ -590,12 +590,13 @@ (swap! empty-slabs conj slab)) (do - (doseq [task tasks] - (status! task :incomplete) - (when-not (.offer q' task) - (throw - (IllegalArgumentException. - "'max-queue-size' insufficient to hold existing tasks.")))) + (loop [tasks tasks] + (when-let [task (first tasks)] + (status! task :incomplete) + ; Continue loading until data doesn't fit into the queue. + ; Drop data in slabs that overflows queue size. + (when (.offer q' task) + (recur (rest tasks))))) (unmap slab))))) (let [^AtomicLong counter (get-in @queue-name->stats [q :enqueued])] @@ -624,6 +625,16 @@ (unmap s) (delete-slab s))) + (delete-q! [this q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab)))