-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstoc-fairqueue.cpp
161 lines (132 loc) · 3.99 KB
/
stoc-fairqueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include "stoc-fairqueue.h"
#define TRACE_PKT 0 && 221759 //281594
using namespace std;
StocFairQueue::StocFairQueue(linkspeed_bps bitrate, mem_b maxsize,
QueueLogger *logger, uint32_t nQueue, uint32_t quantum)
: Queue(bitrate, maxsize, logger),
_nQueue(nQueue), _nPackets(0), _quantum(quantum)
{
// Create the list of FIFO queues.
_packets = vector<list<Packet*> >(_nQueue, list<Packet*>());
// Initialize state vectors.
_credits = vector<uint32_t>(_nQueue, 0);
_Qsize = vector<uint32_t>(_nQueue, 0);
_isActive = vector<bool>(_nQueue, false);
}
void
StocFairQueue::beginService()
{
if (_nPackets > 0) {
// We are guaranteed to have an active queue.
uint32_t queue;
while (true) {
queue = _activeQ.front();
if (_credits[queue] < _packets[queue].back()->size()) {
// Not enough credit, bump to back of queue.
_activeQ.pop_front();
_activeQ.push_back(queue);
_credits[queue] += _quantum;
} else {
break;
}
}
EventList::Get().sourceIsPendingRel(*this, drainTime(_packets[queue].back()));
}
}
void
StocFairQueue::completeService()
{
assert(_nPackets > 0);
uint32_t queue = _activeQ.front();
Packet *pkt = _packets[queue].back();
// Dequeue and book-keeping.
_packets[queue].pop_back();
_credits[queue] -= pkt->size();
_Qsize[queue] -= pkt->size();
_queuesize -= pkt->size();
_nPackets -= 1;
// If queue is empty, remove it from active list.
if (_packets[queue].empty()) {
_isActive[queue] = false;
_activeQ.pop_front();
_credits[queue] = 0;
}
pkt->flow().logTraffic(*pkt, *this, TrafficLogger::PKT_DEPART);
if (_logger) {
_logger->logQueue(*this, QueueLogger::PKT_SERVICE, *pkt);
}
applyEcnMark(*pkt);
pkt->sendOn();
beginService();
}
void
StocFairQueue::receivePacket(Packet &pkt)
{
if (TRACE_PKT == pkt.flow().id) {
cout << str() << " Pkt arrive " << timeAsMs(EventList::Get().now()) << " flowid " << pkt.flow().id << " " << pkt.id() << endl;
cout << str() << " Current qsize " << _queuesize << " with " << _nPackets << " pkts " << pkt.size() << endl;
}
// If there is no space in the buffer, return immediately.
if (_queuesize + pkt.size() > _maxsize) {
if (TRACE_PKT == pkt.flow().id) {
cout << str() << " DROP\n";
}
dropPacket(pkt);
return;
}
pkt.flow().logTraffic(pkt, *this, TrafficLogger::PKT_ARRIVE);
bool queueWasEmpty = (_nPackets == 0);
uint32_t queue = hashFlow(0, pkt.flow().id) % _nQueue;
// Enqueue it.
_packets[queue].push_front(&pkt);
_Qsize[queue] += pkt.size();
_queuesize += pkt.size();
_nPackets += 1;
// If FIFO queue wasn't active, make it active.
if (!_isActive[queue]) {
_isActive[queue] = true;
_activeQ.push_back(queue);
_credits[queue] = _quantum;
}
// If queue was empty, schedule next departure.
if (queueWasEmpty) {
assert(_nPackets == 1);
beginService();
}
}
void
StocFairQueue::dropPacket(Packet &pkt)
{
if (_logger) {
_logger->logQueue(*this, QueueLogger::PKT_DROP, pkt);
}
pkt.flow().logTraffic(pkt, *this, TrafficLogger::PKT_DROP);
pkt.free();
}
uint64_t
StocFairQueue::hashFlow(int index, uint32_t flowid)
{
/* A lame hash function impersonator, multiplies by a large prime number. */
uint64_t prime;
switch (index) {
case 0:
prime = 7643; // 970th prime number.
break;
case 1:
prime = 7723; // 980th prime number.
break;
case 2:
prime = 7829; // 990th prime number.
break;
case 3:
prime = 7919; // 1000th prime number.
break;
default:
prime = 1;
}
return prime * flowid;
}
void
StocFairQueue::printStats()
{
}