Skip to content

Commit 6b79d3f

Browse files
committed
prep for real concurrency
1 parent e358784 commit 6b79d3f

File tree

7 files changed

+133
-100
lines changed

7 files changed

+133
-100
lines changed

project.clj

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
:min-lein-version "2.0.0"
77

88
:source-paths ["src/clojure"]
9-
:java-source-paths ["src/java"]
10-
:javac-options ["-target" "1.8" "-source" "1.8"]
9+
:test-paths ["test" "src/clojure"] ; for picking up unit tests from regular source files not only the tests directory
10+
:javac-options ["-target" "1.8" "-source" "1.8"]
1111

1212
:dependencies [[org.clojure/clojure "1.8.0"]
1313

@@ -47,9 +47,6 @@
4747
[lein-codox "0.10.3"]
4848
[lein-auto "0.1.3"]] ; provides the auto lein command for watching source changes
4949

50-
; this doesn't work yet ― see https://github.com/weavejester/lein-auto/issues/6
51-
; :auto {:default {:paths (:source-paths :java-source-paths :test-paths :java-source-paths "my path")}} ; https://github.com/weavejester/lein-auto#usage
52-
5350
:codox {:metadata {:doc/format :markdown}} ; treat docstrings as codox extended markdown (https://github.com/weavejester/codox/blob/master/example/src/clojure/codox/markdown.clj)
5451

5552
:main process-wrapper.main)

src/clojure/process_wrapper/core.clj

+37-35
Original file line numberDiff line numberDiff line change
@@ -4,70 +4,72 @@
44
[puget.printer :refer [cprint]]
55
[clojure.inspector :as inspect :refer [inspect-tree]]
66
[process-wrapper.util :refer :all]
7+
[process-wrapper.parse :refer :all]
78
[me.raynes.conch.low-level :as sh]
89
[instaparse.core :as insta]))
910

1011

11-
(def ^:private fasttext-output-parser
12-
(insta/parser
13-
"Labels = Label-Prediction (Space Label-Prediction)*
12+
(def worker-status (atom {})) ;; for now, only manages a single worker
1413

15-
Label-Prediction = Label Space Probability
16-
17-
<Label> = <Label-Prefix> Label-Name
18-
<Label-Prefix> = '__label__'
19-
Label-Name = #'^\\S*'
14+
(defn attach [command-and-args]
2015

21-
<Space> = <' '>
16+
" starts and returns a process ready to work with "
2217

23-
<Probability> = Probability-Fraction | Probability-Fraction-Scientific-Notation | '0' | '1'
18+
(let
19+
[conch-object (apply sh/proc command-and-args) ; this is the map returned by `conch/sh`, consisting of the keys: :err :in :out :process
20+
process-object (:process conch-object) ; the java process object
21+
new-worker
22+
{process-object
23+
{:shell-object conch-object
24+
:status :idle}}]
2425

25-
Probability-Fraction = #'0\\.[0-9]+'
26-
Probability-Fraction-Scientific-Notation = #'[0-9]+\\.[0-9]+e-[0-9]+'"))
26+
(swap! worker-status conj new-worker)
2727

28+
new-worker))
2829

29-
(with-test
30-
(defn ^:private fasttext-output-parse [text]
31-
" parses fasttext output returned for a single text object "
32-
(let
33-
[parse-or-error (fasttext-output-parser text)]
34-
(if (insta/failure? parse-or-error)
35-
(do
36-
(println parse-or-error)
37-
(throw (Exception. (str "failed parsing what has been assumed to be fasttext prediction output: " text))))
38-
parse-or-error)))
39-
;; test code
40-
(is (= 1 1)))
30+
(defn ^:private get-worker-state [worker]
31+
(get-in (val (first worker)) [:status]))
4132

33+
(add-watch worker-status :watcher
34+
(fn [watch-name watch-object old new]
35+
(println "watch activated")
36+
#_(do
37+
(println "changed from")
38+
(cprint old-state)
39+
(println "to")
40+
(cprint new-state))))
4241

43-
(defn get-prediction [process input]
42+
(defn get-prediction [worker input]
4443

4544
"pushes input text on stdin, gets the classification for it on stdout"
4645

46+
(swap! worker-status
47+
(fn swap-fn [worker-status]
48+
(update-in worker-status [(key (first worker)) :status] (constantly :working))))
49+
4750
; push the input (should end with a newline)
48-
(sh/feed-from-string process input)
51+
52+
(sh/feed-from-string (get-in (val (first worker)) [:shell-object]) input)
4953

5054
; get the output prediction
5155
(let
5256
[rawparse-or-error
5357
(loop [raw-response (str)]
5458
(let
55-
[byte-read (.read (:out process))]
59+
[byte-read (.read (get-in (val (first worker)) [:shell-object :out]))]
5660
(assert (not= byte-read -1))
5761
(let [as-char (char byte-read)]
5862
(if (= byte-read 10) ; line-feed
5963
raw-response
60-
(recur (str raw-response as-char))))))]
64+
(recur (str raw-response as-char))))))
6165

62-
(fasttext-output-parse rawparse-or-error)))
63-
64-
65-
(defn attach [command-and-args]
66-
67-
" starts and returns a process ready to work with "
66+
parse (fasttext-output-parse rawparse-or-error)]
6867

69-
(apply sh/proc command-and-args))
68+
(swap! worker-status
69+
(fn [process-list]
70+
(update-in process-list [(key (first worker)) :status] (constantly :idle))))
7071

72+
parse))
7173

7274

7375

src/clojure/process_wrapper/main.clj

+2-32
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,6 @@
11
(ns process-wrapper.main
22
(:require
3-
[yada.yada :refer [listener resource as-resource]]
4-
[process-wrapper.core :refer :all]))
5-
3+
[process-wrapper.server :refer :all]))
64

75
(defn -main []
8-
9-
(println "server starting")
10-
11-
(let
12-
[process (attach ["fasttext/fastText/fasttext" "predict-prob" "fasttext/classifier.bin" "-" "5"])
13-
classify
14-
(fn [ctx]
15-
(let [text (str (get-in ctx [:parameters :query :text]) "\n")]
16-
(get-prediction process text)))]
17-
18-
(def yada-server
19-
(listener
20-
["/"
21-
[["status"
22-
(resource
23-
{:produces "text/plain"
24-
:response "Server is up"})]
25-
["predict"
26-
(resource
27-
{:methods
28-
{:get
29-
{:parameters {:query {:text String}}
30-
:produces "text/plain"
31-
:response classify}}})]
32-
[true (as-resource nil)]]]
33-
34-
{:port 3000})))
35-
36-
@(promise)) ;; block forever to keep the yada server alive
6+
(server))

src/clojure/process_wrapper/parse.clj

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
(ns process-wrapper.parse
2+
(:require
3+
[puget.printer :refer [cprint]]
4+
[clojure.inspector :as inspect :refer [inspect-tree]]
5+
[process-wrapper.util :refer :all]
6+
[instaparse.core :as insta]
7+
[clojure.test :refer :all]))
8+
9+
10+
(def ^:private fasttext-output-parser
11+
(insta/parser
12+
"Labels = Scored-Label (Space Scored-Label)*
13+
14+
Scored-Label = Label Space Score
15+
16+
<Label> = <Label-Prefix> Label-Name
17+
<Label-Prefix> = '__label__'
18+
<Label-Name> = #'^\\S*'
19+
20+
<Space> = <' '>
21+
22+
<Score> = Score-Fraction | Score-Fraction-Scientific-Notation | '0' | '1'
23+
24+
<Score-Fraction> = #'0\\.[0-9]+'
25+
<Score-Fraction-Scientific-Notation> = #'[0-9]+\\.[0-9]+e-[0-9]+'"))
26+
27+
28+
(with-test
29+
(defn fasttext-output-parse [text]
30+
" parses fasttext output returned for a single text object "
31+
(let
32+
[parse-or-error (fasttext-output-parser text)]
33+
(if (insta/failure? parse-or-error)
34+
(do
35+
(println parse-or-error)
36+
(throw (Exception. (str "failed parsing what has been assumed to be fasttext prediction output: " text))))
37+
parse-or-error)))
38+
39+
;; tests
40+
(is (=
41+
(fasttext-output-parse "__label__A 1 __label__B 1.95313e-08 __label__C 1.95313e-08 __label__D 1.95313e-08 __label__E 1.95313e-08")
42+
[:Labels
43+
[:Scored-Label "A" "1"]
44+
[:Scored-Label "B" "1.95313e-08"]
45+
[:Scored-Label "C" "1.95313e-08"]
46+
[:Scored-Label "D" "1.95313e-08"]
47+
[:Scored-Label "E" "1.95313e-08"]]))
48+
49+
(is (=
50+
(fasttext-output-parse "__label__A 0.999998")
51+
[:Labels
52+
[:Scored-Label "A" "0.999998"]])))
53+
54+
+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
(ns process-wrapper.server
2+
(:require
3+
[yada.yada :refer [listener resource as-resource]]
4+
[process-wrapper.core :refer :all]))
5+
6+
(defn server []
7+
(println "server starting")
8+
9+
(let
10+
[worker (attach ["fasttext/fastText/fasttext" "predict-prob" "fasttext/classifier.bin" "-" "5"])
11+
classify
12+
(fn [ctx]
13+
(let [text (str (get-in ctx [:parameters :query :text]) "\n")]
14+
(get-prediction worker text)))]
15+
16+
(def yada-server
17+
(listener
18+
["/"
19+
[["status"
20+
(resource
21+
{:produces "text/plain"
22+
:response "Server is up"})]
23+
["predict"
24+
(resource
25+
{:methods
26+
{:get
27+
{:parameters {:query {:text String}}
28+
:produces "text/plain"
29+
:response classify}}})]
30+
[true (as-resource nil)]]]
31+
32+
{:port 3000})))
33+
34+
@(promise)) ;; block forever to keep the yada server alive

src/clojure/process_wrapper/util.clj

-21
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,6 @@
3131
(throw (Exception. (str "unexpected type: " (class variable))))))
3232

3333

34-
(with-test
35-
(defn interval-points
36-
[start end number-of-intervals]
37-
{:pre [(number? start) (number? end) (number? number-of-intervals)]}
38-
39-
" returns an ordered list of evenly-spaced interval points
40-
between the start and end numbers, the start and end included.
41-
42-
compared to clojure.core's `range` this is inclusive, numerically stable, and always returns doubles"
43-
44-
(let
45-
[step-size (/ (- end start) number-of-intervals )]
46-
(map
47-
double
48-
(range start (+ step-size end) step-size))))
49-
50-
(do
51-
(assert (= '(0 0.5 1) (interval-points 0 1 2)))
52-
(assert (some #(= % 0.22) (interval-points 0 1 50)))))
53-
54-
5534
(defn abs-distance
5635
[a b]
5736
(let [distance (- a b)]

test/process_wrapper/core_test.clj

+4-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
(ns process-wrapper.core-test
22
(:require [clojure.test :refer :all]
3-
[process-wrapper.core :refer :all]))
3+
[process-wrapper.server :refer :all]))
44

5-
(deftest a-test
6-
(attach ["executable/fastText/fasttext"
7-
"predict-prob"
8-
"executable/classifier.bin"
9-
"-"
10-
"5"]))
5+
6+
(deftest end-to-end
7+
(server))

0 commit comments

Comments
 (0)