From 156d315f2289fe705641dc076f7c7c7d01f0ae53 Mon Sep 17 00:00:00 2001 From: debbabi Date: Wed, 1 Jul 2015 10:55:10 +0200 Subject: [PATCH] Correcting RPC problem between Java Isolates --- .../org/cohorte/herald/rpc/ClientSession.java | 153 +++--- .../cohorte/herald/rpc/HeraldRpcExporter.java | 486 +++++++++--------- 2 files changed, 325 insertions(+), 314 deletions(-) diff --git a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/ClientSession.java b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/ClientSession.java index b59871d..baef618 100755 --- a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/ClientSession.java +++ b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/ClientSession.java @@ -23,6 +23,7 @@ import org.cohorte.herald.Message; import org.jabsorb.ng.client.ClientError; import org.jabsorb.ng.client.ISession; +import org.json.JSONException; import org.json.JSONObject; /** @@ -32,78 +33,82 @@ */ public class ClientSession implements ISession { - /** The Herald core service */ - private IHerald pHerald; - - /** The targeted Peer UID */ - private String pPeerUid; - - /** The request message subject */ - private String pSubject; - - /** - * Sets up members - * - * @param aHerald - * The Herald core service - */ - public ClientSession(final IHerald aHerald, final String aPeerUid, - final String aSubject) { - - pHerald = aHerald; - pPeerUid = aPeerUid; - pSubject = aSubject; - } - - /* - * (non-Javadoc) - * - * @see org.jabsorb.ng.client.ISession#close() - */ - @Override - public void close() { - - // Clean up - pHerald = null; - pPeerUid = null; - pSubject = null; - } - - /* - * (non-Javadoc) - * - * @see org.jabsorb.ng.client.ISession#sendAndReceive(org.json.JSONObject) - */ - @Override - public JSONObject sendAndReceive(final JSONObject aMessage) { - - Object result; - try { - // Send the request as a string - result = pHerald - .send(pPeerUid, - new Message(pSubject, new JSONObject(aMessage) - .toString())); - - } catch (final HeraldException ex) { - // Error sending the message - throw new ClientError("Error sending RPC request: " + ex, ex); - } - - if (result instanceof String) { - try { - return new JSONObject((String) result); - } catch(org.json.JSONException e) { - throw new ClientError("Cannot create a JSONObject from the String : " + result); - } - } - - // The reply has already been converted to a map - else if (!(result instanceof Map)) { - // Bad result - throw new ClientError("Bad result content: not a map > " + result.toString() + " -- type=" + result.getClass()); - } - - return new JSONObject((Map) result); - } + /** The Herald core service */ + private IHerald pHerald; + + /** The targeted Peer UID */ + private String pPeerUid; + + /** The request message subject */ + private String pSubject; + + /** + * Sets up members + * + * @param aHerald + * The Herald core service + */ + public ClientSession(final IHerald aHerald, final String aPeerUid, + final String aSubject) { + + pHerald = aHerald; + pPeerUid = aPeerUid; + pSubject = aSubject; + } + + /* + * (non-Javadoc) + * + * @see org.jabsorb.ng.client.ISession#close() + */ + @Override + public void close() { + + // Clean up + pHerald = null; + pPeerUid = null; + pSubject = null; + } + + /* + * (non-Javadoc) + * + * @see org.jabsorb.ng.client.ISession#sendAndReceive(org.json.JSONObject) + */ + @Override + public JSONObject sendAndReceive(final JSONObject aMessage) { + + Object result; + try { + // Send the request as a string + result = pHerald.send(pPeerUid, new Message(pSubject, + new JSONObject(aMessage.toString()).toString())); + + } catch (final HeraldException ex) { + // Error sending the message + throw new ClientError("Error sending RPC request: " + ex, ex); + } catch (JSONException e) { + // Error on JSON creation + throw new ClientError("Error sending RPC request: " + e, e); + } + + if (result instanceof String) { + try { + return new JSONObject((String) result); + } catch (org.json.JSONException e) { + throw new ClientError( + "Cannot create a JSONObject from the String : " + + result); + } + } + + // The reply has already been converted to a map + else if (!(result instanceof Map)) { + // Bad result + throw new ClientError("Bad result content: not a map > " + + result.toString() + " -- type=" + result.getClass()); + } + + return new JSONObject((Map) result); + } } diff --git a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java index 74ca079..d751785 100755 --- a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java +++ b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java @@ -55,244 +55,250 @@ @Instantiate(name = "herald-rpc-exporter") public class HeraldRpcExporter implements IServiceExporter, IMessageListener { - /** Supported export configurations */ - @ServiceProperty(name = Constants.REMOTE_CONFIGS_SUPPORTED, value = "{" - + IHeraldRpcConstants.EXPORT_CONFIG + "}") - private String[] pConfigurations; - - /** The bundle context */ - private final BundleContext pContext; - - /** The Herald core directory */ - @Requires - private IDirectory pDirectory; - - /** Exported services: Name -> ExportEndpoint */ - private final Map pEndpoints = new LinkedHashMap(); - - /** Herald message filters */ - @ServiceProperty(name = IConstants.PROP_FILTERS, value = "{" - + IHeraldRpcConstants.SUBJECT_REQUEST + "," - + IHeraldRpcConstants.SUBJECT_REPLY + "}") - private String[] pFilters; - - /** The JSON-RPC bridge (Jabsorb) */ - private JSONRPCBridge pJsonRpcBridge; - - /** Local peer UID */ - private String pLocalUid; - - /** The logger */ - @Requires(optional = true) - private LogService pLogger; - - /** - * Sets up the component - * - * @param aContext - * The bundle context - */ - public HeraldRpcExporter(final BundleContext aContext) { - - pContext = aContext; - } - - /* - * (non-Javadoc) - * - * @see - * org.cohorte.remote.IServiceExporter#exportService(org.osgi.framework. - * ServiceReference, java.lang.String, java.lang.String) - */ - @Override - public ExportEndpoint exportService(final ServiceReference aReference, - final String aName, final String aFramworkUid) - throws BundleException, IllegalArgumentException { - - if (pEndpoints.containsKey(aName)) { - pLogger.log(LogService.LOG_ERROR, - "Already use Herald-RPC endpoint name: " + aName); - return null; - } - - // Get the service - final Object service = pContext.getService(aReference); - - // Prepare extra properties - final Map extraProps = new LinkedHashMap(); - extraProps.put(IHeraldRpcConstants.PROP_HERALDRPC_PEER, pLocalUid); - extraProps.put(IHeraldRpcConstants.PROP_HERALDRPC_SUBJECT, - IHeraldRpcConstants.SUBJECT_REQUEST); - - // Prepare the endpoint bean - final ExportEndpoint endpoint = new ExportEndpoint(UUID.randomUUID() - .toString(), pLocalUid, pConfigurations, aName, aReference, - extraProps); - - // Register the object in the Jabsorb bridge - pJsonRpcBridge.registerObject(aName, service); - - // Store information - pEndpoints.put(aName, endpoint); - return endpoint; - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.remote.IServiceExporter#handles(java.lang.String[]) - */ - @Override - public boolean handles(final String[] aConfigurations) { - - if (aConfigurations == null) { - // null = "match all" - return true; - } - - // Look for a match in configurations - for (final String config : aConfigurations) { - for (final String handledConfig : pConfigurations) { - if (handledConfig.equals(config)) { - // Got a match - return true; - } - } - } - - // No match - return false; - } - - /* - * (non-Javadoc) - * - * @see - * org.cohorte.herald.IMessageListener#heraldMessage(org.cohorte.herald. - * IHerald, org.cohorte.herald.MessageReceived) - */ - @Override - public void heraldMessage(final IHerald aHerald, - final MessageReceived aMessage) { - - // Check content type - final Object rawContent = aMessage.getContent(); - if (!(rawContent instanceof String)) { - // Didn't got a string - pLogger.log(LogService.LOG_ERROR, - "Herald Jabsorb-RPC message content is not a string"); - } - - // Convert the content of the message (request map) to a JSONObject - final JSONObject jsonReq; - try { - jsonReq = new JSONObject((String) rawContent); - } catch (final JSONException ex) { - pLogger.log(LogService.LOG_ERROR, - "Error parsing the Jabsorb-RPC request: " + ex, ex); - return; - } - - // Call the method, without context - final JSONRPCResult result = pJsonRpcBridge - .call(new Object[0], jsonReq); - - // Convert the result as a JSON string containing the JSON object - final String strResult = new JSONObject(result).toString(); - - // Send the result - try { - aHerald.reply(aMessage, strResult, - IHeraldRpcConstants.SUBJECT_REPLY); - - } catch (final HeraldException ex) { - pLogger.log(LogService.LOG_ERROR, "Error sending RPC result: " + ex); - } - } - - /** - * Component invalidated - */ - @Invalidate - public void invalidate() { - - // Destroy end points - final ExportEndpoint[] endpoints = pEndpoints.values().toArray( - new ExportEndpoint[0]); - for (final ExportEndpoint endpoint : endpoints) { - try { - // Release the service, unregister the endpoint - unexportService(endpoint); - - } catch (final Exception ex) { - // Just log the error - pLogger.log(LogService.LOG_WARNING, - "Error unregistering service: " + ex, ex); - } - } - - // Clean up - pLocalUid = null; - pJsonRpcBridge = null; - } - - /* - * (non-Javadoc) - * - * @see - * org.cohorte.remote.IServiceExporter#unexportService(org.cohorte.remote - * .ExportEndpoint) - */ - @Override - public void unexportService(final ExportEndpoint aEndpoint) { - - // Pop the endpoint - if (pEndpoints.remove(aEndpoint.getName()) != null) { - // Destroy the endpoint - pJsonRpcBridge.unregisterObject(aEndpoint.getName()); - - // Release the service - pContext.ungetService(aEndpoint.getReference()); - - } else { - // Unknown endpoint - pLogger.log(LogService.LOG_WARNING, "Unknown endpoint: " - + aEndpoint); - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.remote.IServiceExporter#updateExport(org.cohorte.remote. - * ExportEndpoint, java.lang.String, java.util.Map) - */ - @Override - public void updateExport(final ExportEndpoint aEndpoint, - final String aNewName, final Map aOldProperties) - throws IllegalArgumentException { - - final ExportEndpoint knownEndpoint = pEndpoints.get(aNewName); - if (knownEndpoint != null && !knownEndpoint.equals(aEndpoint)) { - // Name already taken by another endpoint: reject it - throw new IllegalArgumentException("New name of " + aEndpoint - + " is already in use: " + aNewName); - } - - // Update storage - pEndpoints.put(aNewName, pEndpoints.remove(aEndpoint.getName())); - - // Update the endpoint - aEndpoint.setName(aNewName); - } - - /** - * Component validated - */ - @Validate - public void validate() { - - pLocalUid = pDirectory.getLocalUid(); - pJsonRpcBridge = new JSONRPCBridge(); - } + /** Supported export configurations */ + @ServiceProperty(name = Constants.REMOTE_CONFIGS_SUPPORTED, value = "{" + + IHeraldRpcConstants.EXPORT_CONFIG + "}") + private String[] pConfigurations; + + /** The bundle context */ + private final BundleContext pContext; + + /** The Herald core directory */ + @Requires + private IDirectory pDirectory; + + /** Exported services: Name -> ExportEndpoint */ + private final Map pEndpoints = new LinkedHashMap(); + + /** Herald message filters */ + @ServiceProperty(name = IConstants.PROP_FILTERS, value = "{" + + IHeraldRpcConstants.SUBJECT_REQUEST + "," + + IHeraldRpcConstants.SUBJECT_REPLY + "}") + private String[] pFilters; + + /** The JSON-RPC bridge (Jabsorb) */ + private JSONRPCBridge pJsonRpcBridge; + + /** Local peer UID */ + private String pLocalUid; + + /** The logger */ + @Requires(optional = true) + private LogService pLogger; + + /** + * Sets up the component + * + * @param aContext + * The bundle context + */ + public HeraldRpcExporter(final BundleContext aContext) { + + pContext = aContext; + } + + /* + * (non-Javadoc) + * + * @see + * org.cohorte.remote.IServiceExporter#exportService(org.osgi.framework. + * ServiceReference, java.lang.String, java.lang.String) + */ + @Override + public ExportEndpoint exportService(final ServiceReference aReference, + final String aName, final String aFramworkUid) + throws BundleException, IllegalArgumentException { + + if (pEndpoints.containsKey(aName)) { + pLogger.log(LogService.LOG_ERROR, + "Already use Herald-RPC endpoint name: " + aName); + return null; + } + + // Get the service + final Object service = pContext.getService(aReference); + + // Prepare extra properties + final Map extraProps = new LinkedHashMap(); + extraProps.put(IHeraldRpcConstants.PROP_HERALDRPC_PEER, pLocalUid); + extraProps.put(IHeraldRpcConstants.PROP_HERALDRPC_SUBJECT, + IHeraldRpcConstants.SUBJECT_REQUEST); + + // Prepare the endpoint bean + final ExportEndpoint endpoint = new ExportEndpoint(UUID.randomUUID() + .toString(), pLocalUid, pConfigurations, aName, aReference, + extraProps); + + // Register the object in the Jabsorb bridge + pJsonRpcBridge.registerObject(aName, service); + + // Store information + pEndpoints.put(aName, endpoint); + return endpoint; + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.remote.IServiceExporter#handles(java.lang.String[]) + */ + @Override + public boolean handles(final String[] aConfigurations) { + + if (aConfigurations == null) { + // null = "match all" + return true; + } + + // Look for a match in configurations + for (final String config : aConfigurations) { + for (final String handledConfig : pConfigurations) { + if (handledConfig.equals(config)) { + // Got a match + return true; + } + } + } + + // No match + return false; + } + + /* + * (non-Javadoc) + * + * @see + * org.cohorte.herald.IMessageListener#heraldMessage(org.cohorte.herald. + * IHerald, org.cohorte.herald.MessageReceived) + */ + @Override + public void heraldMessage(final IHerald aHerald, + final MessageReceived aMessage) { + + // Check content type + final Object rawContent = aMessage.getContent(); + if (!(rawContent instanceof String)) { + // Didn't got a string + pLogger.log(LogService.LOG_ERROR, + "Herald Jabsorb-RPC message content is not a string"); + } + + // Convert the content of the message (request map) to a JSONObject + final JSONObject jsonReq; + try { + jsonReq = new JSONObject((String) rawContent); + } catch (final JSONException ex) { + pLogger.log(LogService.LOG_ERROR, + "Error parsing the Jabsorb-RPC request: " + ex, ex); + return; + } + + // Call the method, without context + final JSONRPCResult result = pJsonRpcBridge + .call(new Object[0], jsonReq); + + // Convert the result as a JSON string containing the JSON object + String strResult = null; + try { + strResult = new JSONObject(result.toString()).toString(); + } catch (JSONException e) { + pLogger.log(LogService.LOG_ERROR, + "Error constructing JSON object of RPC result: " + e); + } + + // Send the result + try { + aHerald.reply(aMessage, strResult, + IHeraldRpcConstants.SUBJECT_REPLY); + + } catch (final HeraldException ex) { + pLogger.log(LogService.LOG_ERROR, "Error sending RPC result: " + ex); + } + } + + /** + * Component invalidated + */ + @Invalidate + public void invalidate() { + + // Destroy end points + final ExportEndpoint[] endpoints = pEndpoints.values().toArray( + new ExportEndpoint[0]); + for (final ExportEndpoint endpoint : endpoints) { + try { + // Release the service, unregister the endpoint + unexportService(endpoint); + + } catch (final Exception ex) { + // Just log the error + pLogger.log(LogService.LOG_WARNING, + "Error unregistering service: " + ex, ex); + } + } + + // Clean up + pLocalUid = null; + pJsonRpcBridge = null; + } + + /* + * (non-Javadoc) + * + * @see + * org.cohorte.remote.IServiceExporter#unexportService(org.cohorte.remote + * .ExportEndpoint) + */ + @Override + public void unexportService(final ExportEndpoint aEndpoint) { + + // Pop the endpoint + if (pEndpoints.remove(aEndpoint.getName()) != null) { + // Destroy the endpoint + pJsonRpcBridge.unregisterObject(aEndpoint.getName()); + + // Release the service + pContext.ungetService(aEndpoint.getReference()); + + } else { + // Unknown endpoint + pLogger.log(LogService.LOG_WARNING, "Unknown endpoint: " + + aEndpoint); + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.remote.IServiceExporter#updateExport(org.cohorte.remote. + * ExportEndpoint, java.lang.String, java.util.Map) + */ + @Override + public void updateExport(final ExportEndpoint aEndpoint, + final String aNewName, final Map aOldProperties) + throws IllegalArgumentException { + + final ExportEndpoint knownEndpoint = pEndpoints.get(aNewName); + if (knownEndpoint != null && !knownEndpoint.equals(aEndpoint)) { + // Name already taken by another endpoint: reject it + throw new IllegalArgumentException("New name of " + aEndpoint + + " is already in use: " + aNewName); + } + + // Update storage + pEndpoints.put(aNewName, pEndpoints.remove(aEndpoint.getName())); + + // Update the endpoint + aEndpoint.setName(aNewName); + } + + /** + * Component validated + */ + @Validate + public void validate() { + + pLocalUid = pDirectory.getLocalUid(); + pJsonRpcBridge = new JSONRPCBridge(); + } }