diff --git a/.gitignore b/.gitignore index 7d70dce..b70b08c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ pom.xml.asc /.nrepl-port .DS_Store /doc -push \ No newline at end of file +push +*.iml +.idea/ \ No newline at end of file diff --git a/src/durable_queue.clj b/src/durable_queue.clj index d468a38..928170d 100644 --- a/src/durable_queue.clj +++ b/src/durable_queue.clj @@ -174,10 +174,11 @@ (with-buffer [buf slab] (reset! status s) (.put buf (p/+ offset 1) - (case s - :incomplete 0 - :in-progress 1 - :complete 2)) + (byte + (case s + :incomplete 0 + :in-progress 1 + :complete 2))) (invalidate slab (p/+ offset 1) 1) nil))) @@ -190,9 +191,9 @@ (fn [] (with-buffer [buf slab] (let [^ByteBuffer buf (-> buf - (.position offset) + (.position ^Long offset) ^ByteBuffer - (.limit (+ offset len)) + (.limit ^Long (+ offset len)) .slice) checksum' (.getLong buf 2) ary (bs/to-byte-array (.position buf header-size))] @@ -303,17 +304,17 @@ (let [ary (nippy/freeze descriptor) cnt (count ary) pos @position - ^ByteBuffer buf (.position buf pos)] + ^ByteBuffer buf (.position buf ^Long pos)] (when (> (.remaining buf) (+ (count ary) header-size)) ;; write to the buffer (doto buf - (.position pos) + (.position ^Long pos) (.put (byte 1)) ;; exists (.put (byte 0)) ;; incomplete (.putLong (checksum cnt ary)) (.putInt cnt) - (.put ary) + (.put ^bytes ary) (.put (byte 0))) ;; next doesn't exist (swap! position + header-size cnt) @@ -432,6 +433,8 @@ (^:private mark-retry! [_ q-name]) (delete! [_] "Deletes all files associated with the queues.") + (delete-q! [_ q-name] + "Deletes all files associated with a queue.") (stats [_] "Returns a map of queue names onto information about the immediate state of the queue.") (fsync [_] @@ -463,7 +466,13 @@ fsync-put? - if true, each `put!` will force an fsync. Defaults to true. - fsync-take? - if true, each `take!` will force an fsync. Defaults to false." + fsync-take? - if true, each `take!` will force an fsync. Defaults to false. + + fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that + can be performed before an fsync is performed. + + fsync-interval - The maximum amount of time, in milliseconds, that can elapse before + an fsync is performed. " ([directory] (queues directory nil)) ([directory @@ -489,9 +498,7 @@ (.mkdirs (io/file directory)) - (let [ - - queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) + (let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size)))) queue-name->files (directory->queue-name->slab-files directory) ;; core state stores @@ -561,10 +568,10 @@ (while (.get ref) (when-let [q (.get ref)] (try - (let [start (System/currentTimeMillis)] + (let [start (System/nanoTime)] (fsync q) - (let [end (System/currentTimeMillis)] - (Thread/sleep (max 0 (- fsync-interval (- end start)))))) + (let [end (System/nanoTime)] + (Thread/sleep (max 0 (- (* 1000000 fsync-interval) (- end start)))))) (catch Throwable e ))))))) @@ -624,6 +631,16 @@ (unmap s) (delete-slab s))) + (delete-q! [this q-name] + (let [q-name (munge (name q-name))] + (doseq [s (get @queue-name->slabs q-name)] + (unmap s) + (delete-slab s)) + (.clear (queue q-name)) + (swap! queue-name->stats assoc q-name nil) + (swap! queue-name->slabs assoc q-name nil) + (swap! queue-name->current-slab assoc q-name nil))) + (fsync [_] (doseq [slab (->> @queue-name->slabs vals (apply concat))] (sync! slab))) @@ -758,16 +775,16 @@ "Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will terminate after that time has elapsed, even if there are still tasks immediately available." [qs q-name interval] - (let [now (System/currentTimeMillis)] + (let [now (System/nanoTime)] (lazy-seq - (let [now' (System/currentTimeMillis) - remaining (- interval (- now' now))] + (let [now' (System/nanoTime) + remaining (- (* 1000000 interval) (- now' now))] (when (pos? remaining) (let [task (take! qs q-name remaining ::none)] (when-not (= ::none task) (cons task - (interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now))))))))))) + (interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now))))))))))) (defn complete! "Marks a task as complete."