From 9fa807680232ee69d894610ceafe542522336afe Mon Sep 17 00:00:00 2001 From: Erik Rigtorp Date: Mon, 15 Nov 2010 20:17:05 -0500 Subject: [PATCH] Add message framing --- Makefile | 2 +- TODO | 5 +++-- include/nmq.hpp | 52 ++++++++++++++++++++++++++----------------------- local_lat.cpp | 4 ++-- local_thr.cpp | 4 ++-- remote_lat.cpp | 4 ++-- remote_thr.cpp | 2 +- 7 files changed, 39 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index d363216..f61ced1 100644 --- a/Makefile +++ b/Makefile @@ -8,5 +8,5 @@ all: $(TARGETS) $(TARGETS): include/nmq.hpp clean: - rm -f *~ + rm -f *~ core rm -f $(TARGETS) \ No newline at end of file diff --git a/TODO b/TODO index 35a7579..635b4a4 100644 --- a/TODO +++ b/TODO @@ -2,5 +2,6 @@ * Monitoring tool * Monitor message throughput * Ping nodes to check if they are running -* Message framing, ie send message length -* Unit tests \ No newline at end of file +* Unit tests +* Maybe allow zero length messages +* Magic and versioning of mmap file diff --git a/include/nmq.hpp b/include/nmq.hpp index d17bb02..9975e77 100644 --- a/include/nmq.hpp +++ b/include/nmq.hpp @@ -45,7 +45,7 @@ class context_t { unsigned int nodes; unsigned int rings; unsigned int size; - unsigned int msg_size; + size_t msg_size; }; typedef volatile unsigned int vo_uint; @@ -54,8 +54,8 @@ class context_t { struct ring { unsigned int _size; - unsigned int _msg_size; - unsigned int _offset; + size_t _msg_size; + size_t _offset; // R/W access by the reader // R/O access by the writer @@ -97,7 +97,7 @@ class context_t { header_->nodes = nodes; header_->rings = n_rings; header_->size = real_size - 1; - header_->msg_size = msg_size; + header_->msg_size = msg_size + sizeof(size_t); for (unsigned int i = 0; i < header_->rings; i++) { ring_[i]._size = real_size - 1; @@ -133,7 +133,7 @@ class context_t { } void print() { - printf("nodes: %u, size: %u, msgsz: %u\n", header_->nodes, header_->size, header_->msg_size); + printf("nodes: %u, size: %u, msgsz: %lu\n", header_->nodes, header_->size, header_->msg_size); for (unsigned int i = 0; i < header_->rings; i++) { printf("%3i: %10u %10u\n", i, ring_[i]._head, ring_[i]._tail); } @@ -167,7 +167,7 @@ class context_t { } bool send(ring *ring, const void *msg, size_t size) { - assert(size <= ring->_msg_size); + assert(size <= (ring->_msg_size - sizeof(size_t))); unsigned int h = (ring->_head - 1) & ring->_size; unsigned int t = ring->_tail; @@ -175,7 +175,8 @@ class context_t { return false; char *d = &data_[ring_->_offset + t*ring->_msg_size]; - memcpy(d, msg, size); + memcpy(d, &size, sizeof(size)); + memcpy(d + sizeof(size), msg, size); // Barrier is needed to make sure that item is updated // before it's made available to the reader @@ -196,14 +197,19 @@ class context_t { return send(ring, msg, size); } - bool recv(ring *ring, void *msg, size_t size) { + bool recv(ring *ring, void *msg, size_t *size) { unsigned int t = ring->_tail; unsigned int h = ring->_head; if (h == t) return false; + + char *d = &data_[ring_->_offset + h*ring->_msg_size]; - void *d = &data_[ring_->_offset + h*ring->_msg_size]; - memcpy(msg, d, size); + size_t recv_size; + memcpy(&recv_size, d, sizeof(size_t)); + assert(recv_size >= *size && "buffer too small"); + *size = recv_size; + memcpy(msg, d + sizeof(size_t), recv_size); // Barrier is needed to make sure that we finished reading the item // before moving the head @@ -213,37 +219,35 @@ class context_t { return true; } - bool recv(unsigned int from, unsigned int to, void *msg, size_t size) { + bool recv(unsigned int from, unsigned int to, void *msg, size_t *size) { ring *ring = get_ring(from, to); while (!recv(ring, msg, size)) bones::cpu::__relax(); return true; } - bool recvnb(unsigned int from, unsigned int to, void *s, int size) { + bool recvnb(unsigned int from, unsigned int to, void *s, size_t *size) { return recv(get_ring(from, to), s, size); } - bool recv(unsigned int to, void *msg, size_t size) { + bool recv(unsigned int to, void *msg, size_t *size) { // TODO "fair" receiving while (true) { for (unsigned int i = 0; i < header_->nodes; i++) { - if (to != i && recvnb(i, to, msg, size) != -1) return true; + if (to != i && recvnb(i, to, msg, size)) return true; } bones::cpu::__relax(); } return false; } - bool recvnb(unsigned int to, void *msg, size_t size) { + ssize_t recvnb(unsigned int to, void *msg, size_t *size) { // TODO "fair" receiving for (unsigned int i = 0; i < header_->nodes; i++) { - if (to != i && recvnb(i, to, msg, size) != -1) return true; + if (to != i && recvnb(i, to, msg, size)) return true; } return false; } - - std::string fname_; void *p_; header *header_; @@ -267,27 +271,27 @@ class node_t { //assert(node < context_->nodes()); } - bool send(unsigned int to, const void *msg, int size) { + bool send(unsigned int to, const void *msg, size_t size) { return context_->send(node_, to, msg, size); } - bool sendnb(unsigned int to, const void *msg, int size) { + bool sendnb(unsigned int to, const void *msg, size_t size) { return context_->sendnb(node_, to, msg, size); } - bool recv(unsigned int from, void *msg, int size) { + bool recv(unsigned int from, void *msg, size_t *size) { return context_->recv(from, node_, msg, size); } - bool recvnb(unsigned int from, void *msg, int size) { + bool recvnb(unsigned int from, void *msg, size_t *size) { return context_->recvnb(from, node_, msg, size); } - bool recv(void *msg, int size) { + bool recv(void *msg, size_t *size) { return context_->recv(node_, msg, size); } - bool recvnb(void *msg, int size) { + bool recvnb(void *msg, size_t *size) { return context_->recvnb(node_, msg, size); } diff --git a/local_lat.cpp b/local_lat.cpp index 2840395..815693a 100644 --- a/local_lat.cpp +++ b/local_lat.cpp @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) { return 1; } const char* queue = argv[1]; - int message_size = atoi(argv [2]); + size_t message_size = atoi(argv [2]); int roundtrip_count = atoi(argv [3]); nmq::context_t context(queue); @@ -49,7 +49,7 @@ int main(int argc, char* argv[]) { nmq::node_t node(context, 0); for (int i = 0; i != roundtrip_count; i++) { - node.recv(1, s, message_size); + node.recv(1, s, &message_size); node.send(1, s, message_size); } } diff --git a/local_thr.cpp b/local_thr.cpp index 0405fe1..32f3228 100644 --- a/local_thr.cpp +++ b/local_thr.cpp @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) { return 1; } const char* queue = argv[1]; - int message_size = atoi(argv [2]); + size_t message_size = atoi(argv [2]); int roundtrip_count = atoi(argv [3]); nmq::context_t context(queue); @@ -49,6 +49,6 @@ int main(int argc, char* argv[]) { nmq::node_t node(context, 0); for (int i = 0; i != roundtrip_count; i++) { - node.recv(1, s, message_size); + node.recv(1, s, &message_size); } } diff --git a/remote_lat.cpp b/remote_lat.cpp index 7fc4499..d41e99e 100644 --- a/remote_lat.cpp +++ b/remote_lat.cpp @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) { return 1; } const char* queue = argv[1]; - int message_size = atoi(argv [2]); + size_t message_size = atoi(argv [2]); int roundtrip_count = atoi(argv [3]); nmq::context_t context(queue); @@ -52,7 +52,7 @@ int main(int argc, char* argv[]) { gettimeofday(&start, NULL); for (int i = 0; i != roundtrip_count; i++) { node.send(0, s, message_size); - node.recv(0, s, message_size); + node.recv(0, s, &message_size); } gettimeofday(&stop, NULL); long delta = (stop.tv_sec - start.tv_sec)*1000000 + (stop.tv_usec - start.tv_usec); diff --git a/remote_thr.cpp b/remote_thr.cpp index c18acc0..511db04 100644 --- a/remote_thr.cpp +++ b/remote_thr.cpp @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) { return 1; } const char* queue = argv[1]; - int message_size = atoi(argv [2]); + size_t message_size = atoi(argv [2]); long roundtrip_count = atoi(argv [3]); nmq::context_t context(queue);