-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpriorityqueue.cpp
108 lines (87 loc) · 2.89 KB
/
priorityqueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include "priorityqueue.h"
// Flow id used by the debug-trace blocks in this file.
// NOTE: the macro body is intentionally left unparenthesized. Every use has
// the form `TRACE_PKT == x`, which expands to `0 && 4304 == x`; because `==`
// binds tighter than `&&` this parses as `0 && (4304 == x)` and is always
// false, so tracing is compiled in but disabled. Keep TRACE_PKT as the LEFT
// operand at every use site: `x == TRACE_PKT` would expand to
// `(x == 0) && 4304` and fire for flow id 0.
// NOTE(review): presumably removing the `0 &&` enables tracing of flow 4304 —
// confirm before relying on it.
#define TRACE_PKT 0 && 4304
using namespace std;
// Builds a PriorityQueue by forwarding bitrate, maxsize and logger unchanged
// to the Queue base-class constructor; nothing else is initialised here.
PriorityQueue::PriorityQueue(linkspeed_bps bitrate,
                             mem_b maxsize,
                             QueueLogger *logger)
    : Queue(bitrate, maxsize, logger)
{
}
// Puts the first element of _packets into service, if there is one.
// The element is removed from _packets, stored in _currentPkt, and an event
// is registered with the EventList at a relative delay of
// drainTime(_currentPkt). Does nothing when _packets is empty.
void
PriorityQueue::beginService()
{
    if (_packets.empty()) {
        return;
    }

    // Take the head packet out of the waiting set; it becomes the packet
    // currently being serviced.
    auto head = _packets.begin();
    _currentPkt = *head;
    _packets.erase(head);

    // Schedule its completion time.
    EventList::Get().sourceIsPendingRel(*this, drainTime(_currentPkt));

    // TRACE_PKT must stay on the left of `==`: the macro is unparenthesized
    // and the operand order determines how the expression parses.
    if (TRACE_PKT == _currentPkt->flow().id) {
        cout << str() << " Pkt depart sched " << EventList::Get().now() << " "
             << _currentPkt->id() << " " << _packets.size() << " " << drainTime(_currentPkt)
             << " " << _currentPkt->size() << " " << _ps_per_byte << endl;
    }
}
// Finishes service of _currentPkt: logs the departure (traffic logger and,
// if set, the queue logger), applies ECN marking, forwards the packet with
// sendOn(), subtracts its size from _queuesize, clears _currentPkt and then
// calls beginService() to start on the next waiting packet.
//
// _currentPkt is dereferenced unconditionally, so it must be non-null on
// entry.
void
PriorityQueue::completeService()
{
    // Logging and cleanup.
    _currentPkt->flow().logTraffic(*_currentPkt, *this, TrafficLogger::PKT_DEPART);
    if (_logger) {
        _logger->logQueue(*this, QueueLogger::PKT_SERVICE, *_currentPkt);
    }

    // TRACE_PKT must stay on the left of `==`: the macro is unparenthesized
    // and the operand order determines how the expression parses.
    if (TRACE_PKT == _currentPkt->flow().id) {
        cout << str() << " Pkt depart " << EventList::Get().now() << " " << _currentPkt->id()
             << " " << _currentPkt->size() << " " << _packets.size() << endl;
    }

    // ECN marking happens while the packet's bytes are still included in
    // _queuesize (the decrement below comes afterwards).
    applyEcnMark(*_currentPkt);

    // Read the size BEFORE handing the packet on. Once sendOn() returns the
    // packet belongs to whatever received it; presumably a receiver may free
    // or recycle it, so it must not be dereferenced again here.
    const auto departedBytes = _currentPkt->size();
    _currentPkt->sendOn();

    // Clear packet being transmitted.
    _queuesize -= departedBytes;
    _currentPkt = nullptr;

    beginService();
}
// Accepts an arriving packet: logs the arrival, inserts the packet into
// _packets, adds its size to _queuesize, and logs the enqueue. While
// _queuesize exceeds _maxsize, the LAST element of _packets is removed,
// logged as dropped and freed. If nothing was in service and nothing was
// waiting when the packet arrived, service is started immediately.
//
// The arriving packet itself may be the one dropped if _packets orders it
// last, so `pkt` is not touched after the drop loop.
void
PriorityQueue::receivePacket(Packet& pkt)
{
    pkt.flow().logTraffic(pkt, *this, TrafficLogger::PKT_ARRIVE);

    // Sample idleness before the insert below changes _packets.
    const bool queueWasEmpty = (_currentPkt == nullptr) && _packets.empty();

    // TRACE_PKT must stay on the left of `==`: the macro is unparenthesized
    // and the operand order determines how the expression parses.
    if (TRACE_PKT == pkt.flow().id) {
        cout << str() << " Pkt arrive " << EventList::Get().now() << " " << pkt.id() << " " << pkt.size() << " " << _packets.size() << endl;
    }

    _packets.insert(&pkt);
    _queuesize += pkt.size();
    if (_logger) {
        _logger->logQueue(*this, QueueLogger::PKT_ENQUEUE, pkt);
    }

    // If we are over the queue limit, drop packets from the end.
    // _queuesize also counts the packet in service (it is only subtracted in
    // completeService), so the loop additionally stops when nothing is left
    // to drop: std::prev(end()) on an empty container is undefined behaviour.
    while (_queuesize > _maxsize && !_packets.empty()) {
        auto last = std::prev(_packets.end());
        Packet *victim = *last;
        _packets.erase(last);
        _queuesize -= victim->size();

        if (_logger) {
            _logger->logQueue(*this, QueueLogger::PKT_DROP, *victim);
        }
        victim->flow().logTraffic(*victim, *this, TrafficLogger::PKT_DROP);
        victim->free();
    }

    // beginService() is a no-op if the only packet was just dropped.
    if (queueWasEmpty) {
        beginService();
    }
}
void
PriorityQueue::printStats()
{
unordered_map<uint32_t, uint32_t> counts;
for (auto const& i : _packets) {
uint32_t fid = i->flow().id;
if (counts.find(fid) == counts.end()) {
counts[fid] = 0;
}
counts[fid] = counts[fid] + 1;
}
cout << str() << " stats ";
for (auto it = counts.begin(); it != counts.end(); it++) {
cout << " " << it->second;
}
cout << endl;
}