88import java .util .HashMap ;
99import java .util .List ;
1010import java .util .Optional ;
11+ import java .util .concurrent .CountDownLatch ;
12+ import java .util .concurrent .atomic .*;
1113import java .util .function .Consumer ;
1214
15+ import java .util .logging .*;
1316import io .github .rdfc .json .ChannelHandlerModule ;
1417
1518import com .fasterxml .jackson .databind .DeserializationFeature ;
@@ -44,26 +47,49 @@ public class Runner implements StreamObserver<RunnerMessage> {
4447
4548 protected final ObjectMapper mapper ;
4649
47- public Runner (RunnerGrpc .RunnerStub stub , String uri ) {
50+ private final AtomicInteger awaiting = new AtomicInteger (0 );
51+ private final Runnable onComplete ;
52+ private final Logger logger ;
53+
54+ protected final String uri ;
55+
56+ public Runner (RunnerGrpc .RunnerStub stub , String uri , Runnable onComplete ) {
57+ this .uri = uri ;
4858 this .stream = stub .connect (this );
59+ this .logger = GrpcLogHandler .createLogger (stub , uri , "cli" );
60+
61+ this .onComplete = onComplete ;
4962 this .stub = stub ;
5063 this .mapper = new ObjectMapper ();
5164 this .mapper .registerModule (new ChannelHandlerModule (this ));
5265 mapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
5366
67+ this .logger .info ("Hello from the runner!" );
5468 this .sendIdentify (uri );
5569 }
5670
5771 public void setReader (String uri , Reader reader ) {
5872 this .channels .put (uri , reader );
5973 }
6074
75+ private void decreaseAndCheckEnd () {
76+ var v = this .awaiting .decrementAndGet ();
77+ if (v == 0 ) {
78+ this .stream .onCompleted ();
79+ this .onComplete .run ();
80+ }
81+ }
82+
6183 @ Override
6284 public void onNext (RunnerMessage value ) {
85+
86+ System .out .println ("Got message " + value .getAllFields ().keySet ().toString ());
87+
6388 if (value .hasPipeline ()) {
6489 System .out .println ("Pipeline message" );
6590 return ;
6691 }
92+
6793 if (value .hasMsg ()) {
6894 System .out .println ("Msg message" );
6995 var msg = value .getMsg ();
@@ -95,6 +121,7 @@ public void onNext(RunnerMessage value) {
95121 if (value .hasClose ()) {
96122 System .out .println ("Close message" );
97123 var msg = value .getClose ();
124+
98125 var channel = this .channels .get (msg .getChannel ());
99126 if (channel != null ) {
100127 channel .close ();
@@ -107,46 +134,57 @@ public void onNext(RunnerMessage value) {
107134
108135 if (value .hasStart ()) {
109136 System .out .println ("Start message" );
137+
110138 this .processors .forEach ((k , v ) -> {
111139 v .produce (st -> {
112140 System .out .println ("Processor Produced " + k );
113- // TODO: do something
141+ this . decreaseAndCheckEnd ();
114142 });
115143 });
144+
145+ System .out .println ("Start happened" );
116146 return ;
117147 }
118148
119149 if (value .hasProc ()) {
120150 System .out .println ("Proc start message" );
121151 var proc = value .getProc ();
122152 var uri = proc .getUri ();
153+
154+ var procLogger = GrpcLogHandler .createLogger (stub , uri , this .uri );
123155 try {
124- Processor <?> processor = this .startProc (proc );
156+ var latch = new CountDownLatch (1 );
157+ Processor <?> processor = this .startProc (proc , procLogger );
125158 processor .init (_void -> {
159+ this .awaiting .updateAndGet (x -> x + 2 );
126160 processor .transform (st -> {
127161 System .out .println ("Processor transformed " + uri );
128- // TODO: do something
162+ this . decreaseAndCheckEnd ();
129163 });
130- this . sendProcInit ( uri , Optional . empty () );
164+ latch . countDown ( );
131165 });
166+ latch .await ();
167+
168+ System .out .println ("Sending proc init " + uri );
169+ this .sendProcInit (uri , Optional .empty ());
132170 } catch (Exception e ) {
133171 e .printStackTrace ();
134172 this .sendProcInit (uri , Optional .of (e .toString ()));
135173 }
174+
136175 return ;
137176 }
138-
139177 System .err .println ("Unsupported message " + value .getUnknownFields ());
140178 }
141179
142- private Processor <?> startProc (rdfc .Runner .Processor proc ) throws Exception {
180+ private Processor <?> startProc (rdfc .Runner .Processor proc , Logger logger ) throws Exception {
143181 var uri = proc .getUri ();
144182 var config = proc .getConfig ();
145183 var params = proc .getArguments ();
146184 System .out .println ("Trying to start proc " + uri + " " + config + " " + params );
147185
148186 var arg = mapper .readValue (config , Config .class );
149- var processor = arg .loadClass (this , params );
187+ var processor = arg .loadClass (this , params , logger );
150188
151189 this .processors .put (uri , processor );
152190
@@ -250,38 +288,39 @@ private static class Config {
250288 public String jar ;
251289 public String clazz ;
252290
253- Processor <?> loadClass (Runner runner , String arguments ) throws Exception {
291+ Processor <?> loadClass (Runner runner , String arguments , Logger logger ) throws Exception {
254292 URL jarUrl = new URI (this .jar ).toURL ();
255- try (URLClassLoader loader = new URLClassLoader (new URL [] { jarUrl }, Rdfc .class .getClassLoader ())) {
256- Class <?> clazz = loader .loadClass (this .clazz );
257-
258- var mapper = new ObjectMapper ();
259- mapper .registerModule (new ChannelHandlerModule (runner ));
260- mapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
261- mapper .setTypeFactory (TypeFactory .defaultInstance ().withClassLoader (loader ));
262- // Find constructor with one argument
263- Constructor <?> constructor = null ;
264- for (Constructor <?> c : clazz .getConstructors ()) {
265- if (c .getParameterCount () == 1 ) {
266- constructor = c ;
267- break ;
268- }
269- }
270- if (constructor == null ) {
271- throw new RuntimeException ("No single-arg constructor found" );
293+ URLClassLoader loader = new URLClassLoader (new URL [] { jarUrl }, Rdfc .class .getClassLoader ());
294+ Class <?> clazz = loader .loadClass (this .clazz );
295+
296+ var mapper = new ObjectMapper ();
297+ mapper .registerModule (new ChannelHandlerModule (runner ));
298+ mapper .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false );
299+ mapper .setTypeFactory (TypeFactory .defaultInstance ().withClassLoader (loader ));
300+
301+ // Find constructor with one argument
302+ Constructor <?> constructor = null ;
303+ for (Constructor <?> c : clazz .getConstructors ()) {
304+ if (c .getParameterCount () == 2 ) {
305+ constructor = c ;
306+ break ;
272307 }
308+ }
273309
274- Class <?> paramType = constructor .getParameterTypes ()[0 ];
310+ if (constructor == null ) {
311+ throw new RuntimeException ("No single-arg constructor found" );
312+ }
275313
276- System . out . println ( "Found type " + paramType . getTypeName ()) ;
314+ Class <?> paramType = constructor . getParameterTypes ()[ 0 ] ;
277315
278- // Use Jackson to deserialize JSON into the param type
279- Object arg = mapper .readValue (arguments , paramType );
316+ System .out .println ("Found type " + paramType .getTypeName ());
280317
281- // Instantiate using default constructor
282- constructor .setAccessible (true );
283- return (Processor <?>) constructor .newInstance (arg );
284- }
318+ // Use Jackson to deserialize JSON into the param type
319+ Object arg = mapper .readValue (arguments , paramType );
320+
321+ // Instantiate using default constructor
322+ constructor .setAccessible (true );
323+ return (Processor <?>) constructor .newInstance (arg , logger );
285324 }
286325 }
287326}
0 commit comments