Skip to content

Commit

Permalink
added base-flow-id to tracking atoms
Browse files Browse the repository at this point in the history
helps lessen confusion when flow-ids auto-increment in a schedule scenario
  • Loading branch information
ryrobes committed Dec 22, 2023
1 parent a8eb0be commit 4d2eee1
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/flowmaps/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@

(declare flow) ;; so we can call it in (process ..) even though it's still undefined at this point

(defn- process [flow-id connections input-chs output-chs ff fp block-function opts-map done-ch]
(defn- process [flow-id base-flow-id connections input-chs output-chs ff fp block-function opts-map done-ch]
(let [web-push? (true? @web/websocket?)
{:keys [debug? debux? close-on-done?]} opts-map
pp (if debug? ut/ppln ut/pplno) ;; TODO, grody
Expand Down Expand Up @@ -255,6 +255,7 @@
{:path the-path ;; PRE RECUR HISTORY PUSH.
:type :channel
:channel [data-from ff]
:base-flow-id (str base-flow-id)
:dest ff
:start start
:end (System/currentTimeMillis)
Expand Down Expand Up @@ -455,6 +456,7 @@
:path the-path
:value (ut/limited (value-only result) flow-id)
:type :function
:base-flow-id (str base-flow-id)
:dest ff
:channel [ff]
:dbgn (str dbgn-output)
Expand Down Expand Up @@ -542,6 +544,7 @@
;; procedural clean up - TODO when DB
(let [flow-id (or (get opts-map :flow-id) (ut/generate-name))
increment-id? (get opts-map :increment-id? true)
base-flow-id flow-id ;; saving for later before below mutation
flow-id (if increment-id? ;(not (= flow-id "live-scratch-flow"))
(str flow-id "-" (count (filter #(cstr/starts-with? % flow-id) (keys @db/channel-history))))
flow-id)] ;; don't number the scratch-flow iterations...
Expand Down Expand Up @@ -711,6 +714,7 @@
(doseq [c connections] ;;; "boot" channels into history even if they never get used...
(swap! db/channel-history update flow-id conj {:path [:creating-channels :*]
:channel c
:base-flow-id (str base-flow-id)
:start (System/currentTimeMillis)
:end (System/currentTimeMillis)
:value nil
Expand Down Expand Up @@ -752,6 +756,7 @@
(swap! db/fn-history assoc flow-id (conj (get @db/fn-history flow-id []) {:block from :from :static
:path [:from :static from]
:value (ut/limited from-val flow-id)
:base-flow-id (str base-flow-id)
:type :function
:dest from
:channel [from]
Expand Down Expand Up @@ -788,7 +793,7 @@
(swap! started conj from)
(async/thread
(pp ["In thread for " from " reading from channel " in-chans])
(process flow-id connections in-chans out-chans from fp from-val opts-map done-ch)))
(process flow-id base-flow-id connections in-chans out-chans from fp from-val opts-map done-ch)))

(when has-starter? ;; 95% dupe code from above - refactor TODO - used to SEED an unrun function into the flow
(let [;text-input? false ;(true? (some #(= :text-input %) (flatten [from-val])))
Expand All @@ -803,6 +808,7 @@
(swap! db/fn-history assoc flow-id (conj (get @db/fn-history flow-id []) {:block from :from :starter
:path [:from :starter from]
:value (ut/limited from-val flow-id)
:base-flow-id (str base-flow-id)
:type :function
:dest from
:channel [from]
Expand Down

0 comments on commit 4d2eee1

Please sign in to comment.