@@ -594,7 +594,8 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
     });
 }
 
-bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms) {
+bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms,
+                                        std::vector< repl_req_ptr_t >* timeout_rreqs) {
     std::vector< folly::Future< folly::Unit > > futs;
     std::vector< repl_req_ptr_t > only_wait_reqs;
     only_wait_reqs.reserve(rreqs.size());
@@ -621,14 +622,23 @@ bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rre
 
     // We are yet to support reactive fetch from remote.
     if (is_resync_mode()) {
-        check_and_fetch_remote_data(std::move(only_wait_reqs));
+        check_and_fetch_remote_data(only_wait_reqs);
     } else {
-        m_repl_svc.add_to_fetch_queue(shared_from_this(), std::move(only_wait_reqs));
+        m_repl_svc.add_to_fetch_queue(shared_from_this(), only_wait_reqs);
     }
 
     // block waiting here until all the futs are ready (data channel filled in and promises are made);
-    auto all_futs = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms));
-    return (all_futs.isReady());
+    auto all_futs_ready = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms)).isReady();
+    if (!all_futs_ready && timeout_rreqs != nullptr) {
+        timeout_rreqs->clear();
+        for (size_t i{0}; i < futs.size(); ++i) {
+            if (!futs[i].isReady()) {
+                timeout_rreqs->emplace_back(only_wait_reqs[i]);
+            }
+        }
+        all_futs_ready = timeout_rreqs->empty();
+    }
+    return all_futs_ready;
 }
 
 void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreqs) {
@@ -953,18 +963,22 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
 
 void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) {
     if (err == ReplServiceError::OK) { return; }
+    RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
 
     if (!rreq->add_state_if_not_already(repl_req_state_t::ERRORED)) {
-        RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
+        RD_LOGE("Raft Channel: Error has been added for rreq=[{}] error={}", rreq->to_string(), err);
         return;
     }
 
     // Remove from the map and thus its no longer accessible from applier_create_req
     m_repl_key_req_map.erase(rreq->rkey());
 
-    if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) {
+    // Ensure non-volatile lsn not exist because handle_error should not be called after append entries.
+    HS_REL_ASSERT(m_state_machine->lsn_to_req(rreq->lsn()) == nullptr,
+                  "Unexpected: LSN={} is already ready to commit", rreq->lsn());
+
+    if (rreq->op_code() == journal_type_t::HS_DATA_LINKED) {
         // Free the blks which is allocated already
-        RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
         if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
             auto blkid = rreq->local_blkid();
             data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) {
@@ -1276,8 +1290,9 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
         }
 
         // Wait till we receive the data from its originator for all the requests
-        if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms))) {
-            for (auto const& rreq : *reqs) {
+        std::vector< repl_req_ptr_t > timeout_rreqs;
+        if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms), &timeout_rreqs)) {
+            for (auto const& rreq : timeout_rreqs) {
                 handle_error(rreq, ReplServiceError::TIMEOUT);
             }
             ret = nuraft::cb_func::ReturnCode::ReturnNull;
@@ -1468,20 +1483,22 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
     RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string());
     uint32_t data_size{0u};
 
+    // If the data is linked and value_size is non-zero, it means blks have been allocated for data.
+    // Since the log is flushed after data is written, the data has already been received.
     if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) {
         MultiBlkId entry_blkid;
         entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
         data_size = entry_blkid.blk_count() * get_blk_size();
         rreq->set_local_blkid(entry_blkid);
+        rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
+        rreq->add_state(repl_req_state_t::DATA_RECEIVED);
     }
 
     rreq->set_lsn(repl_lsn);
     // keep lentry in scope for the lyfe cycle of the rreq
     rreq->set_lentry(lentry);
     rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
     // we load the log from log device, implies log flushed. We only flush log after data is written to data device.
-    rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
-    rreq->add_state(repl_req_state_t::DATA_RECEIVED);
     rreq->add_state(repl_req_state_t::DATA_WRITTEN);
     rreq->add_state(repl_req_state_t::LOG_RECEIVED);
     rreq->add_state(repl_req_state_t::LOG_FLUSHED);
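
Note: the following is a minimal, self-contained sketch (not HomeStore code) of the contract the reworked wait_for_data_receive() establishes: wait up to a shared deadline for a batch of pending operations and, on timeout, report back only the entries that are still outstanding so the caller can fail just those instead of the whole batch. The names wait_all_or_report and Req are hypothetical, and std::shared_future stands in for the folly futures used in the real code.

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <future>
#include <memory>
#include <vector>

struct Req {};
using ReqPtr = std::shared_ptr< Req >;

// Returns true only if every future became ready before the deadline; otherwise the
// still-pending requests are appended to *timed_out (when a destination is provided).
bool wait_all_or_report(std::vector< std::shared_future< void > > const& futs,
                        std::vector< ReqPtr > const& reqs, uint64_t timeout_ms,
                        std::vector< ReqPtr >* timed_out) {
    auto const deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
    bool all_ready{true};
    for (size_t i{0}; i < futs.size(); ++i) {
        // Each wait consumes whatever remains of the shared deadline, so the whole batch is
        // bounded by timeout_ms rather than timeout_ms per request.
        if (futs[i].wait_until(deadline) != std::future_status::ready) {
            all_ready = false;
            if (timed_out != nullptr) { timed_out->emplace_back(reqs[i]); }
        }
    }
    return all_ready;
}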