8 changes: 8 additions & 0 deletions Makefile
@@ -300,6 +300,14 @@ ifeq ($(CPU_ONLY), 1)
COMMON_FLAGS += -DCPU_ONLY
endif

# Benchmarks
ifeq ($(BENCHMARK_DATA), 1)
COMMON_FLAGS += -DBENCHMARK_DATA
endif
ifeq ($(BENCHMARK_SOLVER), 1)
COMMON_FLAGS += -DBENCHMARK_SOLVER
endif

# Python layer support
ifeq ($(WITH_PYTHON_LAYER), 1)
COMMON_FLAGS += -DWITH_PYTHON_LAYER
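A minimal sketch of how such a flag is typically consumed, assuming the usual #ifdef-gated timing pattern with caffe::Timer from util/benchmark.hpp (the actual call sites live in the .cpp changes, which are not shown in this section):

#include "caffe/common.hpp"           // LOG(INFO) via glog
#include "caffe/util/benchmark.hpp"   // caffe::Timer

// Hypothetical function for illustration; the timing code compiles
// only when BENCHMARK_DATA is added to COMMON_FLAGS as above.
void load_batch_example() {
#ifdef BENCHMARK_DATA
  caffe::Timer timer;
  timer.Start();
#endif
  // ... read and transform one batch ...
#ifdef BENCHMARK_DATA
  LOG(INFO) << "Batch read time: " << timer.MilliSeconds() << " ms.";
#endif
}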
4 changes: 4 additions & 0 deletions Makefile.config.example
@@ -79,3 +79,7 @@ TEST_GPUID := 0

# enable pretty build (comment to see full commands)
Q ?= @

# Add timing info to the logs
# BENCHMARK_DATA := 1
# BENCHMARK_SOLVER := 1
1 change: 1 addition & 0 deletions include/caffe/caffe.hpp
@@ -10,6 +10,7 @@
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
#include "caffe/net.hpp"
#include "caffe/parallel.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/solver.hpp"
#include "caffe/util/benchmark.hpp"
20 changes: 13 additions & 7 deletions include/caffe/common.hpp
@@ -98,12 +98,12 @@ void GlobalInit(int* pargc, char*** pargv);
class Caffe {
public:
~Caffe();
inline static Caffe& Get() {
if (!singleton_.get()) {
singleton_.reset(new Caffe());
}
return *singleton_;
}

// Thread-local context for Caffe. Moved to common.cpp instead of
// including boost/thread.hpp, to avoid boost/NVCC issues (#1009, #1010)
// on OSX; the include also fails on Linux with CUDA 7.0.18.
static Caffe& Get();

enum Brew { CPU, GPU };

// This random number generator facade hides boost and CUDA rng
@@ -149,6 +149,11 @@ class Caffe {
static void SetDevice(const int device_id);
// Prints the current GPU status.
static void DeviceQuery();
// Parallel training info
inline static int solver_count() { return Get().solver_count_; }
inline static void set_solver_count(int val) { Get().solver_count_ = val; }
inline static bool root_solver() { return Get().root_solver_; }
inline static void set_root_solver(bool val) { Get().root_solver_ = val; }

protected:
#ifndef CPU_ONLY
@@ -158,7 +163,8 @@
shared_ptr<RNG> random_generator_;

Brew mode_;
static shared_ptr<Caffe> singleton_;
int solver_count_;
bool root_solver_;

private:
// The private constructor to avoid duplicate instantiation.
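For reference, a plausible sketch of the relocated Get() in common.cpp; boost::thread_specific_ptr is one way to give each thread its own Caffe instance, though the actual definition may differ:

#include <boost/thread/tss.hpp>

#include "caffe/common.hpp"

namespace caffe {

// Each thread lazily constructs its own Caffe on first Get(), so mode,
// device id, and the new solver flags are per-thread state.
static boost::thread_specific_ptr<Caffe> thread_instance_;

Caffe& Caffe::Get() {
  if (!thread_instance_.get()) {
    thread_instance_.reset(new Caffe());
  }
  return *(thread_instance_.get());
}

}  // namespace caffe

With per-thread state, set_root_solver(false) called on a worker thread affects only that thread, which is what the new solver_count()/root_solver() accessors rely on.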
39 changes: 23 additions & 16 deletions include/caffe/data_layers.hpp
@@ -5,16 +5,17 @@
#include <utility>
#include <vector>

#include "boost/scoped_ptr.hpp"
#include "hdf5.h"

#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {
@@ -50,12 +51,17 @@ class BaseDataLayer : public Layer<Dtype> {
bool output_labels_;
};

template <typename Dtype>
class Batch {
public:
Blob<Dtype> data_, label_;
};

template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
explicit BasePrefetchingDataLayer(const LayerParameter& param)
: BaseDataLayer<Dtype>(param) {}
explicit BasePrefetchingDataLayer(const LayerParameter& param);
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
// This method may not be overridden.
@@ -67,22 +73,24 @@ class BasePrefetchingDataLayer :
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
// The thread's function
virtual void InternalThreadEntry() {}
// Prefetches batches (asynchronously if going to GPU memory)
static const int PREFETCH_COUNT = 3;

protected:
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch) = 0;

Batch<Dtype> prefetch_[PREFETCH_COUNT];
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;

Blob<Dtype> transformed_data_;
};

template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
explicit DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param) {}
explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
@@ -93,10 +101,9 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
virtual inline int MaxTopBlobs() const { return 2; }

protected:
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<db::DB> db_;
shared_ptr<db::Cursor> cursor_;
DataReader reader_;
};

/**
@@ -235,7 +242,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
shared_ptr<Caffe::RNG> prefetch_rng_;
virtual void ShuffleImages();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

vector<std::pair<std::string, int> > lines_;
int lines_id_;
@@ -307,7 +314,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {

protected:
virtual unsigned int PrefetchRand();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<Caffe::RNG> prefetch_rng_;
vector<std::pair<std::string, vector<int> > > image_database_;
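The prefetch_free_/prefetch_full_ pair implements a producer/consumer cycle between the internal thread and Forward. A sketch of the intended flow, assuming BlockingQueue's blocking pop/push semantics and the caffe_copy helper; the real loops live in base_data_layer.cpp and also handle GPU-side copies:

#include "caffe/data_layers.hpp"
#include "caffe/util/math_functions.hpp"  // caffe_copy

namespace caffe {

// Producer: runs on the internal thread, cycling PREFETCH_COUNT batches.
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
  while (!must_stop()) {
    Batch<Dtype>* batch = prefetch_free_.pop();  // wait for an empty batch
    load_batch(batch);                           // subclass fills data/label
    prefetch_full_.push(batch);                  // publish to the consumer
  }
}

// Consumer: Forward takes a full batch, copies it out, and recycles it.
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
    const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
  Batch<Dtype>* batch = prefetch_full_.pop("Prefetch queue empty");
  top[0]->ReshapeLike(batch->data_);
  caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
             top[0]->mutable_cpu_data());
  if (this->output_labels_) {
    top[1]->ReshapeLike(batch->label_);
    caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
               top[1]->mutable_cpu_data());
  }
  prefetch_free_.push(batch);  // return the batch for reuse
}

}  // namespace caffe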
82 changes: 82 additions & 0 deletions include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
#ifndef CAFFE_DATA_READER_HPP_
#define CAFFE_DATA_READER_HPP_

#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {

/**
* @brief Reads data from a source to queues available to data layers.
* A single reading thread is created per source, even if multiple solvers
* are running in parallel, e.g. for multi-GPU training. This makes sure
* databases are read sequentially, and that each solver accesses a different
* subset of the database. Data is distributed to solvers in a round-robin
* way to keep parallel training deterministic.
*/
class DataReader {
public:
explicit DataReader(const LayerParameter& param);
~DataReader();

inline BlockingQueue<Datum*>& free() const {
return queue_pair_->free_;
}
inline BlockingQueue<Datum*>& full() const {
return queue_pair_->full_;
}

protected:
// Queue pairs are shared between a body and its readers
class QueuePair {
public:
explicit QueuePair(int size);
~QueuePair();

BlockingQueue<Datum*> free_;
BlockingQueue<Datum*> full_;

DISABLE_COPY_AND_ASSIGN(QueuePair);
};

// A single body is created per source
class Body : public InternalThread {
public:
explicit Body(const LayerParameter& param);
virtual ~Body();

protected:
void InternalThreadEntry();
void read_one(db::Cursor* cursor, QueuePair* qp);

const LayerParameter param_;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;

friend class DataReader;

DISABLE_COPY_AND_ASSIGN(Body);
};

// A source is uniquely identified by its layer name + path, in case
// the same database is read from two different locations in the net.
static inline string source_key(const LayerParameter& param) {
return param.name() + ":" + param.data_param().source();
}

const shared_ptr<QueuePair> queue_pair_;
shared_ptr<Body> body_;

static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;

DISABLE_COPY_AND_ASSIGN(DataReader);
};

} // namespace caffe

#endif // CAFFE_DATA_READER_HPP_
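On the consumer side, a data layer holding a DataReader pops filled Datum objects from full() and hands them back through free() once decoded. A hypothetical helper sketching that cycle (the log-on-wait pop overload is assumed from BlockingQueue):

#include "caffe/data_reader.hpp"
#include "caffe/proto/caffe.pb.h"  // Datum

namespace caffe {

// Hypothetical helper, not part of this PR: consume and recycle one Datum.
void consume_one(DataReader& reader) {
  Datum* datum = reader.full().pop("Waiting for data");
  // ... decode and transform *datum into the layer's top blobs ...
  reader.free().push(datum);  // return the Datum to the reading thread
}

}  // namespace caffe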
25 changes: 20 additions & 5 deletions include/caffe/internal_thread.hpp
@@ -14,18 +14,22 @@ namespace caffe {
/**
* Virtual class encapsulating boost::thread for use in a base class.
* The child class will acquire the ability to run a single thread,
* by reimplementing the virutal function InternalThreadEntry.
* by reimplementing the virtual function InternalThreadEntry.
*/
class InternalThread {
public:
InternalThread() : thread_() {}
InternalThread();
virtual ~InternalThread();

/** Returns true if the thread was successfully started. **/
bool StartInternalThread();
/**
* Caffe's thread local state will be initialized using the current
* thread values, e.g. device id, solver index etc. The random seed
* is initialized using caffe_rng_rand.
*/
void StartInternalThread();

/** Will not return until the internal thread has exited. */
bool WaitForInternalThreadToExit();
void StopInternalThread();

bool is_started() const;

@@ -34,7 +38,18 @@
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}

/* Poll this in long-running loops to exit when a stop is requested. */
bool must_stop();

private:
void entry();

shared_ptr<boost::thread> thread_;
int device_;
Caffe::Brew mode_;
int rand_seed_;
int solver_count_;
bool root_solver_;
};

} // namespace caffe
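A minimal hypothetical subclass showing the new contract: loops poll must_stop() so that StopInternalThread() can interrupt them promptly.

#include "caffe/internal_thread.hpp"

namespace caffe {

// Hypothetical example, not part of this PR.
class PollingWorker : public InternalThread {
 protected:
  virtual void InternalThreadEntry() {
    while (!must_stop()) {
      // ... one unit of work per iteration ...
    }
  }
};

}  // namespace caffe

A caller would run worker.StartInternalThread() and later worker.StopInternalThread(), which requests interruption and joins the thread.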
4 changes: 3 additions & 1 deletion include/caffe/layer_factory.hpp
@@ -71,7 +71,9 @@ class LayerRegistry {

// Get a layer using a LayerParameter.
static shared_ptr<Layer<Dtype> > CreateLayer(const LayerParameter& param) {
LOG(INFO) << "Creating layer " << param.name();
if (Caffe::root_solver()) {
LOG(INFO) << "Creating layer " << param.name();
}
const string& type = param.type();
CreatorRegistry& registry = Registry();
CHECK_EQ(registry.count(type), 1) << "Unknown layer type: " << type
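Since root_solver() is thread-local state (see common.hpp above), a multi-GPU setup can silence duplicate per-layer creation logs on its worker threads. A hypothetical worker initialization, using only the accessors introduced in this PR:

// Hypothetical per-worker setup; only the root solver's thread keeps
// logging layer creation.
void init_worker(int device_id, int solver_count) {
  Caffe::SetDevice(device_id);
  Caffe::set_solver_count(solver_count);
  Caffe::set_root_solver(false);  // this thread is a worker, not the root
}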