diff --git a/src/main/java/org/arl/fjage/connectors/TcpConnector.java b/src/main/java/org/arl/fjage/connectors/TcpConnector.java index 18294221e..0151d5d58 100644 --- a/src/main/java/org/arl/fjage/connectors/TcpConnector.java +++ b/src/main/java/org/arl/fjage/connectors/TcpConnector.java @@ -93,11 +93,8 @@ public boolean waitOutputCompletion(long timeout) { @Override public String[] connections() { - if (sock == null || sock.isClosed()){ - return new String[0]; - }else { - return new String[] { sock.getInetAddress().getHostAddress()+":"+sock.getPort() }; - } + if (sock == null || sock.isClosed()) return new String[0]; + return new String[] { sock.getInetAddress().getHostAddress()+":"+sock.getPort() }; } @Override diff --git a/src/main/java/org/arl/fjage/remote/Tunnel.java b/src/main/java/org/arl/fjage/remote/Tunnel.java new file mode 100644 index 000000000..f891b65da --- /dev/null +++ b/src/main/java/org/arl/fjage/remote/Tunnel.java @@ -0,0 +1,314 @@ +package org.arl.fjage.remote; + +import java.util.*; +import java.util.concurrent.*; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import org.arl.fjage.*; +import org.arl.fjage.param.*; +import org.arl.fjage.remote.JsonMessage; +import org.arl.fjage.connectors.*; + +/** + * An agent that serves as a tunnel for messages between two fjage platforms. + *

+ * A tunnel can be configured as a server or a client. A server tunnel listens on + * a specified TCP port for incoming connections from client tunnels. A client + * tunnel connects to a server tunnel at a specified IP address and TCP port. + *

+ * Once a connection is established, the tunnel agent forwards selected messages + * between the two platforms. The `agents` parameter specifies the list of remote + * agents or topics forwarded through the tunnel. + */ +public class Tunnel extends Agent implements ConnectionListener, MessageListener { + + //// private attributes + + protected final static long MONITOR_PERIOD = 5000; + + protected String ip; + protected int port; + + protected TcpServer server = null; + protected List agents = new ArrayList<>(); + protected List connectors = new ArrayList<>(); + protected ExecutorService writeExecutor = null; + protected ExecutorService readExecutor = null; + protected int connID = 0; + protected Map connIDs = new HashMap<>(); + + //// constructors + + public Tunnel(int port) { + this.ip = null; + this.port = port; + } + + public Tunnel(String ip, int port) { + this.ip = ip; + this.port = port; + } + + //// documentation + + public final static String __doc__ = + "# @@ - tunnel\n\n" + + "Tunnels messages between two fjage platforms over a TCP/IP link.\n\n" + + "## Parameters:\n\n" + + "### @@.agents - list of remote agents/topics visible through the tunnel\n\n" + + "Example:\n @@.agents = [agent('remoteAgent'), topic('remoteTopic')]\n\n" + + "### @@.ip - IP address of the server to connect to (null for servers)\n" + + "### @@.port - TCP port number\n"; + + //// agent methods + + @Override + public void init() { + register(org.arl.fjage.shell.Services.DOCUMENTATION); + readExecutor = Executors.newCachedThreadPool(); + writeExecutor = Executors.newSingleThreadExecutor(); + add(new ParameterMessageBehavior(TunnelParam.class)); + if (ip == null) { + server = new TcpServer(port, this); + } else { + connect(); + add(new TickerBehavior(MONITOR_PERIOD) { + @Override + public void onTick() { + synchronized (connectors) { + if (connectors.isEmpty()) connect(); + } + } + }); + } + getContainer().addListener(this); + log.info("Agent "+getName()+" init"); + } + + @Override + public void shutdown() { + log.info("Agent "+getName()+" shutdown"); + getContainer().removeListener(this); + readExecutor.shutdownNow(); + writeExecutor.shutdownNow(); + if (server != null) { + server.close(); + server = null; + } + synchronized (connectors) { + for (Connector c : connectors) c.close(); + connectors.clear(); + connIDs.clear(); + } + } + + //// connection & message management + + @Override + public void connected(Connector connector) { + log.info("Incoming connection: "+connector.getName()); + synchronized (connectors) { + connectors.add(connector); + connIDs.put(++connID, connector); + monitor(connID, connector); + } + } + + protected void connect() { + try { + Connector c = new TcpConnector(ip, port); + log.info("Connected to "+ip+":"+port); + synchronized (connectors) { + connectors.add(c); + connIDs.put(++connID, c); + monitor(connID, c); + } + } catch (IOException ex) { + log.info("Failed to connect to "+ip+":"+port+": "+ex.getMessage()); + } + } + + @Override + public boolean onReceive(Message msg) { + synchronized (connectors) { + if (connectors.isEmpty()) return false; + } + AgentID rcpt = msg.getRecipient(); + AgentID sender = msg.getSender(); + if (rcpt == null || sender == null) return false; + if (sender.getName().contains("@")) return false; + boolean shouldForward = false; + synchronized (agents) { + shouldForward = agents.contains(rcpt); + } + if (shouldForward) { + JsonMessage jmsg = new JsonMessage(); + jmsg.message = msg; + String json = jmsg.toJson(); + log.info("* << "+json); + synchronized (connectors) { + for (Connector c: connectors) + sendToRemote(c, json.getBytes(StandardCharsets.UTF_8)); + } + if (!rcpt.isTopic()) return true; + } else if (!rcpt.isTopic() && rcpt.getName().contains("@")) { + int id = 0; + Connector c = null; + String rname = null; + try { + String s = rcpt.getName(); + int i = s.lastIndexOf('@'); + id = Integer.parseInt(s.substring(i+1)); + rname = s.substring(0, i); + synchronized (connectors) { + c = connIDs.get(id); + } + } catch (NumberFormatException ex) { + return false; + } + if (c != null) { + msg.setRecipient(new AgentID(rname)); + JsonMessage jmsg = new JsonMessage(); + jmsg.message = msg; + String json = jmsg.toJson(); + log.info(id+" << "+json); + sendToRemote(c, json.getBytes(StandardCharsets.UTF_8)); + return true; + } + } + return false; + } + + protected void monitor(int id, Connector c) { + readExecutor.submit(new Runnable() { + @Override + public void run() { + String cname = c.getName(); + try (BufferedReader in = new BufferedReader(new InputStreamReader(c.getInputStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = in.readLine()) != null) { + log.info(id+" >> "+line); + JsonMessage jmsg = JsonMessage.fromJson(line); + if (jmsg == null || jmsg.message == null) continue; + AgentID sender = jmsg.message.getSender(); + if (sender == null) continue; + jmsg.message.setSender(new AgentID(sender.getName() + "@" + id)); + getContainer().send(jmsg.message); + } + } catch (IOException ex) { + log.info("Read from "+cname+" failed: "+ex.getMessage()); + } catch (Exception ex) { + log.log(Level.WARNING, "Exception on "+cname+": "+ex.getMessage(), ex); + } + removeConnector(c); + } + }); + } + + protected void sendToRemote(Connector c, byte[] data) { + writeExecutor.submit(new Runnable() { + @Override + public void run() { + try { + OutputStream out = c.getOutputStream(); + if (out == null) removeConnector(c); + else synchronized (out) { + out.write(data); + out.write('\n'); + out.flush(); + } + } catch (IOException ex) { + log.warning("Write to "+c.getName()+" failed: "+ex.getMessage()); + removeConnector(c); + } + } + }); + } + + protected void removeConnector(Connector c) { + synchronized (connectors) { + connectors.remove(c); + connIDs.values().removeIf(v -> v.equals(c)); + } + c.close(); + } + + //// parameters + + /** + * Get IP address for the tunnel. In case of a client tunnel, this is the IP + * address of the server to connect to. In case of a server tunnel, this is + * set to null. + * + * @return IP address for the tunnel, or null if this is a server tunnel. + */ + public String getIp() { + return ip; + } + + /** + * Get TCP port number for the tunnel. In case of a client tunnel, this is the + * TCP port number of the server to connect to. In case of a server tunnel, + * this is the TCP port number to listen on for incoming client connections. + * + * @return TCP port number for the tunnel. + */ + public int getPort() { + if (port == 0 && server != null) return server.getPort(); + return port; + } + + /** + * Get the list of remote agents or topics visible through the tunnel. + * + * @return List of remote agents/topics visible through the tunnel. + */ + public List getAgents() { + synchronized (agents) { + List copy = new ArrayList<>(); + for (AgentID aid: agents) copy.add(aid); + return copy; + } + } + + /** + * Set the list of remote agents or topics visible through the tunnel. + * + * @param agents List of remote agents/topics visible through the tunnel. + */ + public void setAgents(List agents) { + synchronized (this.agents) { + this.agents.clear(); + if (agents == null) return; + for (AgentID aid: agents) + if (aid != null) this.agents.add(new AgentID(aid.getName(), aid.isTopic())); + } + } + + /** + * Get title of the tunnel agent. + * + * @return title. + */ + public String getTitle() { + return "Tunnel"; + } + + /** + * Get description of the tunnel agent. + * + * @return description. + */ + public String getDescription() { + String s = " (no connections)"; + synchronized (connectors) { + int n = connectors.size(); + if (n == 1) s = " (1 connection)"; + else if (n > 1) s = " ("+n+" connections)"; + } + if (ip != null) return "Tunnel to " + ip + ":" + getPort() + s; + return "Tunnel listening on port " + getPort() + s; + } + +} diff --git a/src/main/java/org/arl/fjage/remote/TunnelParam.java b/src/main/java/org/arl/fjage/remote/TunnelParam.java new file mode 100644 index 000000000..f7a6a3acc --- /dev/null +++ b/src/main/java/org/arl/fjage/remote/TunnelParam.java @@ -0,0 +1,26 @@ +package org.arl.fjage.remote; + +import org.arl.fjage.param.Parameter; + +public enum TunnelParam implements Parameter { + + /** + * IP address for the tunnel. In case of a client tunnel, this is the IP + * address of the server to connect to. In case of a server tunnel, this is + * set to null. + */ + ip, + + /** + * TCP port number for the tunnel. In case of a client tunnel, this is the + * TCP port number of the server to connect to. In case of a server tunnel, + * this is the TCP port number to listen on for incoming client connections. + */ + port, + + /** + * List of remote agents/topics visible through the tunnel. + */ + agents + +} diff --git a/src/test/java/org/arl/fjage/test/BasicTests.java b/src/test/java/org/arl/fjage/test/BasicTests.java index 1ad760d50..8a0fa4ee5 100644 --- a/src/test/java/org/arl/fjage/test/BasicTests.java +++ b/src/test/java/org/arl/fjage/test/BasicTests.java @@ -12,9 +12,7 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; +import java.util.*; import java.util.logging.Logger; import org.arl.fjage.*; import org.arl.fjage.param.*; @@ -22,6 +20,7 @@ import org.arl.fjage.remote.Gateway; import org.arl.fjage.remote.MasterContainer; import org.arl.fjage.remote.SlaveContainer; +import org.arl.fjage.remote.Tunnel; import org.arl.fjage.shell.*; import org.junit.Before; import org.junit.Test; @@ -403,6 +402,56 @@ public void testListener2() { assertTrue(server.nuisance > 0); } + @Test + public void testTunnel() { + log.info("testTunnel"); + Platform platform = new RealTimePlatform(); + Container c1 = new Container(platform); + Container c2 = new Container(platform); + MyMessageListener l1 = new MyMessageListener(); + c1.addListener(l1); + MyMessageListener l2 = new MyMessageListener(); + c2.addListener(l2); + platform.start(); + Tunnel t1 = new Tunnel(0); + c1.add("t1", t1); + platform.delay(250); + Tunnel t2 = new Tunnel("localhost", t1.getPort()); + c2.add("t2", t2); + List agents = new ArrayList<>(); + agents.add(new AgentID("t2")); + agents.add(new AgentID("sharedTopic", true)); + t1.setAgents(agents); + agents.clear(); + agents.add(new AgentID("t1")); + agents.add(new AgentID("sharedTopic", true)); + t2.setAgents(agents); + platform.delay(250); + Message msg = new ParameterReq(); + msg.setRecipient(new AgentID("t2")); + msg.setSender(new AgentID("test-c1")); + c1.send(msg); + platform.delay(250); + assertTrue(l1.msgs.get(l1.msgs.size()-1) instanceof ParameterRsp); + l1.msgs.clear(); + l2.msgs.clear(); + msg = new ParameterReq(); + msg.setRecipient(new AgentID("t1")); + msg.setSender(new AgentID("test-c2")); + c2.send(msg); + platform.delay(250); + assertTrue(l2.msgs.get(l2.msgs.size()-1) instanceof ParameterRsp); + l1.msgs.clear(); + l2.msgs.clear(); + msg = new Message(); + msg.setRecipient(new AgentID("sharedTopic", true)); + msg.setSender(new AgentID("test-c1")); + c1.send(msg); + platform.delay(250); + assertTrue(l2.msgs.get(l2.msgs.size()-1) instanceof Message); + platform.shutdown(); + } + @Test public void testParam1() { log.info("testParam1"); @@ -908,10 +957,12 @@ public void onTick() { } private class MyMessageListener implements MessageListener { + public List msgs = Collections.synchronizedList(new ArrayList()); public int n = 0; public boolean eat = false; @Override public boolean onReceive(Message msg) { + msgs.add(msg); n++; return eat; }