diff --git a/java/org.cohorte.herald.api/src/main/java/org/cohorte/herald/IHerald.java b/java/org.cohorte.herald.api/src/main/java/org/cohorte/herald/IHerald.java index dc21baf..0f493d5 100644 --- a/java/org.cohorte.herald.api/src/main/java/org/cohorte/herald/IHerald.java +++ b/java/org.cohorte.herald.api/src/main/java/org/cohorte/herald/IHerald.java @@ -25,305 +25,321 @@ */ public interface IHerald { - /** - * Fires (and forget) a message to the given peer - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String fire(Peer aPeer, Message aMessage) throws NoTransport; + /** + * Adds a message listener + * + * @param aListener + * Message Listener + */ + void addMessageListener(IMessageListener aListener, String[] filters); - /** - * Fires (and forget) a message to the given peer - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - * @throws UnknownPeer - * Unknown peer UID - */ - String fire(String aPeerUid, Message aMessage) throws NoTransport, - UnknownPeer; + /** + * Fires (and forget) a message to the given peer + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String fire(Peer aPeer, Message aMessage) throws NoTransport; - /** - * Fires (and forget) the given message to the given group of peers - * - * @param aGroupName - * The name of a group of peers - * @param aMessage - * The message to send - * @return The list of peers the message has been sent to - * @throws NoTransport - * No transport found to send the message - */ - Collection fireGroup(String aGroupName, Message aMessage) - throws NoTransport; + /** + * Fires (and forget) a message to the given peer + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + * @throws UnknownPeer + * Unknown peer UID + */ + String fire(String aPeerUid, Message aMessage) throws NoTransport, + UnknownPeer; - /** - * Tells Herald to forget informations about the given message UIDs. - * - * This can be used to clean up references to a component being invalidated. - * - * @param aMessageUid - * The UID of the message to forget - * @return True if there was a reference about this message - */ - boolean forget(final String aMessageUid); + /** + * Fires (and forget) the given message to the given group of peers + * + * @param aGroupName + * The name of a group of peers + * @param aMessage + * The message to send + * @return The list of peers the message has been sent to + * @throws NoTransport + * No transport found to send the message + */ + Collection fireGroup(String aGroupName, Message aMessage) + throws NoTransport; - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(Peer aPeer, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback) throws NoTransport; + /** + * Tells Herald to forget informations about the given message UIDs. + * + * This can be used to clean up references to a component being invalidated. + * + * @param aMessageUid + * The UID of the message to forget + * @return True if there was a reference about this message + */ + boolean forget(final String aMessageUid); - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @param aTimeout - * Time after which the message will be forgotten - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(Peer aPeer, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback, Long aTimeout) throws NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(Peer aPeer, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback) throws NoTransport; - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @param aTimeout - * Time after which the message will be forgotten - * @param aForgetOnFirst - * Forget the message after the first answer - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(Peer aPeer, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback, Long aTimeout, boolean aForgetOnFirst) - throws NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @param aTimeout + * Time after which the message will be forgotten + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(Peer aPeer, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback, Long aTimeout) throws NoTransport; - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(String aPeerUid, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback) throws UnknownPeer, NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @param aTimeout + * Time after which the message will be forgotten + * @param aForgetOnFirst + * Forget the message after the first answer + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(Peer aPeer, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback, Long aTimeout, boolean aForgetOnFirst) + throws NoTransport; - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @param aTimeout - * Time after which the message will be forgotten - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(String aPeerUid, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback, Long aTimeout) throws UnknownPeer, - NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(String aPeerUid, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback) throws UnknownPeer, NoTransport; - /** - * Posts a message. The given methods will be called back as soon as a - * result is given, or in case of error - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @param aCallback - * Object to call back when a reply is received - * @param aErrback - * Object to call back when an error occurs - * @param aTimeout - * Time after which the message will be forgotten - * @param aForgetOnFirst - * Forget the message after the first answer - * @return The UID of the message - * @throws NoTransport - * No transport found to send the message - */ - String post(String aPeerUid, Message aMessage, IPostCallback aCallback, - IPostErrback aErrback, Long aTimeout, boolean aForgetOnFirst) - throws UnknownPeer, NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @param aTimeout + * Time after which the message will be forgotten + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(String aPeerUid, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback, Long aTimeout) throws UnknownPeer, + NoTransport; - /** - * Posts a message to a group of peers - * - * @param aGroupName - * The name of a group of peers - * @param aMessage - * A Message bean - * @param aCallback - * Method to call back when a reply is received - * @param aErrback - * Method to call back if an error occurs - * @param aTimeout - * Time after which the message will be forgotten - * @return The message UID - * @throws ValueError - * Unknown group - * @throws NoTransport - * No transport found to send the message - */ - String postGroup(String aGroupName, Message aMessage, - IPostCallback aCallback, IPostErrback aErrback, Long aTimeout) - throws ValueError, NoTransport; + /** + * Posts a message. The given methods will be called back as soon as a + * result is given, or in case of error + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @param aCallback + * Object to call back when a reply is received + * @param aErrback + * Object to call back when an error occurs + * @param aTimeout + * Time after which the message will be forgotten + * @param aForgetOnFirst + * Forget the message after the first answer + * @return The UID of the message + * @throws NoTransport + * No transport found to send the message + */ + String post(String aPeerUid, Message aMessage, IPostCallback aCallback, + IPostErrback aErrback, Long aTimeout, boolean aForgetOnFirst) + throws UnknownPeer, NoTransport; - /** - * Replies to a message. The subject will be the one of the original - * message, prefixed with "reply/" - * - * @param aMessage - * Message to reply to - * @param aContent - * Content of the response - * @throws HeraldException - * Error sending the reply - */ - void reply(MessageReceived aMessage, Object aContent) - throws HeraldException; + /** + * Posts a message to a group of peers + * + * @param aGroupName + * The name of a group of peers + * @param aMessage + * A Message bean + * @param aCallback + * Method to call back when a reply is received + * @param aErrback + * Method to call back if an error occurs + * @param aTimeout + * Time after which the message will be forgotten + * @return The message UID + * @throws ValueError + * Unknown group + * @throws NoTransport + * No transport found to send the message + */ + String postGroup(String aGroupName, Message aMessage, + IPostCallback aCallback, IPostErrback aErrback, Long aTimeout) + throws ValueError, NoTransport; - /** - * Replies to a message. If no subject is given, it will be the one of the - * original message, prefixed with "reply/" - * - * @param aMessage - * Message to reply to - * @param aContent - * Content of the response - * @param aSubject - * Subject of the response message - * @throws HeraldException - * Error sending the reply - */ - void reply(MessageReceived aMessage, Object aContent, String aSubject) - throws HeraldException; + /** + * Removes a message listener + * + * @param aListener + * Message Listener + */ + void removeMessageListener(IMessageListener aListener); - /** - * Sends a message, and waits for its reply - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @return The UID of the message - * @throws HeraldException - * Error sending the message - */ - Object send(Peer aPeer, Message aMessage) throws HeraldException; + /** + * Replies to a message. The subject will be the one of the original + * message, prefixed with "reply/" + * + * @param aMessage + * Message to reply to + * @param aContent + * Content of the response + * @throws HeraldException + * Error sending the reply + */ + void reply(MessageReceived aMessage, Object aContent) + throws HeraldException; - /** - * Sends a message, and waits for its reply - * - * @param aPeer - * The peer to send the message to - * @param aMessage - * The message to send - * @param aTimeout - * Maximum time to wait for an answer - * @return The content of the reply - * @throws HeraldException - * Error sending the message - * @throws HeraldTimeout - * Timeout raised before getting an answer - */ - Object send(Peer aPeer, Message aMessage, Long aTimeout) - throws HeraldException; + /** + * Replies to a message. If no subject is given, it will be the one of the + * original message, prefixed with "reply/" + * + * @param aMessage + * Message to reply to + * @param aContent + * Content of the response + * @param aSubject + * Subject of the response message + * @throws HeraldException + * Error sending the reply + */ + void reply(MessageReceived aMessage, Object aContent, String aSubject) + throws HeraldException; - /** - * Sends a message, and waits for its reply - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @return The content of the reply - * @throws HeraldException - * Error sending the message - * @throws UnknownPeer - * Unknown peer UID - */ - Object send(String aPeerUid, Message aMessage) throws HeraldException; + /** + * Sends a message, and waits for its reply + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @return The UID of the message + * @throws HeraldException + * Error sending the message + */ + Object send(Peer aPeer, Message aMessage) throws HeraldException; - /** - * Sends a message, and waits for its reply - * - * @param aPeerUid - * The UID of the peer to send the message to - * @param aMessage - * The message to send - * @param aTimeout - * Maximum time to wait for an answer - * @return The content of the reply - * @throws HeraldException - * Error sending the message - * @throws HeraldTimeout - * Timeout raised before getting an answer - * @throws UnknownPeer - * Unknown peer UID - */ - Object send(String aPeerUid, Message aMessage, Long aTimeout) - throws HeraldException; + /** + * Sends a message, and waits for its reply + * + * @param aPeer + * The peer to send the message to + * @param aMessage + * The message to send + * @param aTimeout + * Maximum time to wait for an answer + * @return The content of the reply + * @throws HeraldException + * Error sending the message + * @throws HeraldTimeout + * Timeout raised before getting an answer + */ + Object send(Peer aPeer, Message aMessage, Long aTimeout) + throws HeraldException; + + /** + * Sends a message, and waits for its reply + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @return The content of the reply + * @throws HeraldException + * Error sending the message + * @throws UnknownPeer + * Unknown peer UID + */ + Object send(String aPeerUid, Message aMessage) throws HeraldException; + + /** + * Sends a message, and waits for its reply + * + * @param aPeerUid + * The UID of the peer to send the message to + * @param aMessage + * The message to send + * @param aTimeout + * Maximum time to wait for an answer + * @return The content of the reply + * @throws HeraldException + * Error sending the message + * @throws HeraldTimeout + * Timeout raised before getting an answer + * @throws UnknownPeer + * Unknown peer UID + */ + Object send(String aPeerUid, Message aMessage, Long aTimeout) + throws HeraldException; } diff --git a/java/org.cohorte.herald.core/src/main/java/org/cohorte/herald/core/Herald.java b/java/org.cohorte.herald.core/src/main/java/org/cohorte/herald/core/Herald.java index 373610c..043ec6b 100644 --- a/java/org.cohorte.herald.core/src/main/java/org/cohorte/herald/core/Herald.java +++ b/java/org.cohorte.herald.core/src/main/java/org/cohorte/herald/core/Herald.java @@ -75,1157 +75,1169 @@ @Instantiate(name = "herald-core") public class Herald implements IHerald, IHeraldInternal { - /** iPOJO requirement ID */ - private static final String ID_LISTENERS = "listeners"; - - /** iPOJO requirement ID */ - private static final String ID_TRANSPORTS = "transports"; - - /** The bundle context */ - private final BundleContext pContext; - - /** The Herald core directory */ - @Requires - private IDirectory pDirectory; - - /** The garbage collection timer */ - private LoopTimer pGarbageTimer; - - /** Object used to synchronize garbage collection */ - private final Object pGarbageToken = new Object(); - - /** Time stamp of the last garbage collection */ - private long pLastGarbage = -1; - - /** Filter -> Listeners */ - private final Map> pListeners = new LinkedHashMap<>(); - - /** Listener -> Filters */ - private final Map> pListenersFilters = new LinkedHashMap>(); - - /** The logger */ - @Requires(optional = true) - private LogService pLogger; - - /** The thread pool */ - private ExecutorService pPool; - - /** Herald "public" service registration */ - private ServiceRegistration pSvcRegistration; - - /** Access ID -> Transport implementation */ - private final Map pTransports = new LinkedHashMap<>(); - - /** List of received messages UIDs, kept 5 minutes: UID -> TTL */ - private final Map pTreatedMessages = new LinkedHashMap<>(); - - /** Events used to block "send()" methods: UID -> EventData */ - private final Map> pWaitingEvents = new LinkedHashMap<>(); - - /** Events used for "post()" methods: UID -> WaitingPost */ - private final Map pWaitingPosts = new LinkedHashMap<>(); - - /** - * Sets up members - * - * @param aContext - * The bundle context - */ - public Herald(final BundleContext aContext) { - - pContext = aContext; - } - - /** - * A message listener has been bound - * - * @param aListener - * A message listener - * @param aReference - * The injected service reference - */ - @Bind(id = ID_LISTENERS, aggregate = true, optional = true) - protected void bindListener(final IMessageListener aListener, - final ServiceReference aReference) { - - final Object rawFilters = aReference - .getProperty(IConstants.PROP_FILTERS); - String[] filters; - if (rawFilters instanceof String) { - // Single filter - filters = new String[] { (String) rawFilters }; - - } else if (rawFilters instanceof String[]) { - // Copy the array - final String[] givenFilters = (String[]) rawFilters; - filters = Arrays.copyOf(givenFilters, givenFilters.length); - - } else { - // Unreadable filters - return; - } - - synchronized (pListeners) { - for (final String filter : filters) { - // Compile the filter - final FnMatch match = new FnMatch(filter); - - // Associate the listener to the filter - Utilities.setDefault(pListeners, match, - new LinkedHashSet()).add(aListener); - Utilities.setDefault(pListenersFilters, aListener, - new LinkedHashSet()).add(match); - } - } - } - - /** - * A transport implementation has been bound - * - * @param aTransport - * A transport implementation - * @param aReference - * The injected service reference - */ - @Bind(id = ID_TRANSPORTS, aggregate = true, optional = true) - protected void bindTransport(final ITransport aTransport, - final ServiceReference aReference) { - - final String accessId = (String) aReference - .getProperty(IConstants.PROP_ACCESS_ID); - if (accessId == null || accessId.isEmpty()) { - // Ignore invalid access IDs - return; - } - - synchronized (pTransports) { - // Store the service - pTransports.put(accessId, aTransport); - - if (pSvcRegistration == null) { - // We have at least one service: provide our service - pSvcRegistration = pContext.registerService(IHerald.class, - this, null); - } - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#fire(org.cohorte.herald.Peer, - * org.cohorte.herald.Message) - */ - @Override - public String fire(final Peer aPeer, final Message aMessage) - throws NoTransport { - - // Check if we can send the message - if (pTransports.isEmpty()) { - throw new NoTransport(new Target(aPeer), "No transport bound yet."); - } - - // Try each access - boolean success = false; - for (final String access : aPeer.getAccesses()) { - final ITransport transport = pTransports.get(access); - if (transport == null) { - // No transport for this kind of access - continue; - } - - try { - // Try to use it - transport.fire(aPeer, aMessage); - - // Success: stop here - success = true; - break; - - } catch (final HeraldException ex) { - // Exception during transport - pLogger.log(LogService.LOG_WARNING, "Error using transport " - + access + ": " + ex); - } - } - - if (!success) { - // No transport succeeded - throw new NoTransport(new Target(aPeer), - "No working transport found for peer " + aPeer); - } - - return aMessage.getUid(); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#fire(java.lang.String, - * org.cohorte.herald.Message) - */ - @Override - public String fire(final String aPeerUid, final Message aMessage) - throws NoTransport, UnknownPeer { - - return fire(pDirectory.getPeer(aPeerUid), aMessage); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#fireGroup(java.lang.String, - * org.cohorte.herald.Message) - */ - @Override - public Collection fireGroup(final String aGroupName, - final Message aMessage) throws NoTransport { - - // Get all peers known in the group - final Collection allPeers = pDirectory - .getPeersForGroup(aGroupName); - if (allPeers.isEmpty()) { - pLogger.log(LogService.LOG_WARNING, "No peer in group: " - + aGroupName); - return new LinkedHashSet<>(); - } - - if (pTransports.isEmpty()) { - // Make the list of UIDs - throw new NoTransport(new Target(aGroupName, - Target.toUids(allPeers)), "No transport bound yet."); - } - - // Group peers by accesses - final Map> accesses = new LinkedHashMap<>(); - for (final Peer peer : allPeers) { - for (final String access : peer.getAccesses()) { - Utilities.setDefault(accesses, access, - new LinkedHashSet()).add(peer); - } - } - - boolean allDone = false; - for (final Entry> entry : accesses.entrySet()) { - final String access = entry.getKey(); - final Set accessPeers = entry.getValue(); - - if (accessPeers.isEmpty()) { - // Nothing to do - continue; - } - - // Find the transport for this access - final ITransport transport = pTransports.get(access); - if (transport == null) { - // No transport for this kind of access - pLogger.log(LogService.LOG_DEBUG, "No transport for " + access); - continue; - } - - final Collection reachedPeers; - try { - // Try to send the message - reachedPeers = transport.fireGroup(aGroupName, accessPeers, - aMessage); - - } catch (final HeraldException ex) { - // Try again... - pLogger.log(LogService.LOG_DEBUG, - "Error group-firing message: " + ex, ex); - continue; - } - - allDone = true; - for (final Set remainingPeers : accesses.values()) { - remainingPeers.removeAll(reachedPeers); - if (!remainingPeers.isEmpty()) { - allDone = false; - } - } - - if (allDone) { - break; - } - } - - final Set missingPeers = new LinkedHashSet<>(); - if (!allDone) { - // Some peers are missing - for (final Set remainingPeers : accesses.values()) { - missingPeers.addAll(remainingPeers); - } - - if (!missingPeers.isEmpty()) { - pLogger.log(LogService.LOG_WARNING, - "Some peers haven't been notified: " + missingPeers); - } else { - pLogger.log(LogService.LOG_DEBUG, - "No peer to send the message to."); - } - } - - return missingPeers; - } - - /** - * Tries to fire a reply to the given message - * - * @param aReplyMessage - * Message to send as a reply - * @param aOriginalMessage - * Message the first argument replies to - * @return The UID of the sent message - * @throws HeraldException - * Error trying to send the reply, or to read information about - * the peer - */ - private String fireReply(final Message aReplyMessage, - final MessageReceived aOriginalMessage) throws HeraldException { - - final String access = aOriginalMessage.getAccess(); - final String sender = aOriginalMessage.getSender(); - - // Look for the transport implementation - final ITransport transport = pTransports.get(access); - if (transport == null) { - throw new NoTransport(new Target(sender), - "No reply transport for access " + access); - } - - // Try to get a Peer bean - Peer peer; - try { - peer = pDirectory.getPeer(aOriginalMessage.getSender()); - - } catch (final UnknownPeer ex) { - // Hope the transport has enough extra information - peer = null; - } - - // Send the reply - transport.fire(peer, aReplyMessage, aOriginalMessage.getExtra()); - return aReplyMessage.getUid(); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#forget(java.lang.String) - */ - @Override - public boolean forget(final String aMessageUid) { - - boolean result = false; - final ForgotMessage exception = new ForgotMessage(aMessageUid); - - // Release the send() call - final EventData event = pWaitingEvents.remove(aMessageUid); - if (event != null) { - event.raiseException(exception); - result = true; - } - - synchronized (pGarbageToken) { - // Notify post() callers - final WaitingPost waitingPost = pWaitingPosts.remove(aMessageUid); - if (waitingPost != null) { - waitingPost.errback(this, exception); - result = true; - } - } - - return result; - } - - /** - * Garbage collects dead waiting post beans. Calls on a regular basis by a - * LoopTimer - */ - private void garbageCollect() { - - synchronized (pGarbageToken) { - // Compute time since the last garbage collection - long delta; - if (pLastGarbage < 0) { - delta = 0; - } else { - delta = System.currentTimeMillis() - pLastGarbage; - } - - // Delete timed out post message beans - final Set toDelete = new LinkedHashSet<>(); - for (final Entry entry : pWaitingPosts - .entrySet()) { - if (entry.getValue().isDead()) { - toDelete.add(entry.getKey()); - } - } - for (final String uid : toDelete) { - pWaitingPosts.remove(uid); - } - - // Delete UID of treated message of more than 5 minutes - toDelete.clear(); - for (final Entry entry : pTreatedMessages.entrySet()) { - final String msgUid = entry.getKey(); - final long newTTL = entry.getValue() + delta; - // Yes, I can *update* an entry while iterating - pTreatedMessages.put(msgUid, newTTL); - - if (newTTL > 300000) { - // More than 5 minutes: forget about the message - toDelete.add(msgUid); - } - } - for (final String msgUid : toDelete) { - pTreatedMessages.remove(msgUid); - } - - // Update the last garbage collection time - pLastGarbage = System.currentTimeMillis(); - } - } - - /** - * Handles a directory update message - * - * @param aMessage - * Message received from another peer - * @param aKind - * Kind of directory message - */ - private void handleDirectoryMessage(final MessageReceived aMessage, - final String aKind) { - - switch (aKind) { - case "bye": { - // A peer is going away - // Message content: the Peer UID - pDirectory.unregister((String) aMessage.getContent()); - break; - } - - default: - // Ignore other messages - break; - } - } - - /** - * Handles an error message - * - * @param aMessage - * The error message - * @param aKind - * Kind of error - */ - private void handleError(final MessageReceived aMessage, final String aKind) { - - switch (aKind) { - case "no-listener": { - // No listener found for a given message - final Map content = (Map) aMessage.getContent(); - final String uid = (String) content.get("uid"); - final String subject = (String) content.get("subject"); - if (uid == null || subject == null || uid.isEmpty() - || subject.isEmpty()) { - // Invalid error content, ignore - return; - } - - // Set up the exception object - final NoListener exception = new NoListener(new Target( - aMessage.getSender()), uid, subject); - - // Unlock the original message sender - final EventData eventData = pWaitingEvents.remove(uid); - if (eventData != null) { - eventData.raiseException(exception); - } - - // Notify post() callers - final WaitingPost waitingPost = pWaitingPosts.remove(uid); - if (waitingPost != null) { - waitingPost.errback(this, exception); - } - break; - } - - default: - // Unknown error - pLogger.log(LogService.LOG_WARNING, "Unknown kind of error: " - + aKind); - break; - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHeraldInternal#handleMessage(org.cohorte.herald. - * MessageReceived) - */ - @Override - public void handleMessage(final MessageReceived aMessage) { - - synchronized (pGarbageToken) { - if (pTreatedMessages.containsValue(aMessage.getUid())) { - // Message already handled, ignore it - return; - } else { - // Store the message UID - pTreatedMessages.put(aMessage.getUid(), - System.currentTimeMillis()); - } - } - - // Clean up the subject - final List parts = new LinkedList<>(); - for (final String part : aMessage.getSubject().split("/")) { - if (!part.isEmpty()) { - parts.add(part); - } - } - - try { - if (parts.get(0).equals("herald")) { - // Internal message - final String category = parts.get(1); - final String kind = parts.get(2); - switch (category) { - case "error": - // Error message: handle it, but don't propagate it - handleError(aMessage, kind); - return; - - case "directory": - // Directory update message - handleDirectoryMessage(aMessage, kind); - break; - - default: - break; - } - } - } catch (final IndexOutOfBoundsException ex) { - // Not enough arguments for a directory update: ignore - } - - // Notify others of the message - notify(aMessage); - } - - /** - * Component invalidated - */ - @Invalidate - public void invalidate() { - - // Stop the garbage collector - pGarbageTimer.cancel(); - - // Stop the thread pool - pPool.shutdownNow(); - - // Clear waiting events - for (final EventData event : pWaitingEvents.values()) { - event.set(null); - } - - synchronized (pGarbageToken) { - final HeraldException exception = new HeraldTimeout(null, - "Herald stops to listen to messages", null); - for (final WaitingPost waiting : pWaitingPosts.values()) { - waiting.errback(this, exception); - } - } - - // Clean up - pWaitingEvents.clear(); - pWaitingPosts.clear(); - pGarbageTimer = null; - pPool = null; - } - - /** - * Calls back message senders about responses or notifies the reception of a - * message - * - * @param aMessage - * The received message - */ - private void notify(final MessageReceived aMessage) { - - final String repliesTo = aMessage.getReplyTo(); - if (repliesTo != null && !repliesTo.isEmpty()) { - // This message is a reply: unlock the sender of the original - // message - final EventData event = pWaitingEvents.remove(repliesTo); - if (event != null) { - // Set the data - event.set(aMessage.getContent()); - } - - synchronized (pGarbageToken) { - // Notify post() callers - final WaitingPost waitingPost = pWaitingPosts.get(repliesTo); - if (waitingPost != null) { - waitingPost.callback(this, aMessage); - if (waitingPost.isForgetOnFirst()) { - // Forget about the message - pWaitingPosts.remove(repliesTo); - } - } - } - } - - // Compute the list of listeners to notify - final Set listeners = new LinkedHashSet<>(); - final String subject = aMessage.getSubject(); - - synchronized (pListeners) { - for (final Entry> entry : pListeners - .entrySet()) { - if (entry.getKey().matches(subject)) { - listeners.addAll(entry.getValue()); - } - } - } - - if (!listeners.isEmpty()) { - // Call listeners in the thread pool - for (final IMessageListener listener : listeners) { - pPool.execute(new Runnable() { - - @Override - public void run() { - - try { - listener.heraldMessage(Herald.this, aMessage); - - } catch (final HeraldException ex) { - pLogger.log(LogService.LOG_WARNING, - "Error notifying listener " + listener - + ": " + ex, ex); - } - } - }); - } - - } else { - // No listener found: send an error message - final Map content = new LinkedHashMap<>(); - content.put("uid", aMessage.getUid()); - content.put("subject", aMessage.getSubject()); - - try { - reply(aMessage, content, "herald/error/no-listener"); - - } catch (final HeraldException ex) { - // We can't send an error back - pLogger.log(LogService.LOG_ERROR, - "Can't send an error back to the sender: " + ex); - } - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback) - */ - @Override - public String post(final Peer aPeer, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback) - throws NoTransport { - - return post(aPeer, aMessage, aCallback, aErrback, 180L, true); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback, java.lang.Long) - */ - @Override - public String post(final Peer aPeer, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback, - final Long aTimeout) throws NoTransport { - - return post(aPeer, aMessage, aCallback, aErrback, aTimeout, true); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback, java.lang.Long, boolean) - */ - @Override - public String post(final Peer aPeer, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback, - final Long aTimeout, final boolean aForgetOnFirst) - throws NoTransport { - - synchronized (pGarbageToken) { - // Prepare an entry in the waiting posts - pWaitingPosts.put(aMessage.getUid(), new WaitingPost(aCallback, - aErrback, aTimeout, aForgetOnFirst)); - } - - try { - // Fire the message - return fire(aPeer, aMessage); - - } catch (final HeraldException ex) { - // Early clean up in case of exception - synchronized (pGarbageToken) { - pWaitingPosts.remove(aMessage.getUid()); - } - throw ex; - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(java.lang.String, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback) - */ - @Override - public String post(final String aPeerUid, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback) - throws UnknownPeer, NoTransport { - - return post(aPeerUid, aMessage, aCallback, aErrback, 180L, true); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(java.lang.String, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback, java.lang.Long) - */ - @Override - public String post(final String aPeerUid, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback, - final Long aTimeout) throws UnknownPeer, NoTransport { - - return post(aPeerUid, aMessage, aCallback, aErrback, aTimeout, true); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#post(java.lang.String, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback, java.lang.Long, boolean) - */ - @Override - public String post(final String aPeerUid, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback, - final Long aTimeout, final boolean aForgetOnFirst) - throws UnknownPeer, NoTransport { - - return post(pDirectory.getPeer(aPeerUid), aMessage, aCallback, - aErrback, aTimeout, aForgetOnFirst); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#postGroup(java.lang.String, - * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, - * org.cohorte.herald.IPostErrback, java.lang.Long) - */ - @Override - public String postGroup(final String aGroupName, final Message aMessage, - final IPostCallback aCallback, final IPostErrback aErrback, - final Long aTimeout) throws ValueError, NoTransport { - - // Get all peers known in the group - final Collection allPeers = pDirectory - .getPeersForGroup(aGroupName); - if (allPeers.isEmpty()) { - throw new ValueError("Unknown group: " + aGroupName); - } - - if (pTransports.isEmpty()) { - // Make the list of UIDs - throw new NoTransport(new Target(aGroupName, - Target.toUids(allPeers)), "No transport bound yet."); - } - - synchronized (pGarbageToken) { - // Prepare an entry in the waiting posts - pWaitingPosts.put(aMessage.getUid(), new WaitingPost(aCallback, - aErrback, aTimeout, false)); - } - - // Group peers by accesses - final Map> accesses = new LinkedHashMap<>(); - for (final Peer peer : allPeers) { - for (final String access : peer.getAccesses()) { - Utilities.setDefault(accesses, access, - new LinkedHashSet()).add(peer); - } - } - - for (final Entry> entry : accesses.entrySet()) { - final String access = entry.getKey(); - final Set accessPeers = entry.getValue(); - - if (accessPeers.isEmpty()) { - // Nothing to do - continue; - } - - // Find the transport for this access - final ITransport transport = pTransports.get(access); - if (transport == null) { - // No transport for this kind of access - pLogger.log(LogService.LOG_DEBUG, "No transport for " + access); - continue; - } - - final Collection reachedPeers; - try { - // Try to send the message - reachedPeers = transport.fireGroup(aGroupName, accessPeers, - aMessage); - - } catch (final HeraldException ex) { - // Try again... - pLogger.log(LogService.LOG_DEBUG, - "Error group-firing message: " + ex, ex); - continue; - } - - boolean allDone = true; - for (final Set remainingPeers : accesses.values()) { - remainingPeers.removeAll(reachedPeers); - if (!remainingPeers.isEmpty()) { - allDone = false; - } - } - - if (allDone) { - break; - } - } - - return aMessage.getUid(); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#reply(org.cohorte.herald.MessageReceived, - * java.lang.Object) - */ - @Override - public void reply(final MessageReceived aMessage, final Object aContent) - throws HeraldException { - - reply(aMessage, aContent, null); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#reply(org.cohorte.herald.MessageReceived, - * java.lang.Object, java.lang.String) - */ - @Override - public void reply(final MessageReceived aMessage, final Object aContent, - final String aSubject) throws HeraldException { - - // Normalize the subject - String subject = aSubject; - if (subject == null || subject.isEmpty()) { - subject = "reply/" + aMessage.getSubject(); - } - - // Prepare the message to send - final Message newMessage = new Message(subject, aContent); - - try { - // Try to reuse the transport - fireReply(newMessage, aMessage); - return; - } catch (final HeraldException ex) { - // Can't reuse the transport, use another one - } - - fire(aMessage.getSender(), newMessage); - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#send(org.cohorte.herald.Peer, - * org.cohorte.herald.Message) - */ - @Override - public Object send(final Peer aPeer, final Message aMessage) - throws HeraldException { - - try { - return send(aPeer, aMessage, null); - - } catch (final HeraldTimeout ex) { - // Can't happen - return null; - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#send(org.cohorte.herald.Peer, - * org.cohorte.herald.Message, java.lang.Long) - */ - @Override - public Object send(final Peer aPeer, final Message aMessage, - final Long aTimeout) throws HeraldException { - - // Prepare the event bean - final EventData event = new EventData<>(); - pWaitingEvents.put(aMessage.getUid(), event); - - try { - // Fire the message - fire(aPeer, aMessage); - - // Message sent: wait for an answer - if (event.waitEvent(aTimeout)) { - final Object data = event.getData(); - if (data != null) { - return data; - - } else { - // Herald is stopping... - throw new HeraldTimeout(new Target(aPeer), - "Herald stops listening to message", aMessage); - } - - } else { - throw new HeraldTimeout(new Target(aPeer), - "Timeout reached before receiving a reply", aMessage); - } - - } catch (final EventException ex) { - // Something went wrong waiting for the event - if (ex.getCause() instanceof HeraldException) { - throw (HeraldException) ex.getCause(); - } else { - throw new HeraldException(new Target(aPeer), - "Error waiting for an answer: " + ex, ex); - } - - } finally { - // Clean up - pWaitingEvents.remove(aMessage.getUid()); - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#send(java.lang.String, - * org.cohorte.herald.Message) - */ - @Override - public Object send(final String aPeerUid, final Message aMessage) - throws HeraldException { - - try { - return send(aPeerUid, aMessage, null); - - } catch (final HeraldTimeout ex) { - // Can't happen - return null; - } - } - - /* - * (non-Javadoc) - * - * @see org.cohorte.herald.IHerald#send(java.lang.String, - * org.cohorte.herald.Message, java.lang.Long) - */ - @Override - public Object send(final String aPeerUid, final Message aMessage, - final Long aTimeout) throws HeraldException { - - return send(pDirectory.getPeer(aPeerUid), aMessage, aTimeout); - } - - /** - * A message listener has gone away - * - * @param aListener - * A message listener - * @param aReference - * The injected service reference - */ - @Unbind(id = ID_LISTENERS) - protected void unbindListener(final IMessageListener aListener, - final ServiceReference aReference) { - - synchronized (pListeners) { - final Set filters = pListenersFilters.remove(aListener); - if (filters == null) { - // Unknown listener - return; - } - - for (final FnMatch match : filters) { - // Forget about the listener - final Set listeners = pListeners.get(match); - listeners.remove(aListener); - - // Clean up - if (listeners.isEmpty()) { - pListeners.remove(match); - } - } - } - } - - /** - * A transport implementation has gone away - * - * @param aTransport - * A transport implementation - * @param aReference - * The injected service reference - */ - @Unbind(id = ID_TRANSPORTS) - protected void unbindTransport(final ITransport aTransport, - final ServiceReference aReference) { - - final String accessId = (String) aReference - .getProperty(IConstants.PROP_ACCESS_ID); - if (accessId == null || accessId.isEmpty()) { - // Ignore invalid access IDs - return; - } - - synchronized (pTransports) { - // Forget about the service - pTransports.remove(accessId); - - if (pTransports.isEmpty() && pSvcRegistration != null) { - // No more transport service: we can't provide the service - pSvcRegistration.unregister(); - pSvcRegistration = null; - } - } - } - - /** - * A message listener has been updated - * - * @param aListener - * A message listener - * @param aReference - * The injected service reference - */ - @Modified(id = ID_LISTENERS) - protected void updateListener(final IMessageListener aListener, - final ServiceReference aReference) { - - final Object rawFilters = aReference - .getProperty(IConstants.PROP_FILTERS); - final Set newFilters = new LinkedHashSet<>(); - if (rawFilters instanceof String) { - // Single filter - newFilters.add((String) rawFilters); - - } else if (rawFilters instanceof String[]) { - // Copy the array - newFilters.addAll(Arrays.asList((String[]) rawFilters)); - - } else { - // Unreadable filters: forget about the listener - unbindListener(aListener, aReference); - return; - } - - synchronized (pListeners) { - // Get current filters - final Set currentFilters = Utilities.setDefault( - pListenersFilters, aListener, new LinkedHashSet()); - final Set currentFiltersStrings = new LinkedHashSet<>( - currentFilters.size()); - for (final FnMatch filter : currentFilters) { - currentFiltersStrings.add(filter.toString()); - } - - // Compare with known state - final Set addedFilters = new LinkedHashSet<>(newFilters); - addedFilters.removeAll(currentFiltersStrings); - - final Set removedFilters = new LinkedHashSet<>( - currentFiltersStrings); - removedFilters.removeAll(newFilters); - - // Add new filters - for (final String filter : addedFilters) { - // Compile the filter - final FnMatch match = new FnMatch(filter); - - // Associate the listener to the filter - Utilities.setDefault(pListeners, match, - new LinkedHashSet()).add(aListener); - Utilities.setDefault(pListenersFilters, aListener, - new LinkedHashSet()).add(match); - } - - // Clean up removed ones - for (final String filter : removedFilters) { - // Compile the filter - final FnMatch match = new FnMatch(filter); - - // Remove the listener from the registry - final Set listeners = pListeners.get(match); - listeners.remove(aListener); - - // Clean up - if (listeners.isEmpty()) { - pListeners.remove(match); - } - } - } - } - - /** - * Component validated - */ - @Validate - public void validate() { - - // Start the notification thread pool - pPool = Executors.newFixedThreadPool(5); - - // Start the garbage collector - pGarbageTimer = new LoopTimer(30000, new Runnable() { - - @Override - public void run() { - - garbageCollect(); - } - }, "Herald-GC"); - pGarbageTimer.start(); - } + /** iPOJO requirement ID */ + private static final String ID_LISTENERS = "listeners"; + + /** iPOJO requirement ID */ + private static final String ID_TRANSPORTS = "transports"; + + /** The bundle context */ + private final BundleContext pContext; + + /** The Herald core directory */ + @Requires + private IDirectory pDirectory; + + /** The garbage collection timer */ + private LoopTimer pGarbageTimer; + + /** Object used to synchronize garbage collection */ + private final Object pGarbageToken = new Object(); + + /** Time stamp of the last garbage collection */ + private long pLastGarbage = -1; + + /** Filter -> Listeners */ + private final Map> pListeners = new LinkedHashMap<>(); + + /** Listener -> Filters */ + private final Map> pListenersFilters = new LinkedHashMap>(); + + /** The logger */ + @Requires(optional = true) + private LogService pLogger; + + /** The thread pool */ + private ExecutorService pPool; + + /** Herald "public" service registration */ + private ServiceRegistration pSvcRegistration; + + /** Access ID -> Transport implementation */ + private final Map pTransports = new LinkedHashMap<>(); + + /** List of received messages UIDs, kept 5 minutes: UID -> TTL */ + private final Map pTreatedMessages = new LinkedHashMap<>(); + + /** Events used to block "send()" methods: UID -> EventData */ + private final Map> pWaitingEvents = new LinkedHashMap<>(); + + /** Events used for "post()" methods: UID -> WaitingPost */ + private final Map pWaitingPosts = new LinkedHashMap<>(); + + /** + * Sets up members + * + * @param aContext + * The bundle context + */ + public Herald(final BundleContext aContext) { + + pContext = aContext; + } + + @Override + public void addMessageListener(final IMessageListener aListener, + final String[] filters) { + + synchronized (pListeners) { + for (final String filter : filters) { + // Compile the filter + final FnMatch match = new FnMatch(filter); + + // Associate the listener to the filter + Utilities.setDefault(pListeners, match, + new LinkedHashSet()).add(aListener); + Utilities.setDefault(pListenersFilters, aListener, + new LinkedHashSet()).add(match); + } + } + } + + /** + * A message listener has been bound + * + * @param aListener + * A message listener + * @param aReference + * The injected service reference + */ + @Bind(id = ID_LISTENERS, aggregate = true, optional = true) + protected void bindListener(final IMessageListener aListener, + final ServiceReference aReference) { + + final Object rawFilters = aReference + .getProperty(IConstants.PROP_FILTERS); + String[] filters; + if (rawFilters instanceof String) { + // Single filter + filters = new String[] { (String) rawFilters }; + + } else if (rawFilters instanceof String[]) { + // Copy the array + final String[] givenFilters = (String[]) rawFilters; + filters = Arrays.copyOf(givenFilters, givenFilters.length); + + } else { + // Unreadable filters + return; + } + + addMessageListener(aListener, filters); + } + + /** + * A transport implementation has been bound + * + * @param aTransport + * A transport implementation + * @param aReference + * The injected service reference + */ + @Bind(id = ID_TRANSPORTS, aggregate = true, optional = true) + protected void bindTransport(final ITransport aTransport, + final ServiceReference aReference) { + + final String accessId = (String) aReference + .getProperty(IConstants.PROP_ACCESS_ID); + if (accessId == null || accessId.isEmpty()) { + // Ignore invalid access IDs + return; + } + + synchronized (pTransports) { + // Store the service + pTransports.put(accessId, aTransport); + + if (pSvcRegistration == null) { + // We have at least one service: provide our service + pSvcRegistration = pContext.registerService(IHerald.class, + this, null); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#fire(org.cohorte.herald.Peer, + * org.cohorte.herald.Message) + */ + @Override + public String fire(final Peer aPeer, final Message aMessage) + throws NoTransport { + + // Check if we can send the message + if (pTransports.isEmpty()) { + throw new NoTransport(new Target(aPeer), "No transport bound yet."); + } + + // Try each access + boolean success = false; + for (final String access : aPeer.getAccesses()) { + final ITransport transport = pTransports.get(access); + if (transport == null) { + // No transport for this kind of access + continue; + } + + try { + // Try to use it + transport.fire(aPeer, aMessage); + + // Success: stop here + success = true; + break; + + } catch (final HeraldException ex) { + // Exception during transport + pLogger.log(LogService.LOG_WARNING, "Error using transport " + + access + ": " + ex); + } + } + + if (!success) { + // No transport succeeded + throw new NoTransport(new Target(aPeer), + "No working transport found for peer " + aPeer); + } + + return aMessage.getUid(); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#fire(java.lang.String, + * org.cohorte.herald.Message) + */ + @Override + public String fire(final String aPeerUid, final Message aMessage) + throws NoTransport, UnknownPeer { + + return fire(pDirectory.getPeer(aPeerUid), aMessage); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#fireGroup(java.lang.String, + * org.cohorte.herald.Message) + */ + @Override + public Collection fireGroup(final String aGroupName, + final Message aMessage) throws NoTransport { + + // Get all peers known in the group + final Collection allPeers = pDirectory + .getPeersForGroup(aGroupName); + if (allPeers.isEmpty()) { + pLogger.log(LogService.LOG_WARNING, "No peer in group: " + + aGroupName); + return new LinkedHashSet<>(); + } + + if (pTransports.isEmpty()) { + // Make the list of UIDs + throw new NoTransport(new Target(aGroupName, + Target.toUids(allPeers)), "No transport bound yet."); + } + + // Group peers by accesses + final Map> accesses = new LinkedHashMap<>(); + for (final Peer peer : allPeers) { + for (final String access : peer.getAccesses()) { + Utilities.setDefault(accesses, access, + new LinkedHashSet()).add(peer); + } + } + + boolean allDone = false; + for (final Entry> entry : accesses.entrySet()) { + final String access = entry.getKey(); + final Set accessPeers = entry.getValue(); + + if (accessPeers.isEmpty()) { + // Nothing to do + continue; + } + + // Find the transport for this access + final ITransport transport = pTransports.get(access); + if (transport == null) { + // No transport for this kind of access + pLogger.log(LogService.LOG_DEBUG, "No transport for " + access); + continue; + } + + final Collection reachedPeers; + try { + // Try to send the message + reachedPeers = transport.fireGroup(aGroupName, accessPeers, + aMessage); + + } catch (final HeraldException ex) { + // Try again... + pLogger.log(LogService.LOG_DEBUG, + "Error group-firing message: " + ex, ex); + continue; + } + + allDone = true; + for (final Set remainingPeers : accesses.values()) { + remainingPeers.removeAll(reachedPeers); + if (!remainingPeers.isEmpty()) { + allDone = false; + } + } + + if (allDone) { + break; + } + } + + final Set missingPeers = new LinkedHashSet<>(); + if (!allDone) { + // Some peers are missing + for (final Set remainingPeers : accesses.values()) { + missingPeers.addAll(remainingPeers); + } + + if (!missingPeers.isEmpty()) { + pLogger.log(LogService.LOG_WARNING, + "Some peers haven't been notified: " + missingPeers); + } else { + pLogger.log(LogService.LOG_DEBUG, + "No peer to send the message to."); + } + } + + return missingPeers; + } + + /** + * Tries to fire a reply to the given message + * + * @param aReplyMessage + * Message to send as a reply + * @param aOriginalMessage + * Message the first argument replies to + * @return The UID of the sent message + * @throws HeraldException + * Error trying to send the reply, or to read information about + * the peer + */ + private String fireReply(final Message aReplyMessage, + final MessageReceived aOriginalMessage) throws HeraldException { + + final String access = aOriginalMessage.getAccess(); + final String sender = aOriginalMessage.getSender(); + + // Look for the transport implementation + final ITransport transport = pTransports.get(access); + if (transport == null) { + throw new NoTransport(new Target(sender), + "No reply transport for access " + access); + } + + // Try to get a Peer bean + Peer peer; + try { + peer = pDirectory.getPeer(aOriginalMessage.getSender()); + + } catch (final UnknownPeer ex) { + // Hope the transport has enough extra information + peer = null; + } + + // Send the reply + transport.fire(peer, aReplyMessage, aOriginalMessage.getExtra()); + return aReplyMessage.getUid(); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#forget(java.lang.String) + */ + @Override + public boolean forget(final String aMessageUid) { + + boolean result = false; + final ForgotMessage exception = new ForgotMessage(aMessageUid); + + // Release the send() call + final EventData event = pWaitingEvents.remove(aMessageUid); + if (event != null) { + event.raiseException(exception); + result = true; + } + + synchronized (pGarbageToken) { + // Notify post() callers + final WaitingPost waitingPost = pWaitingPosts.remove(aMessageUid); + if (waitingPost != null) { + waitingPost.errback(this, exception); + result = true; + } + } + + return result; + } + + /** + * Garbage collects dead waiting post beans. Calls on a regular basis by a + * LoopTimer + */ + private void garbageCollect() { + + synchronized (pGarbageToken) { + // Compute time since the last garbage collection + long delta; + if (pLastGarbage < 0) { + delta = 0; + } else { + delta = System.currentTimeMillis() - pLastGarbage; + } + + // Delete timed out post message beans + final Set toDelete = new LinkedHashSet<>(); + for (final Entry entry : pWaitingPosts + .entrySet()) { + if (entry.getValue().isDead()) { + toDelete.add(entry.getKey()); + } + } + for (final String uid : toDelete) { + pWaitingPosts.remove(uid); + } + + // Delete UID of treated message of more than 5 minutes + toDelete.clear(); + for (final Entry entry : pTreatedMessages.entrySet()) { + final String msgUid = entry.getKey(); + final long newTTL = entry.getValue() + delta; + // Yes, I can *update* an entry while iterating + pTreatedMessages.put(msgUid, newTTL); + + if (newTTL > 300000) { + // More than 5 minutes: forget about the message + toDelete.add(msgUid); + } + } + for (final String msgUid : toDelete) { + pTreatedMessages.remove(msgUid); + } + + // Update the last garbage collection time + pLastGarbage = System.currentTimeMillis(); + } + } + + /** + * Handles a directory update message + * + * @param aMessage + * Message received from another peer + * @param aKind + * Kind of directory message + */ + private void handleDirectoryMessage(final MessageReceived aMessage, + final String aKind) { + + switch (aKind) { + case "bye": { + // A peer is going away + // Message content: the Peer UID + pDirectory.unregister((String) aMessage.getContent()); + break; + } + + default: + // Ignore other messages + break; + } + } + + /** + * Handles an error message + * + * @param aMessage + * The error message + * @param aKind + * Kind of error + */ + private void handleError(final MessageReceived aMessage, final String aKind) { + + switch (aKind) { + case "no-listener": { + // No listener found for a given message + final Map content = (Map) aMessage.getContent(); + final String uid = (String) content.get("uid"); + final String subject = (String) content.get("subject"); + if (uid == null || subject == null || uid.isEmpty() + || subject.isEmpty()) { + // Invalid error content, ignore + return; + } + + // Set up the exception object + final NoListener exception = new NoListener(new Target( + aMessage.getSender()), uid, subject); + + // Unlock the original message sender + final EventData eventData = pWaitingEvents.remove(uid); + if (eventData != null) { + eventData.raiseException(exception); + } + + // Notify post() callers + final WaitingPost waitingPost = pWaitingPosts.remove(uid); + if (waitingPost != null) { + waitingPost.errback(this, exception); + } + break; + } + + default: + // Unknown error + pLogger.log(LogService.LOG_WARNING, "Unknown kind of error: " + + aKind); + break; + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHeraldInternal#handleMessage(org.cohorte.herald. + * MessageReceived) + */ + @Override + public void handleMessage(final MessageReceived aMessage) { + + synchronized (pGarbageToken) { + if (pTreatedMessages.containsValue(aMessage.getUid())) { + // Message already handled, ignore it + return; + } else { + // Store the message UID + pTreatedMessages.put(aMessage.getUid(), + System.currentTimeMillis()); + } + } + + // Clean up the subject + final List parts = new LinkedList<>(); + for (final String part : aMessage.getSubject().split("/")) { + if (!part.isEmpty()) { + parts.add(part); + } + } + + try { + if (parts.get(0).equals("herald")) { + // Internal message + final String category = parts.get(1); + final String kind = parts.get(2); + switch (category) { + case "error": + // Error message: handle it, but don't propagate it + handleError(aMessage, kind); + return; + + case "directory": + // Directory update message + handleDirectoryMessage(aMessage, kind); + break; + + default: + break; + } + } + } catch (final IndexOutOfBoundsException ex) { + // Not enough arguments for a directory update: ignore + } + + // Notify others of the message + notify(aMessage); + } + + /** + * Component invalidated + */ + @Invalidate + public void invalidate() { + + // Stop the garbage collector + pGarbageTimer.cancel(); + + // Stop the thread pool + pPool.shutdownNow(); + + // Clear waiting events + for (final EventData event : pWaitingEvents.values()) { + event.set(null); + } + + synchronized (pGarbageToken) { + final HeraldException exception = new HeraldTimeout(null, + "Herald stops to listen to messages", null); + for (final WaitingPost waiting : pWaitingPosts.values()) { + waiting.errback(this, exception); + } + } + + // Clean up + pWaitingEvents.clear(); + pWaitingPosts.clear(); + pGarbageTimer = null; + pPool = null; + } + + /** + * Calls back message senders about responses or notifies the reception of a + * message + * + * @param aMessage + * The received message + */ + private void notify(final MessageReceived aMessage) { + + final String repliesTo = aMessage.getReplyTo(); + if (repliesTo != null && !repliesTo.isEmpty()) { + // This message is a reply: unlock the sender of the original + // message + final EventData event = pWaitingEvents.remove(repliesTo); + if (event != null) { + // Set the data + event.set(aMessage.getContent()); + } + + synchronized (pGarbageToken) { + // Notify post() callers + final WaitingPost waitingPost = pWaitingPosts.get(repliesTo); + if (waitingPost != null) { + waitingPost.callback(this, aMessage); + if (waitingPost.isForgetOnFirst()) { + // Forget about the message + pWaitingPosts.remove(repliesTo); + } + } + } + } + + // Compute the list of listeners to notify + final Set listeners = new LinkedHashSet<>(); + final String subject = aMessage.getSubject(); + + synchronized (pListeners) { + for (final Entry> entry : pListeners + .entrySet()) { + if (entry.getKey().matches(subject)) { + listeners.addAll(entry.getValue()); + } + } + } + + if (!listeners.isEmpty()) { + // Call listeners in the thread pool + for (final IMessageListener listener : listeners) { + pPool.execute(new Runnable() { + + @Override + public void run() { + + try { + listener.heraldMessage(Herald.this, aMessage); + + } catch (final HeraldException ex) { + pLogger.log(LogService.LOG_WARNING, + "Error notifying listener " + listener + + ": " + ex, ex); + } + } + }); + } + + } else { + // No listener found: send an error message + final Map content = new LinkedHashMap<>(); + content.put("uid", aMessage.getUid()); + content.put("subject", aMessage.getSubject()); + + try { + reply(aMessage, content, "herald/error/no-listener"); + + } catch (final HeraldException ex) { + // We can't send an error back + pLogger.log(LogService.LOG_ERROR, + "Can't send an error back to the sender: " + ex); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback) + */ + @Override + public String post(final Peer aPeer, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback) + throws NoTransport { + + return post(aPeer, aMessage, aCallback, aErrback, 180L, true); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback, java.lang.Long) + */ + @Override + public String post(final Peer aPeer, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback, + final Long aTimeout) throws NoTransport { + + return post(aPeer, aMessage, aCallback, aErrback, aTimeout, true); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(org.cohorte.herald.Peer, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback, java.lang.Long, boolean) + */ + @Override + public String post(final Peer aPeer, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback, + final Long aTimeout, final boolean aForgetOnFirst) + throws NoTransport { + + synchronized (pGarbageToken) { + // Prepare an entry in the waiting posts + pWaitingPosts.put(aMessage.getUid(), new WaitingPost(aCallback, + aErrback, aTimeout, aForgetOnFirst)); + } + + try { + // Fire the message + return fire(aPeer, aMessage); + + } catch (final HeraldException ex) { + // Early clean up in case of exception + synchronized (pGarbageToken) { + pWaitingPosts.remove(aMessage.getUid()); + } + throw ex; + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(java.lang.String, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback) + */ + @Override + public String post(final String aPeerUid, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback) + throws UnknownPeer, NoTransport { + + return post(aPeerUid, aMessage, aCallback, aErrback, 180L, true); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(java.lang.String, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback, java.lang.Long) + */ + @Override + public String post(final String aPeerUid, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback, + final Long aTimeout) throws UnknownPeer, NoTransport { + + return post(aPeerUid, aMessage, aCallback, aErrback, aTimeout, true); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#post(java.lang.String, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback, java.lang.Long, boolean) + */ + @Override + public String post(final String aPeerUid, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback, + final Long aTimeout, final boolean aForgetOnFirst) + throws UnknownPeer, NoTransport { + + return post(pDirectory.getPeer(aPeerUid), aMessage, aCallback, + aErrback, aTimeout, aForgetOnFirst); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#postGroup(java.lang.String, + * org.cohorte.herald.Message, org.cohorte.herald.IPostCallback, + * org.cohorte.herald.IPostErrback, java.lang.Long) + */ + @Override + public String postGroup(final String aGroupName, final Message aMessage, + final IPostCallback aCallback, final IPostErrback aErrback, + final Long aTimeout) throws ValueError, NoTransport { + + // Get all peers known in the group + final Collection allPeers = pDirectory + .getPeersForGroup(aGroupName); + if (allPeers.isEmpty()) { + throw new ValueError("Unknown group: " + aGroupName); + } + + if (pTransports.isEmpty()) { + // Make the list of UIDs + throw new NoTransport(new Target(aGroupName, + Target.toUids(allPeers)), "No transport bound yet."); + } + + synchronized (pGarbageToken) { + // Prepare an entry in the waiting posts + pWaitingPosts.put(aMessage.getUid(), new WaitingPost(aCallback, + aErrback, aTimeout, false)); + } + + // Group peers by accesses + final Map> accesses = new LinkedHashMap<>(); + for (final Peer peer : allPeers) { + for (final String access : peer.getAccesses()) { + Utilities.setDefault(accesses, access, + new LinkedHashSet()).add(peer); + } + } + + for (final Entry> entry : accesses.entrySet()) { + final String access = entry.getKey(); + final Set accessPeers = entry.getValue(); + + if (accessPeers.isEmpty()) { + // Nothing to do + continue; + } + + // Find the transport for this access + final ITransport transport = pTransports.get(access); + if (transport == null) { + // No transport for this kind of access + pLogger.log(LogService.LOG_DEBUG, "No transport for " + access); + continue; + } + + final Collection reachedPeers; + try { + // Try to send the message + reachedPeers = transport.fireGroup(aGroupName, accessPeers, + aMessage); + + } catch (final HeraldException ex) { + // Try again... + pLogger.log(LogService.LOG_DEBUG, + "Error group-firing message: " + ex, ex); + continue; + } + + boolean allDone = true; + for (final Set remainingPeers : accesses.values()) { + remainingPeers.removeAll(reachedPeers); + if (!remainingPeers.isEmpty()) { + allDone = false; + } + } + + if (allDone) { + break; + } + } + + return aMessage.getUid(); + } + + @Override + public void removeMessageListener(final IMessageListener aListener) { + synchronized (pListeners) { + final Set filters = pListenersFilters.remove(aListener); + if (filters == null) { + // Unknown listener + return; + } + + for (final FnMatch match : filters) { + // Forget about the listener + final Set listeners = pListeners.get(match); + listeners.remove(aListener); + + // Clean up + if (listeners.isEmpty()) { + pListeners.remove(match); + } + } + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#reply(org.cohorte.herald.MessageReceived, + * java.lang.Object) + */ + @Override + public void reply(final MessageReceived aMessage, final Object aContent) + throws HeraldException { + + reply(aMessage, aContent, null); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#reply(org.cohorte.herald.MessageReceived, + * java.lang.Object, java.lang.String) + */ + @Override + public void reply(final MessageReceived aMessage, final Object aContent, + final String aSubject) throws HeraldException { + + // Normalize the subject + String subject = aSubject; + if (subject == null || subject.isEmpty()) { + subject = "reply/" + aMessage.getSubject(); + } + + // Prepare the message to send + final Message newMessage = new Message(subject, aContent); + + try { + // Try to reuse the transport + fireReply(newMessage, aMessage); + return; + } catch (final HeraldException ex) { + // Can't reuse the transport, use another one + } + + fire(aMessage.getSender(), newMessage); + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#send(org.cohorte.herald.Peer, + * org.cohorte.herald.Message) + */ + @Override + public Object send(final Peer aPeer, final Message aMessage) + throws HeraldException { + + try { + return send(aPeer, aMessage, null); + + } catch (final HeraldTimeout ex) { + // Can't happen + return null; + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#send(org.cohorte.herald.Peer, + * org.cohorte.herald.Message, java.lang.Long) + */ + @Override + public Object send(final Peer aPeer, final Message aMessage, + final Long aTimeout) throws HeraldException { + + // Prepare the event bean + final EventData event = new EventData<>(); + pWaitingEvents.put(aMessage.getUid(), event); + + try { + // Fire the message + fire(aPeer, aMessage); + + // Message sent: wait for an answer + if (event.waitEvent(aTimeout)) { + final Object data = event.getData(); + if (data != null) { + return data; + + } else { + // Herald is stopping... + throw new HeraldTimeout(new Target(aPeer), + "Herald stops listening to message", aMessage); + } + + } else { + throw new HeraldTimeout(new Target(aPeer), + "Timeout reached before receiving a reply", aMessage); + } + + } catch (final EventException ex) { + // Something went wrong waiting for the event + if (ex.getCause() instanceof HeraldException) { + throw (HeraldException) ex.getCause(); + } else { + throw new HeraldException(new Target(aPeer), + "Error waiting for an answer: " + ex, ex); + } + + } finally { + // Clean up + pWaitingEvents.remove(aMessage.getUid()); + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#send(java.lang.String, + * org.cohorte.herald.Message) + */ + @Override + public Object send(final String aPeerUid, final Message aMessage) + throws HeraldException { + + try { + return send(aPeerUid, aMessage, null); + + } catch (final HeraldTimeout ex) { + // Can't happen + return null; + } + } + + /* + * (non-Javadoc) + * + * @see org.cohorte.herald.IHerald#send(java.lang.String, + * org.cohorte.herald.Message, java.lang.Long) + */ + @Override + public Object send(final String aPeerUid, final Message aMessage, + final Long aTimeout) throws HeraldException { + + return send(pDirectory.getPeer(aPeerUid), aMessage, aTimeout); + } + + /** + * A message listener has gone away + * + * @param aListener + * A message listener + * @param aReference + * The injected service reference + */ + @Unbind(id = ID_LISTENERS) + protected void unbindListener(final IMessageListener aListener, + final ServiceReference aReference) { + + removeMessageListener(aListener); + } + + /** + * A transport implementation has gone away + * + * @param aTransport + * A transport implementation + * @param aReference + * The injected service reference + */ + @Unbind(id = ID_TRANSPORTS) + protected void unbindTransport(final ITransport aTransport, + final ServiceReference aReference) { + + final String accessId = (String) aReference + .getProperty(IConstants.PROP_ACCESS_ID); + if (accessId == null || accessId.isEmpty()) { + // Ignore invalid access IDs + return; + } + + synchronized (pTransports) { + // Forget about the service + pTransports.remove(accessId); + + if (pTransports.isEmpty() && pSvcRegistration != null) { + // No more transport service: we can't provide the service + pSvcRegistration.unregister(); + pSvcRegistration = null; + } + } + } + + /** + * A message listener has been updated + * + * @param aListener + * A message listener + * @param aReference + * The injected service reference + */ + @Modified(id = ID_LISTENERS) + protected void updateListener(final IMessageListener aListener, + final ServiceReference aReference) { + + final Object rawFilters = aReference + .getProperty(IConstants.PROP_FILTERS); + final Set newFilters = new LinkedHashSet<>(); + if (rawFilters instanceof String) { + // Single filter + newFilters.add((String) rawFilters); + + } else if (rawFilters instanceof String[]) { + // Copy the array + newFilters.addAll(Arrays.asList((String[]) rawFilters)); + + } else { + // Unreadable filters: forget about the listener + unbindListener(aListener, aReference); + return; + } + + synchronized (pListeners) { + // Get current filters + final Set currentFilters = Utilities.setDefault( + pListenersFilters, aListener, new LinkedHashSet()); + final Set currentFiltersStrings = new LinkedHashSet<>( + currentFilters.size()); + for (final FnMatch filter : currentFilters) { + currentFiltersStrings.add(filter.toString()); + } + + // Compare with known state + final Set addedFilters = new LinkedHashSet<>(newFilters); + addedFilters.removeAll(currentFiltersStrings); + + final Set removedFilters = new LinkedHashSet<>( + currentFiltersStrings); + removedFilters.removeAll(newFilters); + + // Add new filters + for (final String filter : addedFilters) { + // Compile the filter + final FnMatch match = new FnMatch(filter); + + // Associate the listener to the filter + Utilities.setDefault(pListeners, match, + new LinkedHashSet()).add(aListener); + Utilities.setDefault(pListenersFilters, aListener, + new LinkedHashSet()).add(match); + } + + // Clean up removed ones + for (final String filter : removedFilters) { + // Compile the filter + final FnMatch match = new FnMatch(filter); + + // Remove the listener from the registry + final Set listeners = pListeners.get(match); + listeners.remove(aListener); + + // Clean up + if (listeners.isEmpty()) { + pListeners.remove(match); + } + } + } + } + + /** + * Component validated + */ + @Validate + public void validate() { + + // Start the notification thread pool + pPool = Executors.newFixedThreadPool(5); + + // Start the garbage collector + pGarbageTimer = new LoopTimer(30000, new Runnable() { + + @Override + public void run() { + + garbageCollect(); + } + }, "Herald-GC"); + pGarbageTimer.start(); + } } diff --git a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java index d751785..7232639 100755 --- a/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java +++ b/java/org.cohorte.herald.rpc/src/main/java/org/cohorte/herald/rpc/HeraldRpcExporter.java @@ -20,12 +20,14 @@ import java.util.Map; import java.util.UUID; +import org.apache.felix.ipojo.annotations.Bind; import org.apache.felix.ipojo.annotations.Component; import org.apache.felix.ipojo.annotations.Instantiate; import org.apache.felix.ipojo.annotations.Invalidate; import org.apache.felix.ipojo.annotations.Provides; import org.apache.felix.ipojo.annotations.Requires; import org.apache.felix.ipojo.annotations.ServiceProperty; +import org.apache.felix.ipojo.annotations.Unbind; import org.apache.felix.ipojo.annotations.Validate; import org.cohorte.herald.HeraldException; import org.cohorte.herald.IConstants; @@ -76,6 +78,9 @@ public class HeraldRpcExporter implements IServiceExporter, IMessageListener { + IHeraldRpcConstants.SUBJECT_REPLY + "}") private String[] pFilters; + @Requires(id = "herald-core") + private IHerald pHerald; + /** The JSON-RPC bridge (Jabsorb) */ private JSONRPCBridge pJsonRpcBridge; @@ -97,9 +102,17 @@ public HeraldRpcExporter(final BundleContext aContext) { pContext = aContext; } + /** + * Called when herald is bind to this component + */ + @Bind(id = "herald-core") + public void bindIHerald() { + pHerald.addMessageListener(this, pFilters); + } + /* * (non-Javadoc) - * + * * @see * org.cohorte.remote.IServiceExporter#exportService(org.osgi.framework. * ServiceReference, java.lang.String, java.lang.String) @@ -139,7 +152,7 @@ public ExportEndpoint exportService(final ServiceReference aReference, /* * (non-Javadoc) - * + * * @see org.cohorte.remote.IServiceExporter#handles(java.lang.String[]) */ @Override @@ -166,7 +179,7 @@ public boolean handles(final String[] aConfigurations) { /* * (non-Javadoc) - * + * * @see * org.cohorte.herald.IMessageListener#heraldMessage(org.cohorte.herald. * IHerald, org.cohorte.herald.MessageReceived) @@ -242,9 +255,19 @@ public void invalidate() { pJsonRpcBridge = null; } + /** + * Called when herald is unbind + */ + @Unbind(id = "herald-core") + public void unbindIHerald() { + if (pHerald != null) { + pHerald.removeMessageListener(this); + } + } + /* * (non-Javadoc) - * + * * @see * org.cohorte.remote.IServiceExporter#unexportService(org.cohorte.remote * .ExportEndpoint) @@ -269,7 +292,7 @@ public void unexportService(final ExportEndpoint aEndpoint) { /* * (non-Javadoc) - * + * * @see org.cohorte.remote.IServiceExporter#updateExport(org.cohorte.remote. * ExportEndpoint, java.lang.String, java.util.Map) */