Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public int compare(final Object o1, final Object o2) {
}
};

protected static String LOG_SEQ_FORMATTED_STRING;

protected final long _id;
protected String _name = null;
protected final ConcurrentHashMap<Long, Listener> _waitForList;
Expand Down Expand Up @@ -137,6 +139,7 @@ protected AgentAttache(final AgentManagerImpl agentMgr, final long id, final Str
_requests = new LinkedList<Request>();
_agentMgr = agentMgr;
_nextSequence = new Long(s_rand.nextInt(Short.MAX_VALUE)).longValue() << 48;
LOG_SEQ_FORMATTED_STRING = String.format("Seq %d-{}: {}", _id);
}

public synchronized long getNextSequence() {
Expand Down Expand Up @@ -197,9 +200,7 @@ protected void cancel(final Request req) {
}

protected synchronized void cancel(final long seq) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Cancelling."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Cancelling.");
final Listener listener = _waitForList.remove(seq);
if (listener != null) {
listener.processDisconnect(_id, Status.Disconnected);
Expand All @@ -218,24 +219,16 @@ protected synchronized int findRequest(final long seq) {
return Collections.binarySearch(_requests, seq, s_seqComparator);
}

protected String log(final long seq, final String msg) {
return "Seq " + _id + "-" + seq + ": " + msg;
}

protected void registerListener(final long seq, final Listener listener) {
if (logger.isTraceEnabled()) {
logger.trace(log(seq, "Registering listener"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, seq, "Registering listener");
if (listener.getTimeout() != -1) {
s_listenerExecutor.schedule(new Alarm(seq), listener.getTimeout(), TimeUnit.SECONDS);
}
_waitForList.put(seq, listener);
}

protected Listener unregisterListener(final long sequence) {
if (logger.isTraceEnabled()) {
logger.trace(log(sequence, "Unregistering listener"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, sequence, "Unregistering listener");
return _waitForList.remove(sequence);
}

Expand Down Expand Up @@ -267,7 +260,7 @@ public int getNonRecurringListenersSize() {
final Listener monitor = entry.getValue();
if (!monitor.isRecurring()) {
//TODO - remove this debug statement later
logger.debug("Listener is " + entry.getValue() + " waiting on " + entry.getKey());
logger.debug("Listener is {} waiting on {}", entry.getValue(), entry.getKey());
nonRecurringListenersList.add(monitor);
}
}
Expand All @@ -290,15 +283,10 @@ public boolean processAnswers(final long seq, final Response resp) {
if (answers[0] != null && answers[0].getResult()) {
processed = true;
}
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Unable to find listener."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Unable to find listener.");
} else {
processed = monitor.processAnswers(_id, seq, answers);
if (logger.isTraceEnabled()) {
logger.trace(log(seq, (processed ? "" : " did not ") + " processed "));
}

logger.trace(LOG_SEQ_FORMATTED_STRING, seq, (processed ? "" : " did not ") + " processed ");
if (!monitor.isRecurring()) {
unregisterListener(seq);
}
Expand All @@ -324,9 +312,7 @@ protected void cancelAllCommands(final Status state, final boolean cancelActive)
final Map.Entry<Long, Listener> entry = it.next();
it.remove();
final Listener monitor = entry.getValue();
if (logger.isDebugEnabled()) {
logger.debug(log(entry.getKey(), "Sending disconnect to " + monitor.getClass()));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, entry.getKey(), "Sending disconnect to " + monitor.getClass());
monitor.processDisconnect(_id, state);
}
}
Expand Down Expand Up @@ -357,9 +343,8 @@ public void send(final Request req, final Listener listener) throws AgentUnavail
long seq = req.getSequence();
if (listener != null) {
registerListener(seq, listener);
} else if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Routed from " + req.getManagementServerId()));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Routed from " + req.getManagementServerId());

synchronized (this) {
try {
Expand All @@ -381,16 +366,14 @@ public void send(final Request req, final Listener listener) throws AgentUnavail

if (req.executeInSequence() && _currentSequence == null) {
_currentSequence = seq;
if (logger.isTraceEnabled()) {
logger.trace(log(seq, " is current sequence"));
}
logger.trace(LOG_SEQ_FORMATTED_STRING, seq, " is current sequence");
}
} catch (AgentUnavailableException e) {
logger.info(log(seq, "Unable to send due to " + e.getMessage()));
logger.info(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage());
cancel(seq);
throw e;
} catch (Exception e) {
logger.warn(log(seq, "Unable to send due to "), e);
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Unable to send due to " + e.getMessage(), e);
cancel(seq);
throw new AgentUnavailableException("Problem due to other exception " + e.getMessage(), _id);
}
Expand All @@ -409,50 +392,41 @@ public Answer[] send(final Request req, final int wait) throws AgentUnavailableE
try {
answers = sl.waitFor(wait);
} catch (final InterruptedException e) {
logger.debug(log(seq, "Interrupted"));
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Interrupted");
}
if (answers != null) {
if (logger.isDebugEnabled()) {
new Response(req, answers).logD("Received: ", false);
}
new Response(req, answers).logD("Received: ", false);
return answers;
}

answers = sl.getAnswers(); // Try it again.
if (answers != null) {
if (logger.isDebugEnabled()) {
new Response(req, answers).logD("Received after timeout: ", true);
}
new Response(req, answers).logD("Received after timeout: ", true);

_agentMgr.notifyAnswersToMonitors(_id, seq, answers);
return answers;
}

final Long current = _currentSequence;
if (current != null && seq != current) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Waited too long."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waited too long.");

throw new OperationTimedoutException(req.getCommands(), _id, seq, wait, false);
}

if (logger.isDebugEnabled()) {
logger.debug(log(seq, "Waiting some more time because this is the current command"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Waiting some more time because this is the current command");
}

throw new OperationTimedoutException(req.getCommands(), _id, seq, wait * 2, true);
} catch (OperationTimedoutException e) {
logger.warn(log(seq, "Timed out on " + req.toString()));
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Timed out on " + req.toString());
cancel(seq);
final Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
sendNext(seq);
}
throw e;
} catch (Exception e) {
logger.warn(log(seq, "Exception while waiting for answer"), e);
logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Exception while waiting for answer", e);
cancel(seq);
final Long current = _currentSequence;
if (req.executeInSequence() && (current != null && current == seq)) {
Expand All @@ -467,22 +441,16 @@ public Answer[] send(final Request req, final int wait) throws AgentUnavailableE
protected synchronized void sendNext(final long seq) {
_currentSequence = null;
if (_requests.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug(log(seq, "No more commands found"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "No more commands found");
return;
}

Request req = _requests.pop();
if (logger.isDebugEnabled()) {
logger.debug(log(req.getSequence(), "Sending now. is current sequence."));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, req.getSequence(), "Sending now. is current sequence.");
try {
send(req);
} catch (AgentUnavailableException e) {
if (logger.isDebugEnabled()) {
logger.debug(log(req.getSequence(), "Unable to send the next sequence"));
}
logger.debug(LOG_SEQ_FORMATTED_STRING, req.getSequence(), "Unable to send the next sequence");
cancel(req.getSequence());
}
_currentSequence = req.getSequence();
Expand Down
Loading