Skip to content

Commit f59a52d

Browse files
author
cypof
committed
Data queues, prefetching and multi-source
1 parent 2f6912c commit f59a52d

19 files changed

+691
-191
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
PROJECT := caffe
22

3-
CONFIG_FILE := Makefile.config
3+
CONFIG_FILE ?= Makefile.config
44
include $(CONFIG_FILE)
55

66
BUILD_DIR_LINK := $(BUILD_DIR)
@@ -268,6 +268,8 @@ endif
268268
# Debugging
269269
ifeq ($(DEBUG), 1)
270270
COMMON_FLAGS += -DDEBUG -g -O0
271+
# Work around a compile issue in DEBUG builds on Mac (https://svn.boost.org/trac/boost/ticket/9392)
272+
COMMON_FLAGS += -DBOOST_NOINLINE='__attribute__ ((noinline))'
271273
NVCCFLAGS += -G
272274
else
273275
COMMON_FLAGS += -DNDEBUG -O2

include/caffe/common.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ class Caffe {
142142
// freed in a non-pinned way, which may cause problems - I haven't verified
143143
// it personally but better to note it here in the header file.
144144
inline static void set_mode(Brew mode) { Get().mode_ = mode; }
145-
// Sets the random seed of both boost and curand
145+
// Random seed of both boost and curand
146+
static unsigned int get_random_seed();
146147
static void set_random_seed(const unsigned int seed);
147148
// Sets the device. Since we have cublas and curand stuff, set device also
148149
// requires us to reset those values.
@@ -156,6 +157,7 @@ class Caffe {
156157
curandGenerator_t curand_generator_;
157158
#endif
158159
shared_ptr<RNG> random_generator_;
160+
unsigned int random_generator_seed_;
159161

160162
Brew mode_;
161163
static shared_ptr<Caffe> singleton_;

include/caffe/data_layers.hpp

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
#ifndef CAFFE_DATA_LAYERS_HPP_
22
#define CAFFE_DATA_LAYERS_HPP_
33

4+
#include <map>
45
#include <string>
56
#include <utility>
67
#include <vector>
78

8-
#include "boost/scoped_ptr.hpp"
9+
#include "boost/random/mersenne_twister.hpp"
10+
#include "boost/random/uniform_real.hpp"
11+
#include "boost/random/variate_generator.hpp"
12+
#include "boost/weak_ptr.hpp"
913
#include "hdf5.h"
1014

1115
#include "caffe/blob.hpp"
@@ -16,10 +20,16 @@
1620
#include "caffe/layer.hpp"
1721
#include "caffe/net.hpp"
1822
#include "caffe/proto/caffe.pb.h"
23+
#include "caffe/util/blocking_queue.hpp"
1924
#include "caffe/util/db.hpp"
2025

2126
namespace caffe {
2227

28+
using boost::weak_ptr;
29+
using boost::mt19937;
30+
using boost::uniform_real;
31+
using boost::variate_generator;
32+
2333
/**
2434
* @brief Provides base for data layers that feed blobs to the Net.
2535
*
@@ -52,12 +62,17 @@ class BaseDataLayer : public Layer<Dtype> {
5262
bool output_labels_;
5363
};
5464

65+
template <typename Dtype>
66+
class Batch {
67+
public:
68+
Blob<Dtype> data_, label_;
69+
};
70+
5571
template <typename Dtype>
5672
class BasePrefetchingDataLayer :
5773
public BaseDataLayer<Dtype>, public InternalThread {
5874
public:
59-
explicit BasePrefetchingDataLayer(const LayerParameter& param)
60-
: BaseDataLayer<Dtype>(param) {}
75+
explicit BasePrefetchingDataLayer(const LayerParameter& param);
6176
virtual ~BasePrefetchingDataLayer() {}
6277
// LayerSetUp: implements common data layer setup functionality, and calls
6378
// DataLayerSetUp to do special data layer setup for individual layer types.
@@ -70,22 +85,63 @@ class BasePrefetchingDataLayer :
7085
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
7186
const vector<Blob<Dtype>*>& top);
7287

73-
virtual void CreatePrefetchThread();
74-
virtual void JoinPrefetchThread();
75-
// The thread's function
76-
virtual void InternalThreadEntry() {}
88+
// Prefetches batches (asynchronously if to GPU memory)
89+
static const int PREFETCH_COUNT = 3;
7790

7891
protected:
79-
Blob<Dtype> prefetch_data_;
80-
Blob<Dtype> prefetch_label_;
92+
virtual void InternalThreadEntry();
93+
virtual void load_batch(Batch<Dtype>* batch) = 0;
94+
95+
Batch<Dtype> prefetch_[PREFETCH_COUNT];
96+
blocking_queue<Batch<Dtype>*> prefetch_free_;
97+
blocking_queue<Batch<Dtype>*> prefetch_full_;
98+
int device_;
99+
81100
Blob<Dtype> transformed_data_;
82101
};
83102

103+
// Prefetches datums to host memory that can be read by multiple data layers.
104+
class DataLoader {
105+
public:
106+
DataLoader(const DataParameter& param, int index);
107+
~DataLoader();
108+
109+
inline blocking_queue<Datum*>& free() {
110+
return body_.get()->free_;
111+
}
112+
inline blocking_queue<Datum*>& full() {
113+
return body_.get()->full_;
114+
}
115+
116+
protected:
117+
class Body: public InternalThread {
118+
public:
119+
Body(const DataParameter& param, int index);
120+
~Body();
121+
122+
void InternalThreadEntry();
123+
124+
shared_ptr<db::DB> db_;
125+
shared_ptr<db::Cursor> cursor_;
126+
127+
blocking_queue<Datum*> free_;
128+
blocking_queue<Datum*> full_;
129+
130+
DISABLE_COPY_AND_ASSIGN(Body);
131+
};
132+
133+
static map<string, weak_ptr<Body> > instances_;
134+
135+
const string source_;
136+
shared_ptr<Body> body_;
137+
138+
DISABLE_COPY_AND_ASSIGN(DataLoader);
139+
};
140+
84141
template <typename Dtype>
85-
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
142+
class DataLayer: public BasePrefetchingDataLayer<Dtype> {
86143
public:
87-
explicit DataLayer(const LayerParameter& param)
88-
: BasePrefetchingDataLayer<Dtype>(param) {}
144+
explicit DataLayer(const LayerParameter& param);
89145
virtual ~DataLayer();
90146
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
91147
const vector<Blob<Dtype>*>& top);
@@ -96,10 +152,12 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
96152
virtual inline int MaxTopBlobs() const { return 2; }
97153

98154
protected:
99-
virtual void InternalThreadEntry();
155+
virtual void load_batch(Batch<Dtype>* batch);
156+
DataLoader* next_loader();
100157

101-
shared_ptr<db::DB> db_;
102-
shared_ptr<db::Cursor> cursor_;
158+
vector<shared_ptr<DataLoader> > loaders_;
159+
mt19937 rand_engine_;
160+
uniform_real<float> rand_;
103161
};
104162

105163
/**
@@ -236,7 +294,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
236294
protected:
237295
shared_ptr<Caffe::RNG> prefetch_rng_;
238296
virtual void ShuffleImages();
239-
virtual void InternalThreadEntry();
297+
virtual void load_batch(Batch<Dtype>* batch);
240298

241299
vector<std::pair<std::string, int> > lines_;
242300
int lines_id_;
@@ -308,7 +366,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {
308366

309367
protected:
310368
virtual unsigned int PrefetchRand();
311-
virtual void InternalThreadEntry();
369+
virtual void load_batch(Batch<Dtype>* batch);
312370

313371
shared_ptr<Caffe::RNG> prefetch_rng_;
314372
vector<std::pair<std::string, vector<int> > > image_database_;

include/caffe/internal_thread.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,28 @@ namespace caffe {
1818
*/
1919
class InternalThread {
2020
public:
21-
InternalThread() : thread_() {}
21+
InternalThread() : thread_(), must_stop_() {}
2222
virtual ~InternalThread();
2323

2424
/** Returns true if the thread was successfully started. **/
2525
bool StartInternalThread();
2626

2727
/** Will not return until the internal thread has exited. */
28-
bool WaitForInternalThreadToExit();
28+
bool StopInternalThread();
2929

3030
bool is_started() const;
3131

32+
bool must_stop() {
33+
return must_stop_;
34+
}
35+
3236
protected:
3337
/* Implement this method in your subclass
3438
with the code you want your thread to run. */
3539
virtual void InternalThreadEntry() {}
3640

3741
shared_ptr<boost::thread> thread_;
42+
bool must_stop_;
3843
};
3944

4045
} // namespace caffe

include/caffe/syncedmem.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ class SyncedMemory {
5656
SyncedHead head() { return head_; }
5757
size_t size() { return size_; }
5858

59+
#ifndef CPU_ONLY
60+
void async_gpu_push(const cudaStream_t& stream);
61+
#endif
62+
5963
private:
6064
void to_cpu();
6165
void to_gpu();
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#ifndef CAFFE_UTIL_BLOCKING_QUEUE_H_
2+
#define CAFFE_UTIL_BLOCKING_QUEUE_H_
3+
4+
#include <queue>
5+
#include <string>
6+
7+
#include "caffe/common.hpp"
8+
9+
namespace caffe {
10+
11+
template<typename T>
12+
class blocking_queue {
13+
public:
14+
explicit blocking_queue();
15+
virtual ~blocking_queue();
16+
17+
void push(const T& t);
18+
19+
bool empty() const;
20+
21+
bool try_pop(T* t);
22+
23+
T pop(const string& log_on_wait = "");
24+
25+
// Return element without removing it
26+
T peek();
27+
28+
inline uint64_t pops() {
29+
return pops_;
30+
}
31+
32+
protected:
33+
/**
34+
Move synchronization fields out instead of including boost/thread.hpp
35+
to avoid boost/NVCC issues (#1009, #1010) on OSX. Also fails on
36+
Linux CUDA 7.0.18.
37+
*/
38+
class sync;
39+
40+
std::queue<T> queue_;
41+
shared_ptr<sync> sync_;
42+
time_t last_wait_log_;
43+
uint64_t pops_;
44+
45+
DISABLE_COPY_AND_ASSIGN(blocking_queue);
46+
};
47+
48+
} // namespace caffe
49+
50+
#endif

src/caffe/common.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,14 @@ Caffe::Caffe()
4646

4747
Caffe::~Caffe() { }
4848

49+
unsigned int Caffe::get_random_seed() {
50+
return Get().random_generator_seed_;
51+
}
52+
4953
void Caffe::set_random_seed(const unsigned int seed) {
5054
// RNG seed
5155
Get().random_generator_.reset(new RNG(seed));
56+
Get().random_generator_seed_ = seed;
5257
}
5358

5459
void Caffe::SetDevice(const int device_id) {
@@ -108,6 +113,10 @@ Caffe::~Caffe() {
108113
}
109114
}
110115

116+
unsigned int Caffe::get_random_seed() {
117+
return Get().random_generator_seed_;
118+
}
119+
111120
void Caffe::set_random_seed(const unsigned int seed) {
112121
// Curand seed
113122
static bool g_curand_availability_logged = false;
@@ -124,6 +133,7 @@ void Caffe::set_random_seed(const unsigned int seed) {
124133
}
125134
// RNG seed
126135
Get().random_generator_.reset(new RNG(seed));
136+
Get().random_generator_seed_ = seed;
127137
}
128138

129139
void Caffe::SetDevice(const int device_id) {

src/caffe/internal_thread.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace caffe {
55

66
InternalThread::~InternalThread() {
7-
WaitForInternalThreadToExit();
7+
StopInternalThread();
88
}
99

1010
bool InternalThread::is_started() const {
@@ -13,9 +13,10 @@ bool InternalThread::is_started() const {
1313

1414

1515
bool InternalThread::StartInternalThread() {
16-
if (!WaitForInternalThreadToExit()) {
16+
if (!StopInternalThread()) {
1717
return false;
1818
}
19+
must_stop_ = false;
1920
try {
2021
thread_.reset(
2122
new boost::thread(&InternalThread::InternalThreadEntry, this));
@@ -26,8 +27,10 @@ bool InternalThread::StartInternalThread() {
2627
}
2728

2829
/** Will not return until the internal thread has exited. */
29-
bool InternalThread::WaitForInternalThreadToExit() {
30+
bool InternalThread::StopInternalThread() {
31+
must_stop_ = true;
3032
if (is_started()) {
33+
thread_->interrupt();
3134
try {
3235
thread_->join();
3336
} catch (...) {

0 commit comments

Comments
 (0)