diff --git a/src/manifold/stream.clj b/src/manifold/stream.clj index 3a3d0978..cd70463f 100644 --- a/src/manifold/stream.clj +++ b/src/manifold/stream.clj @@ -1154,23 +1154,31 @@ out (sliding-stream n in)] (splice in out))) ([n source] - (let [sink (stream n)] - (connect-via - source - (fn [val] - (d/loop [] - (d/chain - (try-put! sink val 0 ::timeout) - (fn [put-result] - (case put-result - true true - false false - ::timeout (d/chain (take! sink) - (fn [_] (d/recur)))))))) - sink - {:upstream? true - :downstream? true}) - sink))) + (let [buf (stream) + sink (stream n)] + (connect-via-proxy source buf sink {:description {:op "sliding_stream"}}) + (d/loop [] + (d/chain' + (take! buf ::none) + (fn [val] + (if (identical? val ::none) + (close! sink) + (d/chain' + (d/loop [] + (d/chain' + (try-put! sink val 0 ::timeout) + (fn [put-result] + (case put-result + (true false) put-result + ::timeout + (d/chain' + (take! sink ::empty) + (fn [x] + (when-not (identical? x ::empty) + (d/recur)))))))) + (fn [_] + (d/recur))))))) + (->sink sink)))) ;;; diff --git a/test/manifold/stream_test.clj b/test/manifold/stream_test.clj index ed66f658..4501e73a 100644 --- a/test/manifold/stream_test.clj +++ b/test/manifold/stream_test.clj @@ -467,17 +467,25 @@ (deftest test-window-streams (testing "dropping-stream" - (let [s (s/->source (range 11)) + (let [s (s/->source (range 11)) dropping-s (s/dropping-stream 10 s)] (is (= (range 10) (s/stream->seq dropping-s))))) (testing "sliding-stream" - (let [s (s/->source (range 11)) - sliding-s (s/sliding-stream 10 s)] - (is (= (range 1 11) - (s/stream->seq sliding-s)))))) - + (let [in (s/stream) + sliding-s (s/sliding-stream 3 in)] + (testing "passthrough within buffer size" + @(s/put! in 1) + (is (= 1 @(s/try-take! sliding-s 0)))) + (testing "discards oldest elements when blocked" + @(s/put-all! in [1 2 3 4]) + (is (= 2 @(s/try-take! sliding-s 0))) + (is (= 3 @(s/try-take! sliding-s 0))) + (is (= 4 @(s/try-take! sliding-s 0)))) + (testing "propagates closes" + (s/close! in) + (is (= ::closed @(s/take! sliding-s ::closed))))))) ;;; (deftest ^:stress stress-buffered-stream