Skip to content

Commit

Permalink
Add message framing
Browse files Browse the repository at this point in the history
  • Loading branch information
rigtorp committed Nov 16, 2010
1 parent 9798fd4 commit 9fa8076
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ all: $(TARGETS)
$(TARGETS): include/nmq.hpp

clean:
rm -f *~
rm -f *~ core
rm -f $(TARGETS)
5 changes: 3 additions & 2 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
* Monitoring tool
* Monitor message throughput
* Ping nodes to check if they are running
* Message framing, ie send message length
* Unit tests
* Unit tests
* Maybe allow zero length messages
* Magic and versioning of mmap file
52 changes: 28 additions & 24 deletions include/nmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class context_t {
unsigned int nodes;
unsigned int rings;
unsigned int size;
unsigned int msg_size;
size_t msg_size;
};

typedef volatile unsigned int vo_uint;
Expand All @@ -54,8 +54,8 @@ class context_t {
struct ring {

unsigned int _size;
unsigned int _msg_size;
unsigned int _offset;
size_t _msg_size;
size_t _offset;

// R/W access by the reader
// R/O access by the writer
Expand Down Expand Up @@ -97,7 +97,7 @@ class context_t {
header_->nodes = nodes;
header_->rings = n_rings;
header_->size = real_size - 1;
header_->msg_size = msg_size;
header_->msg_size = msg_size + sizeof(size_t);

for (unsigned int i = 0; i < header_->rings; i++) {
ring_[i]._size = real_size - 1;
Expand Down Expand Up @@ -133,7 +133,7 @@ class context_t {
}

void print() {
printf("nodes: %u, size: %u, msgsz: %u\n", header_->nodes, header_->size, header_->msg_size);
printf("nodes: %u, size: %u, msgsz: %lu\n", header_->nodes, header_->size, header_->msg_size);
for (unsigned int i = 0; i < header_->rings; i++) {
printf("%3i: %10u %10u\n", i, ring_[i]._head, ring_[i]._tail);
}
Expand Down Expand Up @@ -167,15 +167,16 @@ class context_t {
}

bool send(ring *ring, const void *msg, size_t size) {
assert(size <= ring->_msg_size);
assert(size <= (ring->_msg_size - sizeof(size_t)));

unsigned int h = (ring->_head - 1) & ring->_size;
unsigned int t = ring->_tail;
if (t == h)
return false;

char *d = &data_[ring_->_offset + t*ring->_msg_size];
memcpy(d, msg, size);
memcpy(d, &size, sizeof(size));
memcpy(d + sizeof(size), msg, size);

// Barrier is needed to make sure that item is updated
// before it's made available to the reader
Expand All @@ -196,14 +197,19 @@ class context_t {
return send(ring, msg, size);
}

bool recv(ring *ring, void *msg, size_t size) {
bool recv(ring *ring, void *msg, size_t *size) {
unsigned int t = ring->_tail;
unsigned int h = ring->_head;
if (h == t)
return false;

char *d = &data_[ring_->_offset + h*ring->_msg_size];

void *d = &data_[ring_->_offset + h*ring->_msg_size];
memcpy(msg, d, size);
size_t recv_size;
memcpy(&recv_size, d, sizeof(size_t));
assert(recv_size >= *size && "buffer too small");
*size = recv_size;
memcpy(msg, d + sizeof(size_t), recv_size);

// Barrier is needed to make sure that we finished reading the item
// before moving the head
Expand All @@ -213,37 +219,35 @@ class context_t {
return true;
}

bool recv(unsigned int from, unsigned int to, void *msg, size_t size) {
bool recv(unsigned int from, unsigned int to, void *msg, size_t *size) {
ring *ring = get_ring(from, to);
while (!recv(ring, msg, size)) bones::cpu::__relax();
return true;
}

bool recvnb(unsigned int from, unsigned int to, void *s, int size) {
bool recvnb(unsigned int from, unsigned int to, void *s, size_t *size) {
return recv(get_ring(from, to), s, size);
}

bool recv(unsigned int to, void *msg, size_t size) {
bool recv(unsigned int to, void *msg, size_t *size) {
// TODO "fair" receiving
while (true) {
for (unsigned int i = 0; i < header_->nodes; i++) {
if (to != i && recvnb(i, to, msg, size) != -1) return true;
if (to != i && recvnb(i, to, msg, size)) return true;
}
bones::cpu::__relax();
}
return false;
}

bool recvnb(unsigned int to, void *msg, size_t size) {
ssize_t recvnb(unsigned int to, void *msg, size_t *size) {
// TODO "fair" receiving
for (unsigned int i = 0; i < header_->nodes; i++) {
if (to != i && recvnb(i, to, msg, size) != -1) return true;
if (to != i && recvnb(i, to, msg, size)) return true;
}
return false;
}



std::string fname_;
void *p_;
header *header_;
Expand All @@ -267,27 +271,27 @@ class node_t {
//assert(node < context_->nodes());
}

bool send(unsigned int to, const void *msg, int size) {
bool send(unsigned int to, const void *msg, size_t size) {
return context_->send(node_, to, msg, size);
}

bool sendnb(unsigned int to, const void *msg, int size) {
bool sendnb(unsigned int to, const void *msg, size_t size) {
return context_->sendnb(node_, to, msg, size);
}

bool recv(unsigned int from, void *msg, int size) {
bool recv(unsigned int from, void *msg, size_t *size) {
return context_->recv(from, node_, msg, size);
}

bool recvnb(unsigned int from, void *msg, int size) {
bool recvnb(unsigned int from, void *msg, size_t *size) {
return context_->recvnb(from, node_, msg, size);
}

bool recv(void *msg, int size) {
bool recv(void *msg, size_t *size) {
return context_->recv(node_, msg, size);
}

bool recvnb(void *msg, int size) {
bool recvnb(void *msg, size_t *size) {
return context_->recvnb(node_, msg, size);
}

Expand Down
4 changes: 2 additions & 2 deletions local_lat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
return 1;
}
const char* queue = argv[1];
int message_size = atoi(argv [2]);
size_t message_size = atoi(argv [2]);
int roundtrip_count = atoi(argv [3]);

nmq::context_t context(queue);
Expand All @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) {

nmq::node_t node(context, 0);
for (int i = 0; i != roundtrip_count; i++) {
node.recv(1, s, message_size);
node.recv(1, s, &message_size);
node.send(1, s, message_size);
}
}
4 changes: 2 additions & 2 deletions local_thr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
return 1;
}
const char* queue = argv[1];
int message_size = atoi(argv [2]);
size_t message_size = atoi(argv [2]);
int roundtrip_count = atoi(argv [3]);

nmq::context_t context(queue);
Expand All @@ -49,6 +49,6 @@ int main(int argc, char* argv[]) {

nmq::node_t node(context, 0);
for (int i = 0; i != roundtrip_count; i++) {
node.recv(1, s, message_size);
node.recv(1, s, &message_size);
}
}
4 changes: 2 additions & 2 deletions remote_lat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
return 1;
}
const char* queue = argv[1];
int message_size = atoi(argv [2]);
size_t message_size = atoi(argv [2]);
int roundtrip_count = atoi(argv [3]);

nmq::context_t context(queue);
Expand All @@ -52,7 +52,7 @@ int main(int argc, char* argv[]) {
gettimeofday(&start, NULL);
for (int i = 0; i != roundtrip_count; i++) {
node.send(0, s, message_size);
node.recv(0, s, message_size);
node.recv(0, s, &message_size);
}
gettimeofday(&stop, NULL);
long delta = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec);
Expand Down
2 changes: 1 addition & 1 deletion remote_thr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) {
return 1;
}
const char* queue = argv[1];
int message_size = atoi(argv [2]);
size_t message_size = atoi(argv [2]);
long roundtrip_count = atoi(argv [3]);

nmq::context_t context(queue);
Expand Down

0 comments on commit 9fa8076

Please sign in to comment.