Skip to content

Go through all the open issues and PRs and apply their changes #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ pom.xml.asc
/.nrepl-port
.DS_Store
/doc
push
push
*.iml
.idea/
57 changes: 37 additions & 20 deletions src/durable_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@
(with-buffer [buf slab]
(reset! status s)
(.put buf (p/+ offset 1)
(case s
:incomplete 0
:in-progress 1
:complete 2))
(byte
(case s
:incomplete 0
:in-progress 1
:complete 2)))
(invalidate slab (p/+ offset 1) 1)
nil)))

Expand All @@ -190,9 +191,9 @@
(fn []
(with-buffer [buf slab]
(let [^ByteBuffer buf (-> buf
(.position offset)
(.position ^Long offset)
^ByteBuffer
(.limit (+ offset len))
(.limit ^Long (+ offset len))
.slice)
checksum' (.getLong buf 2)
ary (bs/to-byte-array (.position buf header-size))]
Expand Down Expand Up @@ -303,17 +304,17 @@
(let [ary (nippy/freeze descriptor)
cnt (count ary)
pos @position
^ByteBuffer buf (.position buf pos)]
^ByteBuffer buf (.position buf ^Long pos)]

(when (> (.remaining buf) (+ (count ary) header-size))
;; write to the buffer
(doto buf
(.position pos)
(.position ^Long pos)
(.put (byte 1)) ;; exists
(.put (byte 0)) ;; incomplete
(.putLong (checksum cnt ary))
(.putInt cnt)
(.put ary)
(.put ^bytes ary)
(.put (byte 0))) ;; next doesn't exist

(swap! position + header-size cnt)
Expand Down Expand Up @@ -432,6 +433,8 @@
(^:private mark-retry! [_ q-name])
(delete! [_]
"Deletes all files associated with the queues.")
(delete-q! [_ q-name]
"Deletes all files associated with a queue.")
(stats [_]
"Returns a map of queue names onto information about the immediate state of the queue.")
(fsync [_]
Expand Down Expand Up @@ -463,7 +466,13 @@

fsync-put? - if true, each `put!` will force an fsync. Defaults to true.

fsync-take? - if true, each `take!` will force an fsync. Defaults to false."
fsync-take? - if true, each `take!` will force an fsync. Defaults to false.

fsync-threshold - The maximum number of writes (puts, takes, retries, completes) that
can be performed before an fsync is performed.

fsync-interval - The maximum amount of time, in milliseconds, that can elapse before
an fsync is performed. "
([directory]
(queues directory nil))
([directory
Expand All @@ -489,9 +498,7 @@

(.mkdirs (io/file directory))

(let [

queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size))))
(let [queue (memoize (fn [_] (LinkedBlockingQueue. (int max-queue-size))))
queue-name->files (directory->queue-name->slab-files directory)

;; core state stores
Expand Down Expand Up @@ -561,10 +568,10 @@
(while (.get ref)
(when-let [q (.get ref)]
(try
(let [start (System/currentTimeMillis)]
(let [start (System/nanoTime)]
(fsync q)
(let [end (System/currentTimeMillis)]
(Thread/sleep (max 0 (- fsync-interval (- end start))))))
(let [end (System/nanoTime)]
(Thread/sleep (max 0 (- (* 1000000 fsync-interval) (- end start))))))
(catch Throwable e
)))))))

Expand Down Expand Up @@ -624,6 +631,16 @@
(unmap s)
(delete-slab s)))

(delete-q! [this q-name]
(let [q-name (munge (name q-name))]
(doseq [s (get @queue-name->slabs q-name)]
(unmap s)
(delete-slab s))
(.clear (queue q-name))
(swap! queue-name->stats assoc q-name nil)
(swap! queue-name->slabs assoc q-name nil)
(swap! queue-name->current-slab assoc q-name nil)))

(fsync [_]
(doseq [slab (->> @queue-name->slabs vals (apply concat))]
(sync! slab)))
Expand Down Expand Up @@ -758,16 +775,16 @@
"Returns a lazy sequence of tasks that can be consumed in `interval` milliseconds. This will
terminate after that time has elapsed, even if there are still tasks immediately available."
[qs q-name interval]
(let [now (System/currentTimeMillis)]
(let [now (System/nanoTime)]
(lazy-seq
(let [now' (System/currentTimeMillis)
remaining (- interval (- now' now))]
(let [now' (System/nanoTime)
remaining (- (* 1000000 interval) (- now' now))]
(when (pos? remaining)
(let [task (take! qs q-name remaining ::none)]
(when-not (= ::none task)
(cons
task
(interval-task-seq qs q-name (- interval (- (System/currentTimeMillis) now)))))))))))
(interval-task-seq qs q-name (- (* 1000000 interval) (- (System/nanoTime) now)))))))))))

(defn complete!
"Marks a task as complete."
Expand Down