Skip to content

Commit 85806ed

Browse files
mzient
authored and stiepan committed
Fix stream ordering in Tensor::Copy and Tensor(List)GPU.as_cpu
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
1 parent aa2d26f commit 85806ed

File tree

3 files changed

+35
-9
lines changed

3 files changed

+35
-9
lines changed

dali/pipeline/data/tensor.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,22 +113,29 @@ class Tensor : public Buffer<Backend> {
113113
inline void Copy(const Tensor<InBackend> &other, AccessOrder order = {}) {
114114
constexpr bool is_host_to_host = std::is_same<Backend, CPUBackend>::value &&
115115
std::is_same<InBackend, CPUBackend>::value;
116+
auto src_order = other.order();
117+
auto dst_order = order_;
116118
if (!order) {
117119
if (is_host_to_host)
118120
order = AccessOrder::host();
119-
else
120-
order = other.order() ? other.order() : order_;
121+
else // use device order, if available; if not, use whichever (dst, src) is set
122+
order = dst_order.is_device()
123+
? dst_order
124+
: src_order.is_device()
125+
? src_order
126+
: dst_order ? dst_order : src_order;
121127
}
122128
DALI_ENFORCE(!is_host_to_host || !order.is_device(),
123129
"Cannot issue a host-to-host copy on a device stream.");
124130
this->Resize(other.shape(), other.type());
125-
order.wait(order_);
131+
order.wait(dst_order); // wait for the destination to avoid overwriting while in use
132+
order.wait(other.order()); // wait for the source to avoid reading while not ready
126133
this->SetLayout(other.GetLayout());
127134
this->SetSourceInfo(other.GetSourceInfo());
128135
this->SetSkipSample(other.ShouldSkipSample());
129136
type_.template Copy<Backend, InBackend>(this->raw_mutable_data(),
130137
other.raw_data(), this->size(), order.stream());
131-
order_.wait(order);
138+
dst_order.wait(order);
132139
}
133140

134141
/**

dali/pipeline/data/tensor_list.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,17 @@ namespace copy_impl {
4545
*
4646
* The copy ordering can be:
4747
* - explicit, as specified in `order`
48-
* - the one from `src_order`, if set
49-
* - the one from `dst_order`
48+
* - the one from `dst_order`, if set
49+
* - the one from `src_order`
5050
* @return copy_order - order on which we will do the copy
5151
*/
5252
AccessOrder SyncBefore(AccessOrder dst_order, AccessOrder src_order, AccessOrder order) {
5353
if (!order)
54-
order = src_order ? src_order : dst_order;
55-
54+
order = dst_order.is_device()
55+
? dst_order
56+
: src_order.is_device()
57+
? src_order
58+
: dst_order ? dst_order : src_order;
5659
// The destination buffer must be ready to be overwritten
5760
order.wait(dst_order);
5861
// The source buffer must be ready to consume

dali/python/backend_impl.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,9 @@ void ExposeTensor(py::module &m) {
832832
},
833833
R"code(Passthrough, since the object is already an instance of `TensorCPU`.)code",
834834
py::return_value_policy::reference_internal)
835+
.def("_set_stream", [](Tensor<CPUBackend> &t, py::object stream) {
836+
t.set_order(AccessOrderFromPythonStreamObj(stream));
837+
})
835838
.def("_make_copy", [](const Tensor<CPUBackend> &t) {
836839
auto dst = std::make_unique<Tensor<CPUBackend>>();
837840
dst->set_device_id(t.device_id());
@@ -997,6 +1000,7 @@ void ExposeTensor(py::module &m) {
9971000
DeviceGuard g(t.device_id());
9981001
auto ret = std::make_unique<Tensor<CPUBackend>>();
9991002
ret->set_pinned(false);
1003+
ret->set_order(AccessOrder::host());
10001004
UserStream * us = UserStream::Get();
10011005
cudaStream_t s = us->GetStream(t);
10021006
ret->Copy(t, s);
@@ -1007,6 +1011,9 @@ void ExposeTensor(py::module &m) {
10071011
Returns a `TensorCPU` object being a copy of this `TensorGPU`.
10081012
)code",
10091013
py::return_value_policy::take_ownership)
1014+
.def("_set_stream", [](Tensor<GPUBackend> &t, py::object stream) {
1015+
t.set_order(AccessOrderFromPythonStreamObj(stream));
1016+
})
10101017
.def("_make_copy", [](const Tensor<GPUBackend> &t) {
10111018
DeviceGuard dg(t.device_id());
10121019
auto dst = std::make_unique<Tensor<GPUBackend>>();
@@ -1112,7 +1119,9 @@ std::unique_ptr<Tensor<Backend> > TensorListGetItemImpl(TensorList<Backend> &t,
11121119
auto ptr = std::make_unique<Tensor<Backend>>();
11131120
// TODO(klecki): Rework this with proper sample-based tensor batch data structure
11141121
auto &sample_shared_ptr = unsafe_sample_owner(t, id);
1115-
ptr->ShareData(sample_shared_ptr, t.capacity(), t.is_pinned(), t.shape()[id], t.type(),
1122+
auto &tshape = t.tensor_shape(id);
1123+
size_t num_bytes = tshape.num_elements() * t.type_info().size();
1124+
ptr->ShareData(sample_shared_ptr, num_bytes, t.is_pinned(), tshape, t.type(),
11161125
t.device_id(), t.order(), t.ready_event());
11171126
ptr->SetMeta(t.GetMeta(id));
11181127
return ptr;
@@ -1360,6 +1369,9 @@ void ExposeTensorListCPU(py::module &m) {
13601369
return t;
13611370
}, R"code(Passthrough, as it is already an instance of `TensorListCPU`.)code",
13621371
py::return_value_policy::reference_internal)
1372+
.def("_set_stream", [](TensorList<CPUBackend> &t, py::object stream) {
1373+
t.set_order(AccessOrderFromPythonStreamObj(stream));
1374+
})
13631375
.def("_make_copy", [](const TensorList<CPUBackend> &t) {
13641376
auto dst = std::make_shared<TensorList<CPUBackend>>();
13651377
dst->set_device_id(t.device_id());
@@ -1625,6 +1637,7 @@ void ExposeTesorListGPU(py::module &m) {
16251637
DeviceGuard g(t.device_id());
16261638
auto ret = std::make_shared<TensorList<CPUBackend>>();
16271639
ret->set_pinned(false);
1640+
ret->set_order(AccessOrder::host());
16281641
ret->SetContiguity(BatchContiguity::Contiguous);
16291642
UserStream * us = UserStream::Get();
16301643
cudaStream_t s = us->GetStream(t);
@@ -1636,6 +1649,9 @@ void ExposeTesorListGPU(py::module &m) {
16361649
Returns a `TensorListCPU` object being a copy of this `TensorListGPU`.
16371650
)code",
16381651
py::return_value_policy::take_ownership)
1652+
.def("_set_stream", [](TensorList<GPUBackend> &t, py::object stream) {
1653+
t.set_order(AccessOrderFromPythonStreamObj(stream));
1654+
})
16391655
.def("_make_copy", [](const TensorList<GPUBackend> &tl) {
16401656
DeviceGuard dg(tl.device_id());
16411657
auto dst = std::make_shared<TensorList<GPUBackend>>();

0 commit comments

Comments
 (0)