Skip to content

Commit

Permalink
Improved (read: working) buffering of event packets in NEVFile.
Browse files Browse the repository at this point in the history
This no longer segfaults (yay) and is also much more efficient since only whole packets
(or continuations) are loaded from disk.
  • Loading branch information
mrkrause committed Jan 16, 2019
1 parent c0ef851 commit 4c12e00
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 63 deletions.
119 changes: 59 additions & 60 deletions NEVFile.cpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
#include "NEVFile.h"

#include "datapacket.h"

#include <iostream>
#include <cstring>
NEVFile::NEVFile(std::string filename, size_t buffersize) :
BUFFERSIZE(buffersize)
{

this->file.open(filename, std::ios_base::binary);
this->file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
if(!this->file) {
std::cerr << "Failed to open file " << filename << "|\n" << std::endl;
throw(std::runtime_error("Cannot open file for reading"));
}

auto nHeaders = readBasicHeader();
readExtendedHeaders(nHeaders);


buffer = new uint8_t[BUFFERSIZE];
file.read(reinterpret_cast<char*>(buffer), BUFFERSIZE);
buffer_capacity = file.gcount();
buffer_pos = 0;
this->file.open(filename, std::ios_base::binary);
this->file.exceptions(std::ifstream::failbit | std::ifstream::badbit);
if(!this->file) {
std::cerr << "Failed to open file " << filename << "|\n" << std::endl;
throw(std::runtime_error("Cannot open file for reading"));
}
auto nHeaders = readBasicHeader();
readExtendedHeaders(nHeaders);
buffer = new uint8_t[BUFFERSIZE*packetSize];
file.read(reinterpret_cast<char*>(buffer), BUFFERSIZE*packetSize);
buffer_capacity = file.gcount();
buffer_pos = 0;
}


Expand Down Expand Up @@ -90,7 +93,8 @@ void NEVFile::readExtendedHeaders(const std::uint32_t nHeaders) {
}

else if(std::equal(buffer, buffer+7, "NEUEVFLT")) {
/* Spike filters are easy--just copy the data over (depends on getting the alignment right!) */
/* Spike filters are easy--just copy the data over
(depends on getting the alignment right!) */
SpikeFilter sf;
std::copy(buffer+8, buffer+8+sizeof(SpikeFilter), reinterpret_cast<char*>(&sf));
this->spikeFilters.emplace(sf.electrodeID, sf);
Expand Down Expand Up @@ -135,17 +139,14 @@ bool NEVFile::eof() const {


void NEVFile::refillBuffer() {

if(!file.eof()) {
size_t remaining = BUFFERSIZE - buffer_pos;
memmove(buffer, buffer+buffer_pos, remaining);
try {
file.read(reinterpret_cast<char*>(buffer)+remaining, BUFFERSIZE - remaining);
file.read(reinterpret_cast<char*>(buffer), BUFFERSIZE*packetSize);
} catch (std::ifstream::failure &e) {
if(!file.eof())
throw(e);
}
buffer_capacity = remaining + file.gcount();
buffer_capacity = file.gcount();
buffer_pos = 0;
}
return;
Expand All @@ -171,12 +172,10 @@ std::shared_ptr<Packet> NEVFile::readPacketOrNull(bool digital, bool stim, bool
and the alloc/dealloc consumes a massive amount of runtime (>30% in the destructors alone).
*/



if(this->eof())
throw(std::runtime_error("Read past end of time"));

if((this->buffer_capacity - this->buffer_pos) < this->packetSize) {
if(this->buffer_capacity == this->buffer_pos) {
refillBuffer();
}

Expand All @@ -196,38 +195,41 @@ std::shared_ptr<Packet> NEVFile::readPacketOrNull(bool digital, bool stim, bool
p = parseCurrentAsStim();
}


if(!p) {
this->buffer_pos += this->packetSize;
}
//Peek at the next packet to see if it is a continuation packet
this->buffer_pos+=this->packetSize;

// Read continuation packets (not tested yet)
while(true) {
if(this->eof())
break;

if((this->buffer_capacity - this->buffer_pos) < this->packetSize) {
refillBuffer();
}

if(this->buffer_pos == this->buffer_capacity) {
if(this->eof())
return p;
else
refillBuffer();
}
start = buffer + buffer_pos;
std::copy(start, start+sizeof(timestamp), reinterpret_cast<char*>(&timestamp));

if(timestamp != 0xFFFFFFU)
std::copy(start, start+sizeof(timestamp),
reinterpret_cast<char*>(&timestamp));
if(timestamp != 0xFFFFFFU) //not a continuation packet
break;

if(!p) { // a continuation packet, but we're ignoring it (wrong type)
this->buffer_pos += this->packetSize;
continue;
} else {

auto wavep = std::dynamic_pointer_cast<WavePacket>(p);

auto wavep = std::dynamic_pointer_cast<WavePacket>(p);

auto newlen = wavep->len + this->packetSize - sizeof(timestamp);
char* newdata = new char[newlen];
auto newlen = wavep->len + this->packetSize - sizeof(timestamp);
char* newdata = new char[newlen];

std::copy(wavep->waveform, wavep->waveform + wavep->len, newdata);
std::copy(start, start + this->packetSize - sizeof(timestamp), newdata + wavep->len);

delete [] wavep->waveform;
wavep->waveform = newdata;
wavep->len = newlen;
buffer_pos += this->packetSize;
std::copy(wavep->waveform, wavep->waveform + wavep->len, newdata);
std::copy(start, start + this->packetSize - sizeof(timestamp),
newdata + wavep->len);

delete [] wavep->waveform;
wavep->waveform = newdata;
wavep->len = newlen;
buffer_pos += this->packetSize;
}
}

return p;
Expand All @@ -238,13 +240,16 @@ std::shared_ptr<DigitalPacket> NEVFile::parseCurrentAsDigital() {
auto start = buffer + buffer_pos;
std::shared_ptr<DigitalPacket> p(new DigitalPacket);

std::copy(start, start+sizeof(p->timestamp), reinterpret_cast<char*>(&(p->timestamp)));
start+=sizeof(p->timestamp) + 2; //The + 2 is because we're skipping the packetID
std::copy(start, start+sizeof(p->timestamp),
reinterpret_cast<char*>(&(p->timestamp)));

start+=sizeof(p->timestamp) + 2; //+ 2 to skip the packetID

DigitalReason reason;
std::copy(start, start+sizeof(p->reason), reinterpret_cast<char*>(&reason));
p->reason = reason;
start+=sizeof(p->reason) + 1; //Skipping a bit reserved for future use

start+=sizeof(p->reason) + 1; //Skipping a byte reserved for future use

std::copy(start, start+sizeof(p->parallel), reinterpret_cast<char*>(&(p->parallel)));
start+=sizeof(p->parallel);
Expand All @@ -261,8 +266,6 @@ std::shared_ptr<DigitalPacket> NEVFile::parseCurrentAsDigital() {
std::copy(start, start+sizeof(p->SMA4), reinterpret_cast<char*>(&(p->SMA4)));
start+=sizeof(p->SMA4);

buffer_pos+=this->packetSize;

return p;
}

Expand All @@ -283,9 +286,7 @@ std::shared_ptr<SpikePacket> NEVFile::parseCurrentAsSpike() {
p->waveform = new char[this->packetSize - 8]; //Now managed by SpikePacket
p->len = this->packetSize - 8;
std::copy(start, start+this->packetSize-8, p->waveform);


buffer_pos+=this->packetSize;

return p;
}
Expand All @@ -302,9 +303,7 @@ std::shared_ptr<StimPacket> NEVFile::parseCurrentAsStim() {

p->waveform = new char[this->packetSize - 8]; //Now managed by StimPacket
p->len = this->packetSize - 8;
std::copy(start, start+this->packetSize-8, p->waveform);

buffer_pos+=this->packetSize;
std::copy(start, start+this->packetSize-8, p->waveform);

return p;

Expand Down
9 changes: 7 additions & 2 deletions NEVFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum DigitalMode: std::uint8_t {

class NEVFile {
public:
NEVFile(std::string filename, size_t BUFFERSIZE=1000000);
NEVFile(std::string filename, size_t BUFFERSIZE=1000);
~NEVFile();

bool eof() const;
Expand All @@ -51,7 +51,12 @@ class NEVFile {

friend std::ostream& operator<<(std::ostream& out, const NEVFile& f);

auto timestampFS() const {return fsTimestamp; }
auto get_timestampFS() const {return fsTimestamp; }
auto get_waveformFS() const {return fsWaveforms; }
auto get_start_sys() const {return origin;}
auto get_comment() const {return comment;}
auto get_creator() const {return creator;}
auto get_spike_label(std::uint16_t id) {return labels[id];}
auto allWaves16Bit() const {return flags&1; }
protected:
std::ifstream file;
Expand Down
2 changes: 1 addition & 1 deletion datapacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct WavePacket : Packet {


struct SpikePacket : WavePacket {
std::uint8_t unit;
std::uint8_t unit;

SpikePacket();
SpikePacket(const SpikePacket& rhs);
Expand Down

0 comments on commit 4c12e00

Please sign in to comment.