diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index da2ba03a2c97..d1b7e96c8a64 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -72,40 +72,43 @@ struct TEvPrivate { }; struct TEvUploadError : public TEventLocal { - - TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(const TString& s3ErrorCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) - : StatusCode(statusCode), HttpCode(0), Message(message) { - BuildIssues(); - } - - TEvUploadError(long httpCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(TIssues&& issues) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) { - // don't build - } - - void BuildIssues() { - Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message); + static constexpr size_t BODY_MAX_SIZE = 1_KB; + + TEvUploadError(NDqProto::StatusIds::StatusCode status, TIssues&& issues) + : Status(status) + , Issues(std::move(issues)) + {} + + TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result) + : Status(NDqProto::StatusIds::INTERNAL_ERROR) + { + if (responseBody) { + if (const TS3Result s3Result(responseBody); s3Result.IsError) { + if (s3Result.Parsed) { + Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); + } else { + Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); + } + Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues)); + } + } + Issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", TIssues(result.Issues))); + if (result.CurlResponseCode != CURLE_OK) { + Issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode)); + } + if (Status == NDqProto::StatusIds::INTERNAL_ERROR) { + Issues.AddIssues(NS3Util::AddParentIssue("Http request info", { + TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode), + TIssue(TStringBuilder() << "Headers: " << result.Content.Headers), + TIssue(TStringBuilder() << "Body: \"" << TStringBuf(responseBody).Trunc(BODY_MAX_SIZE) << (responseBody.size() > BODY_MAX_SIZE ? "\"..." : "\"")) + })); + } + Issues = NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", TIssues(Issues)); } - NYql::NDqProto::StatusIds::StatusCode StatusCode; - long HttpCode; - TString S3ErrorCode; - TString Message; + NDqProto::StatusIds::StatusCode Status = NDqProto::StatusIds::UNSPECIFIED; TIssues Issues; }; @@ -179,7 +182,7 @@ class TS3FileWriteActor : public TActorBootstrapped { Gateway->Upload(Url + "?uploads", IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), "", - std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), + std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, Url, RequestId, std::placeholders::_1), false, RetryPolicy); } @@ -264,71 +267,64 @@ class TS3FileWriteActor : public TActorBootstrapped { hFunc(TEvPrivate::TEvUploadFinished, Handle); ) - static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - try { - TS3Result s3Result(std::move(result.Content.Extract())); - const auto& root = s3Result.GetRootNode(); - if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else if (root.Name() != "InitiateMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]"))); - else { - const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); - } - } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]"))); - } - } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Upload error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); - } - } - - static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) { - if (!response.Issues) { - const auto& str = response.Content.Headers; - const auto headerStr = str.substr(str.rfind("HTTP/")); - if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); - else { - TS3Result s3Result(std::move(response.Content.Extract())); - if (s3Result.IsError && s3Result.Parsed) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else { - constexpr size_t BODY_MAX_SIZE = 1_KB; - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Unexpected response" - << ". Headers: " << headerStr - << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) - << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"") - << ". Request id: [" << requestId << "]"))); - } + static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& url, const TString& requestId, IHTTPGateway::TResult&& result) { + const TString body = result.Content.Extract(); + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload response issues is not empty, url: " << url, requestId, body, result))); + return; + } + + try { + const TS3Result s3Result(body); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload operation failed, url: " << url, requestId, body, result))); + } else if (root.Name() != "InitiateMultipartUploadResult") { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", url: " << url, requestId, body, result))); + } else { + const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); } + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Failed to parse create upload response: " << ex.what() << ", url: " << url, requestId, body, result))); + } + } + + static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& url, const TString& requestId, IHTTPGateway::TResult&& response) { + const TString body = response.Content.Extract(); + if (response.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload finish response issues is not empty, url: " << url, requestId, body, response))); + return; + } + + const auto& str = response.Content.Headers; + const auto headerStr = str.substr(str.rfind("HTTP/")); + if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "PartUpload error, request id: [" << requestId << "], ", std::move(response.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload failed, url: " << url, requestId, body, response))); } } static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - try { - TS3Result s3Result(std::move(result.Content.Extract())); - const auto& root = s3Result.GetRootNode(); - if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else if (root.Name() != "CompleteMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]"))); - else - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]"))); + const TString body = result.Content.Extract(); + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload finish response issues is not empty, url: " << url, requestId, body, result))); + return; + } + + try { + const TS3Result s3Result(body); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload operation failed, url: " << url, requestId, body, result))); + } else if (root.Name() != "CompleteMultipartUploadResult") { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name() << ", url: " << url, requestId, body, result))); + } else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } - } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Multipart error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what() << ", url: " << url, requestId, body, result))); } } @@ -341,23 +337,17 @@ class TS3FileWriteActor : public TActorBootstrapped { } static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - if (result.Content.HttpResponseCode >= 300) { - TString errorText = result.Content.Extract(); - TString errorCode; - TString message; - if (ParseS3ErrorResponse(errorText, errorCode, message)) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); - } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]"))); - } - } else { - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - } + const TString body = result.Content.Extract(); + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload finish response issues is not empty, url: " << url, requestId, body, result))); + return; + } + + if (result.Content.HttpResponseCode >= 300) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload operation failed, url: " << url, requestId, body, result))); } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "UploadFinish error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } } @@ -387,7 +377,7 @@ class TS3FileWriteActor : public TActorBootstrapped { Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), std::move(part), - std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1), + std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, Url, RequestId, std::placeholders::_1), true, RetryPolicy); } @@ -444,7 +434,7 @@ class TS3FileWriteActor : public TActorBootstrapped { } void FailOnException() { - Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage())); + Send(ParentId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << "Unexpected exception: " << CurrentExceptionMessage())})); SafeAbortMultipartUpload(); } @@ -614,18 +604,16 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString()); - - NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode; - if (statusCode == NDqProto::StatusIds::UNSPECIFIED) { - statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode); - if (statusCode == NDqProto::StatusIds::UNSPECIFIED) { - statusCode = NDqProto::StatusIds::INTERNAL_ERROR; - result->Get()->Issues.AddIssue("Got upload error with unspecified error code."); - } + auto status = result->Get()->Status; + auto issues = std::move(result->Get()->Issues); + LOG_W("TS3WriteActor", "TEvUploadError, status: " << NDqProto::StatusIds::StatusCode_Name(status) << ", issues: " << issues.ToOneLineString()); + + if (status == NDqProto::StatusIds::UNSPECIFIED) { + status = NDqProto::StatusIds::INTERNAL_ERROR; + issues.AddIssue("Got upload error with unspecified error code."); } - Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode); + Callbacks->OnAsyncOutputError(OutputIndex, issues, status); } void FinishIfNeeded() { diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp index 54c99fbad3fe..8b0cb426983f 100644 --- a/ydb/library/yql/providers/s3/common/util.cpp +++ b/ydb/library/yql/providers/s3/common/util.cpp @@ -30,7 +30,7 @@ char* UrlEscape(char* to, const char* from) { } -TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { +TIssues AddParentIssue(const TString& prefix, TIssues&& issues) { if (!issues) { return TIssues{}; } @@ -41,6 +41,10 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { return TIssues{result}; } +TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { + return AddParentIssue(TString(prefix), std::move(issues)); +} + TString UrlEscapeRet(const TStringBuf from) { TString to; to.ReserveAndResize(CgiEscapeBufLen(from.size())); diff --git a/ydb/library/yql/providers/s3/common/util.h b/ydb/library/yql/providers/s3/common/util.h index 00df60e48f89..baa2cbd6b20e 100644 --- a/ydb/library/yql/providers/s3/common/util.h +++ b/ydb/library/yql/providers/s3/common/util.h @@ -6,6 +6,7 @@ namespace NYql::NS3Util { +TIssues AddParentIssue(const TString& prefix, TIssues&& issues); TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues); // Like UrlEscape with forceEscape = true