Skip to content

Commit

Permalink
doc fix, close-on-done temp fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ryrobes committed Dec 18, 2023
1 parent 981df3c commit 3a5ae8a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 42 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ Flow-maps also provides a rabbit-ui visualizer / debugger to help UNDERSTAND and
* Start up the Rabbit web-server and web-sockets
* (only needed for dev / prod can be headless)
```clojure
(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8888/
(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8080/
```
* Open up the URL - but just leave it for now - http://localhost:8888/
* Open up the URL - but just leave it for now - http://localhost:8080/

![rabbit web ui](https://app.rabbitremix.com/ready-to-flow.png)

Expand Down Expand Up @@ -677,7 +677,7 @@ Simple sample usage:
[:comp2 :adder/in2]
[:adder :simple-plus-10]]}) ;; but use the whole block as output

(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8888/
(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8080/
(fm/flow first-flow) ;; starts the flow, look upon ye rabbit and rejoice!
```

Expand Down Expand Up @@ -797,7 +797,7 @@ A slight ramp up in complexity from the above flow. Contains a loop that exists
:display-val {:x 768 :y 662 :h 179 :w 320}
:adder/in2 {:x 430 :y 430 :h 175 :w 232}}})

(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8888/
(fweb/start!) ;; starts up the rabbit-ui viewer at http://localhost:8080/
(fm/flow looping-flow) ;; starts the flow, look upon ye rabbit and rejoice!

```
Expand Down Expand Up @@ -832,7 +832,7 @@ Simple options example:
[:* :done]]}) ;; note the :done meta block, so it knows what to "return"

(def res (atom nil)) ;; create an atom to hold the value (could also use a channel)
(fweb/start!) ;; starts up a rabbit at http://localhost:8888/
(fweb/start!) ;; starts up a rabbit at http://localhost:8080/

(fm/flow my-flow {:debug? false} res) ;; adding res atom as the eventual receiver of the final value / signal

Expand Down Expand Up @@ -923,7 +923,7 @@ When the web ui is first booted up via...
```clojure
(fweb/start!) ;; flowmaps.web/start! or stop!
;; returns:
[:*web "starting web ui @ http://localhost:8888" "🐇"]
[:*web "starting web ui @ http://localhost:8080" "🐇"]
```
...you will see a blank canvas that won't show anything until a flow is run. Obviously this data is not pushed out unless rabbit is running in your REPL or application.

Expand Down
78 changes: 48 additions & 30 deletions src/flowmaps/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -139,27 +139,29 @@
(flatten v)))})))

(defn- end-chain-msg [flow-id fp ff result the-path debug?] ;; TODO better calc of pathfinding to derive natural end points w/o :done blocks
(let [fp-vals (vec (for [f the-path] [f (get-in @db/results-atom [flow-id f])]))
res-paths (distinct (get @db/resolved-paths flow-id))
chains (count res-paths)
last-in-chain? (some #(= the-path %) res-paths)
chains-run (+ (count (distinct @db/chains-completed)) 1)
all-chains-done? (= chains chains-run)]
(when debug?
(ut/ppln [the-path :chain-done! ff :done-hop-or-empty :res-paths res-paths]))

(when last-in-chain?

(do
(when debug?
(ut/ppln [chains-run :chain-done :done-hop ff :steps (count fp-vals) :final-result result
:value-path fp-vals chains-run :of chains :flow-chains-completed]))
(swap! db/chains-completed conj the-path))

(when (and all-chains-done? last-in-chain?)
(when debug?
(ut/ppln [:all-chains-done! :resolved-paths (vec (distinct (get @db/resolved-paths flow-id)))
ff :fp fp :results-atom (get @db/results-atom flow-id)]))))))
(try
(let [fp-vals (vec (for [f the-path] [f (get-in @db/results-atom [flow-id f])]))
res-paths (distinct (get @db/resolved-paths flow-id))
chains (count res-paths)
last-in-chain? (some #(= the-path %) res-paths)
;;_ (ut/ppln @db/chains-completed)
chains-run (try (+ (count (distinct @db/chains-completed)) 1) (catch Exception _ 1))
all-chains-done? (= chains chains-run)]
(when debug?
(ut/ppln [the-path :chain-done! ff :done-hop-or-empty :res-paths res-paths]))

(when last-in-chain?
(do
(when debug?
(ut/ppln [chains-run :chain-done :done-hop ff :steps (count fp-vals) :final-result result
:value-path fp-vals chains-run :of chains :flow-chains-completed]))
(swap! db/chains-completed conj the-path))

(when (and all-chains-done? last-in-chain?)
(when debug?
(ut/ppln [:all-chains-done! :resolved-paths (vec (distinct (get @db/resolved-paths flow-id)))
ff :fp fp :results-atom (get @db/results-atom flow-id)])))))
(catch Exception _ nil))) ;; TODO weird bug when close-on-done? is true and we're using an atom as output channel

(defn close-channels! [flow-id]
(doseq [[k c] (get @db/channels-atom flow-id)]
Expand All @@ -170,22 +172,23 @@
(do (swap! db/channels-atom ut/dissoc-in [flow-id k])
(async/close! c)))
(catch Throwable e (do (swap! db/channels-atom assoc-in [flow-id k] c)
(ut/ppln [:error-closing-channel-inner e k c])))) ;; close channel
(ut/ppln [:error-closing-channelS-inner e k c])))) ;; close channel
) ;; remove from "live" channel atom
(catch Throwable e
(do (swap! db/channels-atom assoc-in [flow-id k] c) ;; if failed, keep key val for now
(ut/ppln [:error-closing-channel-outer e k c]))))))
(ut/ppln [:error-closing-channelS-outer e k c]))))))

(defn close-channel! [flow-id channel-id]
;(close-channels! [channel-id])
(let [ch (get-in @db/channels-atom [flow-id channel-id])]
(try (let [ch (get-in @db/channels-atom [flow-id channel-id])]
;(ut/ppln [:closing-channel flow-id channel-id])
(try
(when (not (nil? ch))
(do (swap! db/channels-atom ut/dissoc-in [flow-id channel-id])
(async/close! ch)))
(catch Throwable e (ut/ppln [:error-closing-channel-inner e flow-id channel-id]))) ;; close channel
(swap! db/channels-atom ut/dissoc-in [flow-id channel-id])))
(swap! db/channels-atom ut/dissoc-in [flow-id channel-id]))
(catch Throwable e (ut/ppln [:error-closing-channel-outer e flow-id channel-id]))))

(declare flow) ;; so we can call it in (process ..) even though it's still undefined at this point

Expand Down Expand Up @@ -229,7 +232,7 @@
done-atom? (instance? clojure.lang.Atom done-ch)
channels (+ (count (keys (get @db/channels-atom flow-id)))
(count (keys (get @db/condi-channels-atom flow-id))))]
(cond done-atom? (reset! done-ch data-val)
(cond done-atom? (when (not (= data-val :done)) (reset! done-ch data-val)) ;; TODO, weird bug with close-on-done? + atom output
(and done-channel? multi-ch?) (doseq [d done-ch] (async/>! d data-val))
done-channel? (async/>! done-ch data-val)
:else nil)
Expand Down Expand Up @@ -873,12 +876,27 @@
{:on-finished (fn [] (ut/ppln [:schedule-finished! opts time-seq1]))
:error-handler (fn [e] (ut/ppln [:scheduler-error e]))})))

;; snippets for testing
;; snippets for testing -

;(web/start!) ;; boots the webserver and socket server - TODO, will be a fn to start the REST server by itself w/o dev UI
(web/start!) ;; boots the webserver and socket server - TODO, will be a fn to start the REST server by itself w/o dev UI
;(web/stop!)
;(db/re-init :file "mem-test") ;; persistent atoms (can even change after regular atoms have been used)
;(flow fex/my-network {:flow-id "baloney-space-men-ahoy-mustard"} nil {:comp1 4545 :comp2 2323}) ;; override subflow values
;(db/re-init :file "mem-test") ;; persistent atoms (can even change after regular atoms have been used) - DEPRECATED

(flow fex/my-network {:flow-id "baloney-space-men-ahoy-mustard" :close-on-done? true :debug? false} nil {:comp1 4545 :comp2 2323}) ;; override subflow values

(defn flow-waiter [in-flow]
(let [a (atom nil)]
;(ut/pp (vec (remove nil? [:testing-flow-atom-output])))
(flow in-flow {:debug? false :close-on-done? true} a)
(while (nil? @a)
(Thread/sleep 100)) ;; check the atom every 100 ms
@a))
;; (ut/ppln @db/channels-atom)
(def a (atom nil))
(flow fex/my-network {:debug? false :close-on-done? true} a)

(ut/ppln [:outs (flow-waiter fex/looping-net)])

;(flow fex/looping-net)
;(flow fex/my-network)
;(flow fex/my-network-input)
Expand Down
4 changes: 2 additions & 2 deletions src/flowmaps/rest.clj
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@
[[:pushed flow-id channel-name] value]))))

;; test
;; curl -X POST -s -H "Content-Type: application/edn" -H "Accept: application/edn" -d '{:value 45 :channel [:int1 :adder/in1]}' http://localhost:8000/flow-value-push/odds-and-evens
;; curl -X POST -s -H "Content-Type: application/edn" -H "Accept: application/edn" -d '{:value 44 :channel [:int1 :adder/in1] :return [:display-val :done]}' http://localhost:8000/flow-value-push/odds-and-evens
;; curl -X POST -s -H "Content-Type: application/edn" -H "Accept: application/edn" -d '{:value 45 :channel [:int1 :adder/in1]}' http://localhost:8080/flow-value-push/odds-and-evens
;; curl -X POST -s -H "Content-Type: application/edn" -H "Accept: application/edn" -d '{:value 44 :channel [:int1 :adder/in1] :return [:display-val :done]}' http://localhost:8080/flow-value-push/odds-and-evens

(defn wait-for-event [channel flow-id start-time timeout-ms]
(try
Expand Down
8 changes: 4 additions & 4 deletions src/flowmaps/web.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
:max-threads 50
::http/type :jetty
::http/host "0.0.0.0"
::http/port 8000
::http/port 8080
::http/container-options {:h2c? true
:h2? false
:ssl? false}})
Expand All @@ -63,11 +63,11 @@
(def web-server (atom nil))

(defn create-web-server! []
(ut/ppln [:*web (format "starting web ui @ http://localhost:%d" 8000) "🐇" ])
(ut/ppln [:*web (format "starting web ui @ http://localhost:%d" 8080) "🐇" ])
(reset! web-server (future (http/start runnable-service))))

(defn destroy-web-server! []
(ut/ppln [:*web (format "stopping web server @ %d" 8000)])
(ut/ppln [:*web (format "stopping web server @ %d" 8080)])
(reset! web-server nil))

(defn start! [] ;; start the web server and websocket server
Expand All @@ -86,7 +86,7 @@
(defmethod wl/handle-subscription :external-editing [{:keys [kind client-id]}] ;; default subscription server->client push "queue"...
(let [results (async/chan)]
(async/go-loop []
(async/<! (async/timeout 500)) ;; 600-800 seems ideal
(async/<! (async/timeout 800)) ;; 600-800 seems ideal
(if-let [item (ut/dequeue! queue-atom)]
(when (async/>! results item)
(recur))
Expand Down

0 comments on commit 3a5ae8a

Please sign in to comment.