diff --git a/src/flowmaps/core.clj b/src/flowmaps/core.clj index f3234c5..0a31ea8 100644 --- a/src/flowmaps/core.clj +++ b/src/flowmaps/core.clj @@ -313,8 +313,9 @@ (cond (not pre-when?) [:failed-pre-check :w-input pre-in] ;; failed pre-when input check - send error message (and fully-resolved? is-subflow?) ;(and is-subflow? fully-resolved?) (let [a (atom nil)] ;; use temp atom to hold subflow results and treat like regular fn - (flow block-function {:flow-id (str flow-id "/" (ut/unkeyword ff)) - :debug? debug?} a ;output-chs + (flow block-function (merge (select-keys opts-map [:increment-id?]) + {:flow-id (str flow-id "/" (ut/unkeyword ff)) + :debug? debug?}) a ;output-chs subflow-overrides) (while (nil? @a) (Thread/sleep 100)) @a) @@ -329,8 +330,9 @@ expression-dbgn (delay (try ;; testing - to be refactored away shortly w macros (cond (not pre-when?) [:failed-pre-check :w-input pre-in] ;; failed pre-when input check is-subflow? (let [a (atom nil)] ;; use temp atom to hold subflow results and treat like regular fn - (flow block-function {:flow-id (str flow-id "/" (ut/unkeyword ff)) - :debug? debug?} a ;output-chs + (flow block-function (merge (select-keys opts-map [:increment-id?]) + {:flow-id (str flow-id "/" (ut/unkeyword ff)) + :debug? debug?}) a ;output-chs subflow-overrides) (while (nil? @a) (Thread/sleep 100)) @a) @@ -564,7 +566,7 @@ ;(get opts-map :close? false)) ;; - close channels opt? (doseq [[k c] (get @db/channels-atom flow-id)] (try #_{:clj-kondo/ignore [:redundant-do]} - (do (ut/ppln [:closing-channel k c]) + (do (when (get opts-map :debug? false) (ut/ppln [:closing-channel k c])) (async/close! c)) (catch Exception e (ut/ppln [:error-closing-channel e k c]))))) (try @@ -889,8 +891,11 @@ times (if (and (vector? time-seq1) (keyword? (first time-seq1))) (doall (take 1000 (ut/time-seq time-seq1))) time-seq1) + times-unlimited (if (and (vector? time-seq1) (keyword? (first time-seq1))) + (ut/time-seq time-seq1) + time-seq1) ch (chime/chime-at - times ;; [] chime time seq, https://github.com/jarohen/chime#recurring-schedules + times-unlimited ;; [] chime time seq, https://github.com/jarohen/chime#recurring-schedules (fn [time] (let [opts (merge opts {:schedule-started (str time)})] (flow flowmap opts chan-out override)))