Skip to content

Commit

Permalink
Improved exception management on server callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Rémi Rigal committed Aug 5, 2019
1 parent e2bead4 commit 714998b
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 69 deletions.
25 changes: 8 additions & 17 deletions examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

import random
import rospy
import actionlib
from actionlib_enhanced.msg import BasicComAction, BasicComGoal
from actionlib_msgs.msg import GoalStatus
from cv_bridge import CvBridge
from actionlib_enhanced import EnhancedActionClient

Expand All @@ -21,14 +21,10 @@ def __init__(self):

rospy.set_param('actionlib_client_sub_queue_size', 10)
self.goal = BasicComGoal()
# Simple Actionlib client
self.actionClient = actionlib.SimpleActionClient("/{}/basic_com".format(self.ns), BasicComAction)
if not self.actionClient.wait_for_server(timeout=rospy.Duration(10)):
rospy.signal_shutdown("Simple server not started")
# Custom Actionlib client
self.actionClientCustom = EnhancedActionClient("/{}/enhanced_com".format(self.ns), BasicComAction)
if not self.actionClientCustom.wait_for_server(timeout=rospy.Duration(10)):
rospy.signal_shutdown("Server custom not started")
rospy.signal_shutdown("Server not started")

rospy.loginfo("{} initialized".format(self.TAG))

Expand All @@ -40,13 +36,6 @@ def sendRequest(self, count):
:param count: number corresponding to the BasicComGoal message
"""
self.goal.numRequest = count
self.actionClient.send_goal(self.goal, done_cb=lambda x, y: self.doneCallback(x, y))

def sendRequest2(self, count):
"""
:param count: number corresponding to the BasicComGoal message
"""
self.goal.numRequest = count

# the ID returned from send_goal can be compared to the doneCallback 3rd arg
_ = self.actionClientCustom.send_goal(self.goal, done_cb=self.doneCallback)
Expand All @@ -57,14 +46,16 @@ def doneCallback(self, goalStatus, data, name=None):
:param data: BasicComResult message
:param name: ID of the goal, useful for multithreaded requests
"""
if goalStatus == 3: # SUCCESS
rospy.loginfo("received {}".format(data.numReceived))
if goalStatus == GoalStatus.SUCCEEDED:
rospy.loginfo("Received {}".format(data.numReceived))
elif goalStatus == GoalStatus.ABORTED:
rospy.logwarn("Server aborted goal")


if __name__ == "__main__":
actionClient = ActionClient()
for i in range(1, 21):
for i in range(1, 6):
rospy.loginfo("send {}".format(i))
actionClient.sendRequest2(i)
actionClient.sendRequest(i)
rospy.sleep(random.randint(1, 3))
rospy.spin()
26 changes: 8 additions & 18 deletions examples/client_multithreaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
from actionlib_enhanced.msg import BasicComAction, BasicComGoal
from actionlib_enhanced import EnhancedActionClient
from actionlib_msgs.msg import GoalStatus


class ActionClient(object):
Expand All @@ -31,30 +32,19 @@ def sendRequest(self, count):
:param count: number corresponding to the BasicComGoal message
"""
self.goal.numRequest = count

# the ID returned from send_goal can be compared to the doneCallback 3rd arg
_ = self.actionClientCustom.send_goal_and_wait(self.goal)
status = self.actionClientCustom.send_goal_and_wait(self.goal)
result = self.actionClientCustom.get_result()
rospy.loginfo("{}".format(result))

@staticmethod
def doneCallback(goalStatus, data, name=None):
"""
:param goalStatus: Status of the received goal
:param data: BasicComResult message
:param name: ID of the goal, useful for multithreaded requests
"""
if goalStatus == 3: # SUCCESS
rospy.loginfo("Waiting for something :)")
rospy.sleep(5)
rospy.loginfo("received {}".format(data.numReceived))
if status == GoalStatus.SUCCEEDED:
rospy.loginfo("Received: {}".format(result))
else:
rospy.logerr("Goal status: {}".format(status))


if __name__ == "__main__":
actionClient = ActionClient()

for i in range(1, 4):
rospy.loginfo("send {}".format(i))
for i in range(1, 7):
rospy.loginfo("Sending {}".format(i))
thread = threading.Thread(target=actionClient.sendRequest, args=(i,))
thread.setDaemon(True)
thread.start()
Expand Down
51 changes: 51 additions & 0 deletions examples/defective_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env python
# coding: utf-8

import rospy
import actionlib
import random
from actionlib_enhanced import EnhancedActionServer
from actionlib_enhanced.msg import BasicComAction, BasicComResult, BasicComGoal

class ActionServer():

TAG = "TestActionlibServer"

def __init__(self):
rospy.init_node("test_actionlib_server")
self.ns = rospy.get_param("~namespace", "test")
self.delay = rospy.Duration(5)
# Actionlib Enhanced
self.actionServerCustom = EnhancedActionServer("/{}/enhanced_com".format(self.ns),
BasicComAction,
self.onRequest,
auto_start=False)
self.actionServerCustom.start()

rospy.on_shutdown(self.onShutdown)
rospy.loginfo("{} initialized".format(self.TAG))

def onShutdown(self):
pass

def onRequest(self, goal): # multithreaded on requests
"""
sends back the goal it receives or raises an error
:type goal: BasicComGoal
:param goal: goal containing the BasicComGoal
"""
rospy.loginfo("=====================================================")
if goal.numRequest % 3 == 0:
rospy.loginfo("Received goal {} => Raising error".format(goal.numRequest))
raise Exception("Random error")
elif goal.numRequest % 3 == 2:
rospy.loginfo("Received goal {} => No call to succeeded".format(goal.numRequest))
elif goal.numRequest % 3 == 1:
rospy.loginfo("Received goal {} => Success".format(goal.numRequest))
result = BasicComResult(numReceived=goal.numRequest)
self.actionServerCustom.set_succeeded(result)


if __name__ == "__main__":
actionServer = ActionServer()
rospy.spin()
7 changes: 3 additions & 4 deletions src/actionlib_enhanced/action_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
import rospy
from actionlib.action_client import ActionClient, CommState
from actionlib_msgs.msg import GoalStatus


class GoalState(object):
Expand Down Expand Up @@ -89,8 +90,7 @@ def get_state(self):
if L.get(cur_thread) is None:
rospy.logerr("Called get_state when this goal doesn't exist")
return False
status = self.ghDict[L[cur_thread]][1]
return status
return self.ghDict[L[cur_thread]][0].get_goal_status()

def get_result(self):
L = self.get_threads()
Expand Down Expand Up @@ -124,8 +124,7 @@ def _handle_transition(self, curGh):
done_cb = dicGh[2]
active_cb = dicGh[3]

error_msg = "Received comm state {} when in state {}".format(comm_state,
GoalState.name[oldStatus])
error_msg = "Received comm state {} when in state {}".format(comm_state, GoalState.name[oldStatus])

if comm_state == CommState.ACTIVE:
if oldStatus == GoalState.PENDING:
Expand Down
57 changes: 27 additions & 30 deletions src/actionlib_enhanced/action_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,48 +35,40 @@ def start(self):
"""
self.action_server.start()

def set_succeeded(self, result=None, text=""):
def _set_gh_state(self, result=None, text="", succeeded=True):
"""
:param result: An optional result to send back to any clients of the goal
:param text: An optionnal text associated to the SUCCESS status
:param text: An optionnal text associated to the SUCCESS|ABORTED status
:param succeeded: the goal handle is set to SUCCEEDED if true, else ABORTED
"""
if not self.ghDIct.has_key(threading.current_thread().ident):
threadId = threading.current_thread().ident
self.lock.acquire()
if not self.ghDict.has_key(threadId):
rospy.logwarn("set_succeeded called more than once in an actionlib server callback or outside one, ignoring call")
return
gh = self.ghDict[threading.current_thread().ident]
gh = self.ghDict[threadId]
if result is None:
result = self.get_default_result()
rate = rospy.Rate(1000)
while not self.is_elected(gh):
rate.sleep()
sender, _, _ = gh.get_goal_id().id.split("-")
self.lock.acquire()
self.electionList.get(sender).pop(0)
gh.set_succeeded(result, text)
self.ghDict.pop(threading.current_thread().ident)
self.lock.release()

def set_aborted(self, result=None, text=""):
"""
:param result: An optional result to send back to any clients of the goal
:param text: An optionnal text associated to the ABORTED status
"""
if not self.ghDIct.has_key(threading.current_thread().ident):
rospy.logwarn("set_aborted called more than once in an actionlib server callback or outside one, ignoring call")
return
gh = self.ghDict[threading.current_thread().ident]
if result is None:
result = self.get_default_result()
rate = rospy.Rate(1000)
while not self.is_elected(gh):
rate.sleep()
sender, _, _ = gh.get_goal_id().id.split("-")
self.lock.acquire()
self.electionList.get(sender).pop(0)
gh.set_aborted(result, text)
self.ghDict.pop(threading.current_thread().ident)
if succeeded:
gh.set_succeeded(result, text)
else:
gh.set_aborted(result, text)
self.lock.acquire()
self.ghDict.pop(threadId)
self.lock.release()

def set_succeeded(self, result=None, text=""):
self._set_gh_state(result, text)

def set_aborted(self, result=None, text=""):
self._set_gh_state(result, text, succeeded=False)

def get_default_result(self):
"""
:return: default content for result message
Expand All @@ -87,12 +79,18 @@ def execute_callback_on_elected(self, goal_handle):
rate = rospy.Rate(1000)
while not self.is_elected(goal_handle):
rate.sleep()
self.execute_server_callback(goal_handle)

def execute_server_callback(self, goal_handle):
self.lock.acquire()
self.ghDict[threading.current_thread().ident] = goal_handle
self.lock.release()
try:
self.execute_callback(goal_handle.get_goal())
except Exception as e:
rospy.logerr("Error in the actionlib server callback: {}".format(e))
finally:
if self.ghDIct.has_key(threading.current_thread().ident):
if self.ghDict.has_key(threading.current_thread().ident):
rospy.logwarn("The actionlib server callback did not set the goal as succeeded, sending unsuccessful result")
self.set_aborted()

Expand All @@ -111,10 +109,9 @@ def get_next_gh(self, goal_handle):
if self.call_on_elected:
t = threading.Thread(target=self.execute_callback_on_elected, args=(goal_handle,))
else:
t = threading.Thread(target=self.execute_callback, args=(goal_handle.get_goal(),))
t = threading.Thread(target=self.execute_server_callback, args=(goal_handle,))
t.setDaemon(True)
t.start()
self.ghDict[t.ident] = goal_handle
except threading.ThreadError:
rospy.logerr("Error: unable to start thread")
else:
Expand Down

0 comments on commit 714998b

Please sign in to comment.