From e530b7ad111520c66258e8229d3699f1cfe8d4b1 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Thu, 6 Mar 2025 13:32:03 +0000 Subject: [PATCH 1/8] Added YDB_AUTH_TICKET_HEADER --- .../ydb/control_plane/cms_grpc_client_actor.cpp | 6 ++++-- .../monitoring_grpc_client_actor.cpp | 4 +++- .../fq/libs/compute/ydb/control_plane/ya.make | 1 + .../compute/ydb/control_plane/ydb_grpc_helpers.h | 16 ++++++++++++++++ 4 files changed, 24 insertions(+), 3 deletions(-) create mode 100644 ydb/core/fq/libs/compute/ydb/control_plane/ydb_grpc_helpers.h diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp index 0f19e61a467a..970f88fe281a 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp @@ -1,3 +1,5 @@ +#include "ydb_grpc_helpers.h" + #include #include @@ -78,7 +80,7 @@ class TCmsGrpcServiceActor : public NActors::TActor, NGrpc auto forwardRequest = std::make_unique(); forwardRequest->Request.mutable_serverless_resources()->set_shared_database_path(request.BasePath); forwardRequest->Request.set_path(request.Path); - forwardRequest->Token = CredentialsProvider->GetAuthInfo(); + SetYdbRequestToken(*forwardRequest, CredentialsProvider->GetAuthInfo()); TEvPrivate::TEvCreateDatabaseRequest::TPtr forwardEvent = (NActors::TEventHandle*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie); MakeCall(std::move(forwardEvent)); Requests[Cookie++] = ev; @@ -119,7 +121,7 @@ class TCmsGrpcServiceActor : public NActors::TActor, NGrpc void Handle(TEvYdbCompute::TEvListDatabasesRequest::TPtr& ev) { auto forwardRequest = std::make_unique(); - forwardRequest->Token = CredentialsProvider->GetAuthInfo(); + SetYdbRequestToken(*forwardRequest, CredentialsProvider->GetAuthInfo()); TEvPrivate::TEvListDatabasesRequest::TPtr forwardEvent = (NActors::TEventHandle*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie); MakeCall(std::move(forwardEvent)); Requests[Cookie++] = ev; diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp index 413749cbc6c5..a8a29ee71510 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/monitoring_grpc_client_actor.cpp @@ -1,3 +1,5 @@ +#include "ydb_grpc_helpers.h" + #include #include @@ -64,7 +66,7 @@ class TMonitoringGrpcServiceActor : public NActors::TActor(); forwardRequest->Request.set_return_verbose_status(true); - forwardRequest->Token = CredentialsProvider->GetAuthInfo(); + SetYdbRequestToken(*forwardRequest, CredentialsProvider->GetAuthInfo()); TEvPrivate::TEvSelfCheckRequest::TPtr forwardEvent = (NActors::TEventHandle*)new IEventHandle(SelfId(), SelfId(), forwardRequest.release(), 0, Cookie); MakeCall(std::move(forwardEvent)); Requests[Cookie++] = ev; diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make index 66e3cc3f7a54..1c1019d32479 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/ya.make +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ya.make @@ -24,6 +24,7 @@ PEERDIR( ydb/library/yql/utils/actors ydb/public/api/grpc ydb/public/api/grpc/draft + ydb/public/sdk/cpp/src/client/resources ydb/public/sdk/cpp/src/library/operation_id/protos yql/essentials/public/issue yql/essentials/utils diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/ydb_grpc_helpers.h b/ydb/core/fq/libs/compute/ydb/control_plane/ydb_grpc_helpers.h new file mode 100644 index 000000000000..9b34f7e07131 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/control_plane/ydb_grpc_helpers.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace NFq { + +template +void SetYdbRequestToken(NCloud::TEvGrpcProtoRequest& event, const TString& token) { + if (token) { + event.Token = token; + event.Headers.emplace(NYdb::YDB_AUTH_TICKET_HEADER, token); + } +} + +} // namespace NFq From dd2b45252fcf22e1cc63e060aa7632f1605079c6 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 19 Mar 2025 08:13:26 +0000 Subject: [PATCH 2/8] TMP --- .../s3/actors/yql_s3_write_actor.cpp | 216 +++++++++--------- ydb/library/yql/providers/s3/common/util.cpp | 6 +- ydb/library/yql/providers/s3/common/util.h | 1 + 3 files changed, 113 insertions(+), 110 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index da2ba03a2c97..4fb507691e45 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -72,40 +72,40 @@ struct TEvPrivate { }; struct TEvUploadError : public TEventLocal { + static constexpr size_t BODY_MAX_SIZE = 1_KB; + + TEvUploadError(NDqProto::StatusIds::StatusCode status, TIssues&& issues) + : Status(status) + , Issues(std::move(issues)) + {} + + static TEvUploadError* InternalError(const TString& message, const TString& requestId, IHTTPGateway::TResult&& result) { + TIssues issues; + issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", std::move(result.Issues))); + issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode)); + + TS3Result s3Result(std::move(result.Content.Extract())); + if (s3Result.IsError) { + TIssues issuesS3; + if (s3Result.Parsed) { + issuesS3.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); + issuesS3.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); + } else { + issuesS3.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); + } + issues.AddIssues(NS3Util::AddParentIssue("S3 issues", std::move(issuesS3))); + } - TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(const TString& s3ErrorCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) - : StatusCode(statusCode), HttpCode(0), Message(message) { - BuildIssues(); - } - - TEvUploadError(long httpCode, const TString& message) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) { - BuildIssues(); - } - - TEvUploadError(TIssues&& issues) - : StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) { - // don't build - } + issues.AddIssue(NS3Util::AddParentIssue("Http request info", { + TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode), + TIssue(TStringBuilder() << "Headers: " << result.Content.Headers), + TIssue(TStringBuilder() << "Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"")) + })); - void BuildIssues() { - Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message); + return new TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", std::move(issues))); } - NYql::NDqProto::StatusIds::StatusCode StatusCode; - long HttpCode; - TString S3ErrorCode; - TString Message; + NDqProto::StatusIds::StatusCode Status = NDqProto::StatusIds::UNSPECIFIED; TIssues Issues; }; @@ -265,70 +265,70 @@ class TS3FileWriteActor : public TActorBootstrapped { ) static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - try { - TS3Result s3Result(std::move(result.Content.Extract())); - const auto& root = s3Result.GetRootNode(); - if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else if (root.Name() != "InitiateMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]"))); - else { - const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); - } - } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]"))); + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnUploadsCreated error, response issues is not empty", requestId, std::move(result)))); + return; + } + + try { + TS3Result s3Result(std::move(result.Content.Extract())); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); + } else if (root.Name() != "InitiateMultipartUploadResult") { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]"))); + } else { + const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); } - } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Upload error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]"))); } } static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) { - if (!response.Issues) { - const auto& str = response.Content.Headers; - const auto headerStr = str.substr(str.rfind("HTTP/")); - if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); - else { - TS3Result s3Result(std::move(response.Content.Extract())); - if (s3Result.IsError && s3Result.Parsed) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else { - constexpr size_t BODY_MAX_SIZE = 1_KB; - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Unexpected response" - << ". Headers: " << headerStr - << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) - << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"") - << ". Request id: [" << requestId << "]"))); - } - } + if (response.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnPartUploadFinish error, response issues is not empty", requestId, std::move(response)))); + return; + } + + const auto& str = response.Content.Headers; + const auto headerStr = str.substr(str.rfind("HTTP/")); + if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "PartUpload error, request id: [" << requestId << "], ", std::move(response.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + TS3Result s3Result(std::move(response.Content.Extract())); + if (s3Result.IsError && s3Result.Parsed) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); + } else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, + TStringBuilder() << "Unexpected response" + << ". Headers: " << headerStr + << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) + << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"") + << ". Request id: [" << requestId << "]"))); + } } } static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - try { - TS3Result s3Result(std::move(result.Content.Extract())); - const auto& root = s3Result.GetRootNode(); - if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else if (root.Name() != "CompleteMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]"))); - else - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]"))); + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnMultipartUploadFinish error, response issues is not empty", requestId, std::move(result)))); + return; + } + + try { + TS3Result s3Result(std::move(result.Content.Extract())); + const auto& root = s3Result.GetRootNode(); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); + } else if (root.Name() != "CompleteMultipartUploadResult") + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]"))); + else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } - } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Multipart error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]"))); } } @@ -341,23 +341,23 @@ class TS3FileWriteActor : public TActorBootstrapped { } static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { - if (!result.Issues) { - if (result.Content.HttpResponseCode >= 300) { - TString errorText = result.Content.Extract(); - TString errorCode; - TString message; - if (ParseS3ErrorResponse(errorText, errorCode, message)) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); - } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]"))); - } + if (result.Issues) { + actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnUploadFinish error, response issues is not empty", requestId, std::move(result)))); + return; + } + + if (result.Content.HttpResponseCode >= 300) { + TString errorText = result.Content.Extract(); + TString errorCode; + TString message; + if (ParseS3ErrorResponse(errorText, errorCode, message)) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); } else { - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]"))); } } else { - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "UploadFinish error, request id: [" << requestId << "], ", std::move(result.Issues)); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues)))); + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } } @@ -444,7 +444,7 @@ class TS3FileWriteActor : public TActorBootstrapped { } void FailOnException() { - Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage())); + Send(ParentId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << "Unexpected exception: " << CurrentExceptionMessage())})); SafeAbortMultipartUpload(); } @@ -614,18 +614,16 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString()); - - NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode; - if (statusCode == NDqProto::StatusIds::UNSPECIFIED) { - statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode); - if (statusCode == NDqProto::StatusIds::UNSPECIFIED) { - statusCode = NDqProto::StatusIds::INTERNAL_ERROR; - result->Get()->Issues.AddIssue("Got upload error with unspecified error code."); - } + auto status = result->Get()->Status; + auto issues = std::move(result->Get()->Issues); + LOG_W("TS3WriteActor", "TEvUploadError, status: " << status << ", issues: " << issues.ToOneLineString()); + + if (status == NDqProto::StatusIds::UNSPECIFIED) { + status = NDqProto::StatusIds::INTERNAL_ERROR; + issues.AddIssue("Got upload error with unspecified error code."); } - Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode); + Callbacks->OnAsyncOutputError(OutputIndex, issues, status); } void FinishIfNeeded() { @@ -742,4 +740,4 @@ std::pair CreateS3WriteActor( return {actor, actor}; } -} // namespace NYql::NDq +} // namespace NDq diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp index 54c99fbad3fe..8b0cb426983f 100644 --- a/ydb/library/yql/providers/s3/common/util.cpp +++ b/ydb/library/yql/providers/s3/common/util.cpp @@ -30,7 +30,7 @@ char* UrlEscape(char* to, const char* from) { } -TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { +TIssues AddParentIssue(const TString& prefix, TIssues&& issues) { if (!issues) { return TIssues{}; } @@ -41,6 +41,10 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { return TIssues{result}; } +TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) { + return AddParentIssue(TString(prefix), std::move(issues)); +} + TString UrlEscapeRet(const TStringBuf from) { TString to; to.ReserveAndResize(CgiEscapeBufLen(from.size())); diff --git a/ydb/library/yql/providers/s3/common/util.h b/ydb/library/yql/providers/s3/common/util.h index 00df60e48f89..baa2cbd6b20e 100644 --- a/ydb/library/yql/providers/s3/common/util.h +++ b/ydb/library/yql/providers/s3/common/util.h @@ -6,6 +6,7 @@ namespace NYql::NS3Util { +TIssues AddParentIssue(const TString& prefix, TIssues&& issues); TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues); // Like UrlEscape with forceEscape = true From d916e7cdc0b0ab5f585e8a41a4f39d474652e9d8 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Thu, 20 Mar 2025 17:52:52 +0300 Subject: [PATCH 3/8] Refactored errors in s3 write actor --- .../s3/actors/yql_s3_write_actor.cpp | 108 +++++++++--------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 4fb507691e45..05399258d4f4 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -79,30 +79,40 @@ struct TEvPrivate { , Issues(std::move(issues)) {} - static TEvUploadError* InternalError(const TString& message, const TString& requestId, IHTTPGateway::TResult&& result) { - TIssues issues; - issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", std::move(result.Issues))); - issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode)); + TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result) + : Status(NDqProto::StatusIds::INTERNAL_ERROR) + { + BuildIssues(message, requestId, responseBody, result); + } - TS3Result s3Result(std::move(result.Content.Extract())); + TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result, const TS3Result& s3Result) { if (s3Result.IsError) { - TIssues issuesS3; if (s3Result.Parsed) { - issuesS3.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); - issuesS3.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); + Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); } else { - issuesS3.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); + Status = NDqProto::StatusIds::INTERNAL_ERROR; + Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); } - issues.AddIssues(NS3Util::AddParentIssue("S3 issues", std::move(issuesS3))); + Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues)); } + BuildIssues(message, requestId, responseBody, result); + } - issues.AddIssue(NS3Util::AddParentIssue("Http request info", { - TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode), - TIssue(TStringBuilder() << "Headers: " << result.Content.Headers), - TIssue(TStringBuilder() << "Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"")) - })); - - return new TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", std::move(issues))); + void BuildIssues(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result) { + Issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", TIssues(result.Issues))); + if (result.CurlResponseCode != CURLE_OK) { + Issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode)); + } + if (Status == NDqProto::StatusIds::INTERNAL_ERROR) { + Issues.AddIssues(NS3Util::AddParentIssue("Http request info", { + TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode), + TIssue(TStringBuilder() << "Headers: " << result.Content.Headers), + TIssue(TStringBuilder() << "Body: \"" << TStringBuf(responseBody).Trunc(BODY_MAX_SIZE) << (responseBody.size() > BODY_MAX_SIZE ? "\"..." : "\"")) + })); + } + Issues = NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", TIssues(Issues)); } NDqProto::StatusIds::StatusCode Status = NDqProto::StatusIds::UNSPECIFIED; @@ -265,30 +275,32 @@ class TS3FileWriteActor : public TActorBootstrapped { ) static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { - if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnUploadsCreated error, response issues is not empty", requestId, std::move(result)))); + const TString body = result.Content.Extract(); + if (result.Issues || result.CurlResponseCode != CURLE_OK) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload response issues is not empty", requestId, body, result))); return; } try { - TS3Result s3Result(std::move(result.Content.Extract())); + const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload operation failed", requestId, body, result, s3Result))); } else if (root.Name() != "InitiateMultipartUploadResult") { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]"))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name(), requestId, body, result, s3Result))); } else { const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); } } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]"))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Failed to parse create upload response: " << ex.what(), requestId, body, result))); } } static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) { + const TString body = response.Content.Extract(); if (response.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnPartUploadFinish error, response issues is not empty", requestId, std::move(response)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload finish response issues is not empty", requestId, body, response))); return; } @@ -297,38 +309,29 @@ class TS3FileWriteActor : public TActorBootstrapped { if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); } else { - TS3Result s3Result(std::move(response.Content.Extract())); - if (s3Result.IsError && s3Result.Parsed) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, - TStringBuilder() << "Unexpected response" - << ". Headers: " << headerStr - << ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE) - << (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"") - << ". Request id: [" << requestId << "]"))); - } + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload failed", requestId, body, response, TS3Result(body)))); } } static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { + const TString body = result.Content.Extract(); if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnMultipartUploadFinish error, response issues is not empty", requestId, std::move(result)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload finish response issues is not empty", requestId, body, result))); return; } try { - TS3Result s3Result(std::move(result.Content.Extract())); + const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]"))); - } else if (root.Name() != "CompleteMultipartUploadResult") - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]"))); - else { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload operation failed", requestId, body, result, s3Result))); + } else if (root.Name() != "CompleteMultipartUploadResult") { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name(), requestId, body, result, s3Result))); + } else { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]"))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what(), requestId, body, result))); } } @@ -341,23 +344,22 @@ class TS3FileWriteActor : public TActorBootstrapped { } static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { + const TString body = result.Content.Extract(); if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, TEvPrivate::TEvUploadError::InternalError("OnUploadFinish error, response issues is not empty", requestId, std::move(result)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload finish response issues is not empty", requestId, body, result))); return; } - if (result.Content.HttpResponseCode >= 300) { - TString errorText = result.Content.Extract(); - TString errorCode; - TString message; - if (ParseS3ErrorResponse(errorText, errorCode, message)) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]"))); + try { + const TS3Result s3Result(body); + if (s3Result.IsError) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result, s3Result))); } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]"))); + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } - } else { - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish upload response: " << ex.what(), requestId, body, result))); } } From b298e559f43f46409a8ace1ecf7ef9d225fcf856 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Thu, 20 Mar 2025 18:06:27 +0300 Subject: [PATCH 4/8] Cleanup --- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 05399258d4f4..9549b7ddab9e 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -618,7 +618,7 @@ class TS3WriteActor : public TActorBootstrapped, public IDqComput void Handle(TEvPrivate::TEvUploadError::TPtr& result) { auto status = result->Get()->Status; auto issues = std::move(result->Get()->Issues); - LOG_W("TS3WriteActor", "TEvUploadError, status: " << status << ", issues: " << issues.ToOneLineString()); + LOG_W("TS3WriteActor", "TEvUploadError, status: " << NDqProto::StatusIds::StatusCode_Name(status) << ", issues: " << issues.ToOneLineString()); if (status == NDqProto::StatusIds::UNSPECIFIED) { status = NDqProto::StatusIds::INTERNAL_ERROR; @@ -742,4 +742,4 @@ std::pair CreateS3WriteActor( return {actor, actor}; } -} // namespace NDq +} // namespace NYql::NDq From 720b5cce1a768f2de43faaa9650327b1609a55af Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Thu, 20 Mar 2025 20:44:50 +0300 Subject: [PATCH 5/8] Fixed bug --- .../providers/s3/actors/yql_s3_write_actor.cpp | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 9549b7ddab9e..e8d54003e7ee 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -350,16 +350,11 @@ class TS3FileWriteActor : public TActorBootstrapped { return; } - try { - const TS3Result s3Result(body); - if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result, s3Result))); - } else { - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); - } - } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish upload response: " << ex.what(), requestId, body, result))); + if (result.Content.HttpResponseCode >= 300) { + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result, TS3Result(body)))); + } else { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } } From 88345fbf5f3ce438dcd9535d6ac523f34bfd25ad Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 21 Mar 2025 09:51:09 +0300 Subject: [PATCH 6/8] Fixed refactor --- .../s3/actors/yql_s3_write_actor.cpp | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index e8d54003e7ee..2e7aa15a2911 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -82,25 +82,18 @@ struct TEvPrivate { TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result) : Status(NDqProto::StatusIds::INTERNAL_ERROR) { - BuildIssues(message, requestId, responseBody, result); - } - - TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result, const TS3Result& s3Result) { - if (s3Result.IsError) { - if (s3Result.Parsed) { - Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode); - Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); - Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); - } else { - Status = NDqProto::StatusIds::INTERNAL_ERROR; - Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); + if (responseBody) { + if (const TS3Result s3Result(responseBody); s3Result.IsError) { + if (s3Result.Parsed) { + Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode); + Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage); + } else { + Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage); + } + Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues)); } - Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues)); } - BuildIssues(message, requestId, responseBody, result); - } - - void BuildIssues(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result) { Issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", TIssues(result.Issues))); if (result.CurlResponseCode != CURLE_OK) { Issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode)); @@ -285,9 +278,9 @@ class TS3FileWriteActor : public TActorBootstrapped { const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload operation failed", requestId, body, result, s3Result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload operation failed", requestId, body, result))); } else if (root.Name() != "InitiateMultipartUploadResult") { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name(), requestId, body, result, s3Result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name(), requestId, body, result))); } else { const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); @@ -309,7 +302,7 @@ class TS3FileWriteActor : public TActorBootstrapped { if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload failed", requestId, body, response, TS3Result(body)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload failed", requestId, body, response))); } } @@ -324,9 +317,9 @@ class TS3FileWriteActor : public TActorBootstrapped { const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload operation failed", requestId, body, result, s3Result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload operation failed", requestId, body, result))); } else if (root.Name() != "CompleteMultipartUploadResult") { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name(), requestId, body, result, s3Result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name(), requestId, body, result))); } else { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } @@ -351,7 +344,7 @@ class TS3FileWriteActor : public TActorBootstrapped { } if (result.Content.HttpResponseCode >= 300) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result, TS3Result(body)))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result))); } else { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); From 1d0f193c8a3c777908d3c209a043e49485457762 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 21 Mar 2025 10:22:08 +0300 Subject: [PATCH 7/8] Cleanup --- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 2e7aa15a2911..2f5c47bafd7c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -269,7 +269,7 @@ class TS3FileWriteActor : public TActorBootstrapped { static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { const TString body = result.Content.Extract(); - if (result.Issues || result.CurlResponseCode != CURLE_OK) { + if (result.Issues) { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload response issues is not empty", requestId, body, result))); return; } From 3f5ae6770c60413206e9e6ccd85d169c503d9cd1 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 21 Mar 2025 12:46:43 +0300 Subject: [PATCH 8/8] Added url into errors --- .../s3/actors/yql_s3_write_actor.cpp | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 2f5c47bafd7c..d1b7e96c8a64 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -182,7 +182,7 @@ class TS3FileWriteActor : public TActorBootstrapped { Gateway->Upload(Url + "?uploads", IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), "", - std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), + std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, Url, RequestId, std::placeholders::_1), false, RetryPolicy); } @@ -267,10 +267,10 @@ class TS3FileWriteActor : public TActorBootstrapped { hFunc(TEvPrivate::TEvUploadFinished, Handle); ) - static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) { + static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& url, const TString& requestId, IHTTPGateway::TResult&& result) { const TString body = result.Content.Extract(); if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload response issues is not empty", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload response issues is not empty, url: " << url, requestId, body, result))); return; } @@ -278,22 +278,22 @@ class TS3FileWriteActor : public TActorBootstrapped { const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Create upload operation failed", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload operation failed, url: " << url, requestId, body, result))); } else if (root.Name() != "InitiateMultipartUploadResult") { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name(), requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", url: " << url, requestId, body, result))); } else { const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value()))); } } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Failed to parse create upload response: " << ex.what(), requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Failed to parse create upload response: " << ex.what() << ", url: " << url, requestId, body, result))); } } - static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) { + static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& url, const TString& requestId, IHTTPGateway::TResult&& response) { const TString body = response.Content.Extract(); if (response.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload finish response issues is not empty", requestId, body, response))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload finish response issues is not empty, url: " << url, requestId, body, response))); return; } @@ -302,14 +302,14 @@ class TS3FileWriteActor : public TActorBootstrapped { if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag"))))); } else { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Part upload failed", requestId, body, response))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload failed, url: " << url, requestId, body, response))); } } static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { const TString body = result.Content.Extract(); if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload finish response issues is not empty", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload finish response issues is not empty, url: " << url, requestId, body, result))); return; } @@ -317,14 +317,14 @@ class TS3FileWriteActor : public TActorBootstrapped { const TS3Result s3Result(body); const auto& root = s3Result.GetRootNode(); if (s3Result.IsError) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Multipart upload operation failed", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload operation failed, url: " << url, requestId, body, result))); } else if (root.Name() != "CompleteMultipartUploadResult") { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name(), requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name() << ", url: " << url, requestId, body, result))); } else { actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); } } catch (const std::exception& ex) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what(), requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what() << ", url: " << url, requestId, body, result))); } } @@ -339,12 +339,12 @@ class TS3FileWriteActor : public TActorBootstrapped { static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) { const TString body = result.Content.Extract(); if (result.Issues) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload finish response issues is not empty", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload finish response issues is not empty, url: " << url, requestId, body, result))); return; } if (result.Content.HttpResponseCode >= 300) { - actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError("Upload operation failed", requestId, body, result))); + actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload operation failed, url: " << url, requestId, body, result))); } else { actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize))); @@ -377,7 +377,7 @@ class TS3FileWriteActor : public TActorBootstrapped { Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), std::move(part), - std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1), + std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, Url, RequestId, std::placeholders::_1), true, RetryPolicy); }