diff --git a/tensorpipe/common/ibv_lib.h b/tensorpipe/common/ibv_lib.h index 44f96e143..b51eb3646 100644 --- a/tensorpipe/common/ibv_lib.h +++ b/tensorpipe/common/ibv_lib.h @@ -41,7 +41,8 @@ namespace tensorpipe { _(query_gid, int, (IbvLib::context*, uint8_t, int, IbvLib::gid*)) \ _(query_port, int, (IbvLib::context*, uint8_t, IbvLib::port_attr*)) \ _(reg_mr, IbvLib::mr*, (IbvLib::pd*, void*, size_t, int)) \ - _(wc_status_str, const char*, (IbvLib::wc_status)) + _(wc_status_str, const char*, (IbvLib::wc_status)) \ + _(port_state_str, const char*, (IbvLib::port_state)) // Wrapper for libibverbs. diff --git a/tensorpipe/transport/ibv/reactor.cc b/tensorpipe/transport/ibv/reactor.cc index 2b399d8e5..9b25db558 100644 --- a/tensorpipe/transport/ibv/reactor.cc +++ b/tensorpipe/transport/ibv/reactor.cc @@ -17,8 +17,32 @@ namespace ibv { Reactor::Reactor(IbvLib ibvLib, IbvDeviceList deviceList) : ibvLib_(std::move(ibvLib)) { + bool found = false; TP_DCHECK_GE(deviceList.size(), 1); - ctx_ = createIbvContext(getIbvLib(), deviceList[0]); + + // If the deviceList contains multiple ibv devices, we will select the + // device of the port whose port_state is active, instead of just selecting + // the first device in the deviceList by default. + for (int i = 0; i < deviceList.size(); i++) { + IbvContext tp_ctx_; + IbvLib::port_attr portAttr; + std::memset(&portAttr, 0, sizeof(portAttr)); + tp_ctx_ = createIbvContext(getIbvLib(), deviceList[i]); + TP_CHECK_IBV_INT(ibvLib.query_port(tp_ctx_.get(), kPortNum, &portAttr)); + if (portAttr.state == IbvLib::port_state::PORT_ACTIVE) { + ctx_ = std::move(tp_ctx_); + found = true; + break; + } else { + TP_VLOG(8) << "IbvDevice " << deviceList[i].name << " port " + << unsigned(kPortNum) << " state is " + << ibvLib.port_state_str(portAttr.state) + << " , so skip this device"; + } + } + + TP_THROW_ASSERT_IF(found == false) << "Unable to find available ibv device"; + pd_ = createIbvProtectionDomain(getIbvLib(), ctx_); cq_ = createIbvCompletionQueue( getIbvLib(),