Skip to content

Commit

Permalink
Refactor based on reviewer comments:
Browse files Browse the repository at this point in the history
* Remove ERROR status, as disconnecting and stale are sufficient.
* Refactor Connect/Disconnect logic into constructor/destructor.
  • Loading branch information
mjcarroll committed Jan 25, 2019
1 parent 85f5b52 commit 5041705
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 271 deletions.
7 changes: 2 additions & 5 deletions src/msgs/peer_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ message PeerAnnounce
/// \brief Possible states announced by a peer
enum PeerState
{
/// \brief An error occurred
ERROR = 0;

/// \brief Peer is connecting right now
CONNECTING = 1;
CONNECTING = 0;

/// \brief Peer is disconnecting right now
DISCONNECTING = 2;
DISCONNECTING = 1;
};

/// \brief State which the peer is announcing
Expand Down
4 changes: 2 additions & 2 deletions src/network/PeerInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ using namespace ignition;
using namespace gazebo;

/////////////////////////////////////////////////
PeerInfo::PeerInfo():
PeerInfo::PeerInfo(const NetworkRole &_role):
id(ignition::common::Uuid().String()),
role(NetworkRole::None)
role(_role)
{
}

Expand Down
3 changes: 1 addition & 2 deletions src/network/PeerInfo.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ namespace ignition
// Inline bracket to help doxygen filtering.
inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE {
class IGNITION_GAZEBO_VISIBLE PeerInfo {
//
/// \brief Constructor
public: PeerInfo();
public: PeerInfo(const NetworkRole &_role = NetworkRole::None);

/// \brief Unique peer ID in the network
public: std::string id;
Expand Down
133 changes: 44 additions & 89 deletions src/network/PeerTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,47 @@ using namespace gazebo;

/////////////////////////////////////////////////
PeerTracker::PeerTracker(
const PeerInfo &_info,
EventManager *_eventMgr,
const ignition::transport::NodeOptions &_options):
node(_options),
eventMgr(_eventMgr)
info(_info),
eventMgr(_eventMgr),
node(_options)
{
this->heartbeatPub = this->node.Advertise<msgs::PeerInfo>("heartbeat");
this->announcePub = this->node.Advertise<msgs::PeerAnnounce>("announce");
this->node.Subscribe("heartbeat", &PeerTracker::OnPeerHeartbeat, this);
this->node.Subscribe("announce", &PeerTracker::OnPeerAnnounce, this);

msgs::PeerAnnounce msg;
*msg.mutable_info() = toProto(this->info);
msg.set_state(msgs::PeerAnnounce::CONNECTING);
this->announcePub.Publish(msg);

this->heartbeatRunning = true;
this->heartbeatThread = std::thread([this]()
{
this->HeartbeatLoop();
});
}

/////////////////////////////////////////////////
PeerTracker::~PeerTracker()
{
this->Disconnect();
this->node.Unsubscribe("heartbeat");
this->node.Unsubscribe("announce");

this->heartbeatRunning = false;
if (this->heartbeatThread.joinable())
{
this->heartbeatThread.join();
}

msgs::PeerAnnounce msg;
*msg.mutable_info() = toProto(this->info);
msg.set_state(msgs::PeerAnnounce::DISCONNECTING);

this->announcePub.Publish(msg);
}

/////////////////////////////////////////////////
Expand All @@ -58,51 +88,6 @@ size_t PeerTracker::StaleMultiplier() const
return this->staleMultiplier;
}

/////////////////////////////////////////////////
void PeerTracker::Connect(std::shared_ptr<PeerInfo> _info)
{
this->info = _info;

this->heartbeatPub = this->node.Advertise<msgs::PeerInfo>("heartbeat");
this->announcePub = this->node.Advertise<msgs::PeerAnnounce>("announce");
this->node.Subscribe("heartbeat", &PeerTracker::OnPeerHeartbeat, this);
this->node.Subscribe("announce", &PeerTracker::OnPeerAnnounce, this);

msgs::PeerAnnounce msg;
*msg.mutable_info() = toProto(*this->info);
msg.set_state(msgs::PeerAnnounce::CONNECTING);
this->announcePub.Publish(msg);

this->heartbeatRunning = true;
this->heartbeatThread = std::thread([this]()
{
this->HeartbeatLoop();
});
}

/////////////////////////////////////////////////
void PeerTracker::Disconnect()
{
this->node.Unsubscribe("heartbeat");
this->node.Unsubscribe("announce");

this->heartbeatRunning = false;
if (this->heartbeatThread.joinable())
{
this->heartbeatThread.join();
}

if (this->info)
{
msgs::PeerAnnounce msg;
*msg.mutable_info() = toProto(*this->info);
msg.set_state(msgs::PeerAnnounce::DISCONNECTING);

this->announcePub.Publish(msg);
this->info.reset();
}
}

/////////////////////////////////////////////////
size_t PeerTracker::NumPeers() const
{
Expand Down Expand Up @@ -135,7 +120,7 @@ void PeerTracker::HeartbeatLoop()
while (this->heartbeatRunning)
{
lastUpdateTime = Clock::now();
this->heartbeatPub.Publish(toProto(*this->info));
this->heartbeatPub.Publish(toProto(this->info));

std::vector<PeerInfo> toRemove;
for (auto peer : this->peers)
Expand Down Expand Up @@ -165,33 +150,16 @@ void PeerTracker::HeartbeatLoop()
}
}

/////////////////////////////////////////////////
void PeerTracker::AddPeer(const PeerInfo &_info)
{
auto lock = PeerLock(this->peersMutex);

auto peerState = PeerState();
peerState.info = _info;
peerState.lastSeen = std::chrono::steady_clock::now();
this->peers[_info.id] = peerState;
}

/////////////////////////////////////////////////
bool PeerTracker::RemovePeer(const PeerInfo &_info)
{
if (!this->info)
{
ignerr << "Internal error: peer missing info." << std::endl;
return false;
}

auto lock = PeerLock(this->peersMutex);

auto iter = this->peers.find(_info.id);
if (iter == this->peers.end())
{
igndbg << "Attempting to remove peer [" << _info.id << "] from ["
<< this->info->id << "] but it wasn't connected" << std::endl;
<< this->info.id << "] but it wasn't connected" << std::endl;
return false;
}

Expand All @@ -205,7 +173,7 @@ void PeerTracker::OnPeerAnnounce(const msgs::PeerAnnounce &_announce)
auto peer = fromProto(_announce.info());

// Skip announcements from self.
if (this->info && peer.id == this->info->id)
if (peer.id == this->info.id)
return;

switch (_announce.state())
Expand All @@ -216,9 +184,6 @@ void PeerTracker::OnPeerAnnounce(const msgs::PeerAnnounce &_announce)
case msgs::PeerAnnounce::DISCONNECTING:
this->OnPeerRemoved(peer);
break;
case msgs::PeerAnnounce::ERROR:
this->OnPeerError(peer);
break;
default:
break;
}
Expand All @@ -229,20 +194,16 @@ void PeerTracker::OnPeerHeartbeat(const msgs::PeerInfo &_info)
{
auto peer = fromProto(_info);

if (!this->info)
{
return;
}

// Skip hearbeats from self.
if (peer.id == this->info->id)
if (peer.id == this->info.id)
{
return;
}

auto lock = PeerLock(this->peersMutex);

// We may have missed a peer announce, add it on heartbeat.
// If it doesn't exist, we may have missed a peer announce,
// so add it here on the heartbeat.
if (this->peers.find(peer.id) == this->peers.end())
{
this->OnPeerAdded(peer);
Expand All @@ -256,20 +217,14 @@ void PeerTracker::OnPeerHeartbeat(const msgs::PeerInfo &_info)
std::chrono::nanoseconds(_info.header().stamp().nsec()));
}

/////////////////////////////////////////////////
void PeerTracker::OnPeerError(const PeerInfo &_info)
{
auto success = this->RemovePeer(_info);

// Emit event for any consumers
if (success && eventMgr)
eventMgr->Emit<PeerError>(_info);
}

/////////////////////////////////////////////////
void PeerTracker::OnPeerAdded(const PeerInfo &_info)
{
this->AddPeer(_info);
auto lock = PeerLock(this->peersMutex);
auto peerState = PeerState();
peerState.info = _info;
peerState.lastSeen = std::chrono::steady_clock::now();
this->peers[_info.id] = peerState;

// Emit event for any consumers
if (eventMgr)
Expand Down
62 changes: 29 additions & 33 deletions src/network/PeerTracker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ namespace ignition
// Inline bracket to help doxygen filtering.
inline namespace IGNITION_GAZEBO_VERSION_NAMESPACE {
// Network Events
/// \brief PeerError fired when there is an unexpected error in a peer
using PeerError = common::EventT<void(PeerInfo), struct PeerErrorTag>;

/// \brief PeerAdded fired when a peer announces itself or detected via
/// heartbeat
using PeerAdded = common::EventT<void(PeerInfo), struct PeerAddedTag>;
Expand All @@ -61,9 +58,15 @@ namespace ignition
public: using Duration = std::chrono::steady_clock::duration;

/// \brief Constructor
///
/// Announce the existence of a peer with given information _info,
/// and start executing heartbeats and peer tracking.
///
/// \param[in] _eventMgr Event Manager to emit network events on.
/// \param[in] _options Advanced options for underlying ign-transport
/// \param[in] _info Peer information to announce
public: explicit PeerTracker(
const PeerInfo &_info,
EventManager *_eventMgr = nullptr,
const NodeOptions &_options = NodeOptions());

Expand Down Expand Up @@ -93,33 +96,28 @@ namespace ignition
/// \return Number of hearbeats before a peer is marked stale.
public: size_t StaleMultiplier() const;

/// \brief Retrieve total number of detected peers in the network.
public: size_t NumPeers() const;

/// \brief Retrieve number of detected peers in the network by role.
/// \param[in] _role Role of peers to enumerate
/// \return Number of peers with the given role.
public: size_t NumPeers(const NetworkRole &_role) const;

/// \brief Connect to the network graph.
///
/// Announce the existence of a peer with given information _info,
/// and start executing heartbeats and peer tracking.
/// \param[in] _info Peer information to announce
public: void Connect(std::shared_ptr<PeerInfo> _info);
private: void Connect();

/// \brief Disconnect from the network graph.
///
/// Also announce that this peer is leaving the network.
public: void Disconnect();

/// \brief Retrieve number of detected peers in the network.
public: size_t NumPeers() const;

/// \brief Retrieve number of detected peers in the network by role.
/// \param[in] _role Role of peers to enumerate
/// \return Number of peers with the given role.
public: size_t NumPeers(const NetworkRole &_role) const;
private: void Disconnect();

/// \brief Internal loop to announce and check stale peers.
private: void HeartbeatLoop();

/// \brief Helper function for adding a peer
/// \param[in] _info Peer to add
private: void AddPeer(const PeerInfo &_info);

/// \brief Helper function for removing a peer
/// \param[in] _info Peer to remove
/// \return True if successfully removed.
Expand All @@ -133,10 +131,6 @@ namespace ignition
/// \param[in] _info Heartbeat from another peer.
private: void OnPeerHeartbeat(const msgs::PeerInfo &_info);

/// \brief Callback for when peer errors are detected.
/// \param[in] _info Info from peer which had an error.
private: void OnPeerError(const PeerInfo &_info);

/// \brief Callback for when a peer is added.
/// \param[in] _info Info from peer which was added.
private: void OnPeerAdded(const PeerInfo &_info);
Expand All @@ -149,15 +143,6 @@ namespace ignition
/// \param[in] _info Info from peer which is stale.
private: void OnPeerStale(const PeerInfo &_info);

/// \brief Transport node
private: ignition::transport::Node node;

/// \brief Heartbeat publisher
private: ignition::transport::Node::Publisher heartbeatPub;

/// \brief Announcement publisher
private: ignition::transport::Node::Publisher announcePub;

/// \brief Information about discovered peers
struct PeerState
{
Expand Down Expand Up @@ -196,11 +181,22 @@ namespace ignition
/// \brief Timeout to mark a peer as stale.
private: size_t staleMultiplier {5};

/// \brief Peer information that this tracker announces.
private: PeerInfo info;

/// \brief Event manager instance to be used to emit network events.
private: EventManager *eventMgr;

/// \brief Peer information that this tracker announces.
private: std::shared_ptr<PeerInfo> info;
/// \brief Transport node
private: ignition::transport::Node node;

/// \brief Heartbeat publisher
private: ignition::transport::Node::Publisher heartbeatPub;

/// \brief Announcement publisher
private: ignition::transport::Node::Publisher announcePub;


};
}
} // namespace gazebo
Expand Down
Loading

0 comments on commit 5041705

Please sign in to comment.