3
3
*/
4
4
package com .jpaulmorrison .fbp .components ;
5
5
6
+ import java .io .BufferedReader ;
6
7
import java .io .File ;
7
8
import java .io .FileInputStream ;
9
+ import java .io .IOException ;
10
+ import java .io .InputStream ;
11
+ import java .io .InputStreamReader ;
8
12
9
13
/**
10
14
* General component to receive sequence of data chunks from a web socket and convert them
31
35
*/
32
36
33
37
import java .net .InetSocketAddress ;
38
+ import java .net .URL ;
39
+ import java .net .UnknownHostException ;
40
+ import java .nio .ByteBuffer ;
34
41
import java .security .KeyStore ;
42
+ import java .util .Collection ;
35
43
import java .util .Collections ;
36
44
import java .util .HashMap ;
37
45
import java .util .LinkedList ;
46
+ import java .util .concurrent .BlockingQueue ;
38
47
import java .util .concurrent .atomic .AtomicBoolean ;
48
+ import java .util .concurrent .atomic .AtomicInteger ;
39
49
40
50
import javax .net .ssl .KeyManagerFactory ;
41
51
import javax .net .ssl .SSLContext ;
42
52
import javax .net .ssl .TrustManagerFactory ;
43
53
44
54
import org .java_websocket .WebSocket ;
55
+ import org .java_websocket .WebSocketImpl ;
56
+ import org .java_websocket .WebSocketServerFactory ;
45
57
import org .java_websocket .drafts .Draft_6455 ;
46
58
import org .java_websocket .drafts .Draft ;
47
59
import org .java_websocket .exceptions .InvalidDataException ;
48
60
import org .java_websocket .framing .CloseFrame ;
49
-
50
61
import org .java_websocket .handshake .*;
51
62
import org .java_websocket .server .DefaultSSLWebSocketServerFactory ;
63
+ import org .java_websocket .server .DefaultWebSocketServerFactory ;
52
64
import org .java_websocket .server .WebSocketServer ;
53
65
54
66
import org .slf4j .Logger ;
58
70
59
71
@ InPorts ({ @ InPort ("PORT" ), @ InPort (value = "OPT" , optional = true ) })
60
72
@ OutPort ("OUT" )
61
- public class WebSocketReceive extends Component {
73
+ public class WebSocketReceive extends Component /* WebSocketServer */ {
62
74
63
75
private InputPort portPort ;
64
76
private InputPort optPort ;
@@ -68,10 +80,14 @@ public class WebSocketReceive extends Component {
68
80
HashMap <WebSocket , LinkedList <Packet <?>>> hm = null ;
69
81
70
82
Component comp = null ; // JavaFBP Component
83
+ MyWebSocketServer test = null ;
84
+ WebSocketServerFactory wsf = null ;
85
+ WebSocketServerFactory wsf2 = null ;
71
86
72
- @ Override
87
+ // @Override
73
88
protected void execute () throws Exception {
74
- boolean wss = false ;
89
+ boolean wssOpt = false ;
90
+
75
91
comp = this ;
76
92
77
93
killsw = new AtomicBoolean ();
@@ -87,23 +103,21 @@ protected void execute() throws Exception {
87
103
p = optPort .receive ();
88
104
if (p != null ) {
89
105
if (p .getContent ().equals ("TLS" ))
90
- wss = true ;
106
+ wssOpt = true ;
91
107
drop (p );
92
108
}
93
109
optPort .close ();
94
110
95
111
InetSocketAddress isa = new InetSocketAddress ("localhost" , port );
96
- MyWebSocketServer test = new MyWebSocketServer (isa , new Draft_6455 ());
97
- // Draft 17 - Hybi 17/RFC 6455 and is currently supported by Chrome16+ and IE10.
98
- // Draft 10 - Hybi 10. This draft is supported by Chrome15 and Firefox6-9.
99
-
100
- // test.setReuseAddress(true);
101
- // wss.stop();
112
+ test = new MyWebSocketServer (isa , new Draft_6455 ());
113
+
102
114
System .out .println ("WebSocketServer starting" );
103
115
putGlobal ("WebSocketServer" , test );
104
-
116
+
117
+
118
+
105
119
try {
106
- if (wss ) {
120
+ if (wssOpt ) {
107
121
// load up the key store
108
122
String STORETYPE = "JKS" ;
109
123
// String KEYSTORE = Paths.get("src", "main", "resources", "keystore.jks")
@@ -144,7 +158,8 @@ protected void execute() throws Exception {
144
158
// List<String> protocols = new
145
159
// ArrayList<String>(Arrays.asList(engine.getEnabledProtocols()));
146
160
// protocols.remove("SSLv3");
147
- DefaultSSLWebSocketServerFactory wsf = new DefaultSSLWebSocketServerFactory (sslContext );
161
+ wsf = new DefaultSSLWebSocketServerFactory (sslContext );
162
+ //wsf2 = new DefaultWebSocketServerFactory();
148
163
149
164
test .setWebSocketFactory (wsf );
150
165
}
@@ -178,7 +193,7 @@ protected void execute() throws Exception {
178
193
e .printStackTrace ();
179
194
}
180
195
181
- test .setConnectionLostTimeout (0 );
196
+ // test.setConnectionLostTimeout(0);
182
197
183
198
}
184
199
@@ -196,35 +211,59 @@ protected void openPorts() {
196
211
197
212
}
198
213
199
- class MyWebSocketServer extends WebSocketServer {
214
+ class MyWebSocketServer extends WebSocketServer {
200
215
201
216
// private static int counter = 0;
202
217
203
218
// Component comp = null;
204
219
205
220
final Logger log = LoggerFactory .getLogger (WebSocketReceive .class );
221
+ private BlockingQueue <ByteBuffer > buffers ;
222
+ private final AtomicInteger queuesize = new AtomicInteger (0 );
206
223
207
224
// public MyWebSocketServer(final int port, final Draft d) throws
208
225
// UnknownHostException {
209
226
// super(new InetSocketAddress(port), Collections.singletonList(d));
210
227
// }
211
228
212
- // public MyWebSocketServer(final int port) throws UnknownHostException {
229
+ //public MyWebSocketServer(final int port) throws UnknownHostException {
213
230
// super(new InetSocketAddress(port));
214
231
// }
215
232
216
233
public MyWebSocketServer (final InetSocketAddress address , final Draft d ) {
217
234
super (address , Collections .singletonList (d ));
218
235
}
219
236
220
- public MyWebSocketServer (final InetSocketAddress address ) {
221
- super (address );
222
- }
237
+ // public MyWebSocketServer(final InetSocketAddress address) {
238
+ // super(address);
239
+ // }
223
240
241
+
242
+ @ Override
243
+
244
+ public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer (WebSocket conn , Draft draft ,
245
+ ClientHandshake request ) throws InvalidDataException {
246
+ ServerHandshakeBuilder builder = super
247
+ .onWebsocketHandshakeReceivedAsServer (conn , draft , request );
248
+ //System.out.println(request);
249
+ Collection <WebSocket > conns = getConnections ();
250
+
251
+ for (WebSocket ws : conns ) {
252
+ String s = ws .getResourceDescriptor ();
253
+ }
254
+ //test.setWebSocketFactory(wsf2);
255
+
256
+ //ServerHandshakeBuilder builder = super.onWebsocketHandshakeReceivedAsServer(conn, draft, request);
257
+ return new HandshakeImpl1Server ();
258
+ //return builder;
259
+ }
260
+
261
+
224
262
@ Override
225
263
public void onOpen (WebSocket conn , ClientHandshake handshake ) {
226
264
conn .send ("Welcome to the server!" ); // This method sends a message to the new client
227
- broadcast ("new connection: " + handshake .getResourceDescriptor ()); // This method sends a message to all
265
+ String s = handshake .getResourceDescriptor ();
266
+ broadcast ("new connection: " + s ); // This method sends a message to all
228
267
// clients connected
229
268
System .out .println ("new connection to " + conn .getRemoteSocketAddress ());
230
269
}
@@ -245,29 +284,40 @@ public void onStart() {
245
284
System .out .println ("server started successfully" );
246
285
}
247
286
248
- @ Override
249
-
250
- public ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer (WebSocket conn , Draft draft ,
251
- ClientHandshake request ) throws InvalidDataException {
252
- System .out .println (request );
253
- String val = request .getFieldValue ("Connection" );
254
- System .out .println ("Connection: " + val );
255
- ServerHandshakeBuilder builder = super .onWebsocketHandshakeReceivedAsServer (conn , draft , request );
256
- return new HandshakeImpl1Server (); // return builder;
257
- }
258
-
287
+
288
+
259
289
@ Override
260
290
public void onWebsocketHandshakeReceivedAsClient (WebSocket conn , ClientHandshake request ,
261
291
ServerHandshake response ) throws InvalidDataException {
262
292
// To overwrite
263
293
System .out .println (request + ": " + response );
264
294
}
265
-
266
-
295
+
296
+
297
+
298
+
267
299
/*
268
- * Make sure that the substream comes out of a single port of a single process,
269
- * all together...
270
- */
300
+ public void doDecode(WebSocketImpl ws, ByteBuffer buf) throws InterruptedException {
301
+ try {
302
+ ws.decode(buf);
303
+ } catch (Exception e) {
304
+ log.error("Error while reading from remote connection", e);
305
+ } finally {
306
+ pushBuffer(buf);
307
+ }
308
+ }
309
+
310
+ public void pushBuffer(ByteBuffer buf) throws InterruptedException {
311
+ if (buffers.size() > queuesize.intValue()) {
312
+ return;
313
+ }
314
+ buffers.put(buf);
315
+ }
316
+ */
317
+ //
318
+ // Make sure that the substream comes out of a single port of a single process,
319
+ // all together...
320
+ //
271
321
272
322
@ SuppressWarnings ("rawtypes" )
273
323
@ Override
@@ -322,7 +372,7 @@ public void onMessage(final WebSocket conn, final String message) {
322
372
// outPort.send(p2);
323
373
324
374
}
325
-
375
+
326
376
}
327
377
328
378
}
0 commit comments