Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-4186 refactor errors in s3 write actor #16020

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e530b7a
Added YDB_AUTH_TICKET_HEADER
GrigoriyPA Mar 6, 2025
3849577
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 6, 2025
943aac2
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 6, 2025
eec951e
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 6, 2025
bc15932
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 7, 2025
38f7f13
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 7, 2025
814a275
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 10, 2025
aacd71b
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 11, 2025
cd7cd84
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 11, 2025
b81fc9e
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 11, 2025
c996d91
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 11, 2025
066c1c7
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 12, 2025
23994b2
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 15, 2025
2885c43
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 18, 2025
60f1a10
Merge branch 'ydb-platform:main' into main
GrigoriyPA Mar 18, 2025
dd2b452
TMP
GrigoriyPA Mar 19, 2025
0630545
Merge branch 'main' into YQ-4186-refactor-errors-in-s3-write-actor
GrigoriyPA Mar 20, 2025
d916e7c
Refactored errors in s3 write actor
GrigoriyPA Mar 20, 2025
b298e55
Cleanup
GrigoriyPA Mar 20, 2025
720b5cc
Fixed bug
GrigoriyPA Mar 20, 2025
88345fb
Fixed refactor
GrigoriyPA Mar 21, 2025
1d0f193
Cleanup
GrigoriyPA Mar 21, 2025
3f5ae67
Added url into errors
GrigoriyPA Mar 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 107 additions & 119 deletions ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,40 +72,43 @@ struct TEvPrivate {
};

struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {

TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message)
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) {
BuildIssues();
}

TEvUploadError(const TString& s3ErrorCode, const TString& message)
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) {
BuildIssues();
}

TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message)
: StatusCode(statusCode), HttpCode(0), Message(message) {
BuildIssues();
}

TEvUploadError(long httpCode, const TString& message)
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) {
BuildIssues();
}

TEvUploadError(TIssues&& issues)
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) {
// don't build
}

void BuildIssues() {
Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message);
static constexpr size_t BODY_MAX_SIZE = 1_KB;

TEvUploadError(NDqProto::StatusIds::StatusCode status, TIssues&& issues)
: Status(status)
, Issues(std::move(issues))
{}

TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

зачем передавать responseBody отдельным аргументом, если он всегда равен response.Content.Extract()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сделал так, потому что response.Content.Extract() можно вызывать лишь раз, там делается std::move от контента (а он ещё нужен при обработке, чтобы s3Result создать)

: Status(NDqProto::StatusIds::INTERNAL_ERROR)
{
if (responseBody) {
if (const TS3Result s3Result(responseBody); s3Result.IsError) {
if (s3Result.Parsed) {
Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode);
Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode);
Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage);
} else {
Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage);
}
Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues));
}
}
Issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", TIssues(result.Issues)));
if (result.CurlResponseCode != CURLE_OK) {
Issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode));
}
if (Status == NDqProto::StatusIds::INTERNAL_ERROR) {
Issues.AddIssues(NS3Util::AddParentIssue("Http request info", {
TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode),
TIssue(TStringBuilder() << "Headers: " << result.Content.Headers),
TIssue(TStringBuilder() << "Body: \"" << TStringBuf(responseBody).Trunc(BODY_MAX_SIZE) << (responseBody.size() > BODY_MAX_SIZE ? "\"..." : "\""))
}));
}
Issues = NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", TIssues(Issues));
}

NYql::NDqProto::StatusIds::StatusCode StatusCode;
long HttpCode;
TString S3ErrorCode;
TString Message;
NDqProto::StatusIds::StatusCode Status = NDqProto::StatusIds::UNSPECIFIED;
TIssues Issues;
};

Expand Down Expand Up @@ -179,7 +182,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
Gateway->Upload(Url + "?uploads",
IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
"",
std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1),
std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, Url, RequestId, std::placeholders::_1),
false,
RetryPolicy);
}
Expand Down Expand Up @@ -264,71 +267,64 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
hFunc(TEvPrivate::TEvUploadFinished, Handle);
)

static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) {
if (!result.Issues) {
try {
TS3Result s3Result(std::move(result.Content.Extract()));
const auto& root = s3Result.GetRootNode();
if (s3Result.IsError) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
} else if (root.Name() != "InitiateMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]")));
else {
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
}
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]")));
}
} else {
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Upload error, request id: [" << requestId << "], ", std::move(result.Issues));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
}
}

static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) {
if (!response.Issues) {
const auto& str = response.Content.Headers;
const auto headerStr = str.substr(str.rfind("HTTP/"));
if (const NHttp::THeaders headers(headerStr); headers.Has("Etag"))
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
else {
TS3Result s3Result(std::move(response.Content.Extract()));
if (s3Result.IsError && s3Result.Parsed) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
} else {
constexpr size_t BODY_MAX_SIZE = 1_KB;
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR,
TStringBuilder() << "Unexpected response"
<< ". Headers: " << headerStr
<< ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE)
<< (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"")
<< ". Request id: [" << requestId << "]")));
}
// Completion callback for the "create multipart upload" request.
// On success it sends TEvUploadStarted (carrying the upload id) to the actor itself;
// on any failure it sends TEvUploadError to the parent actor.
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& url, const TString& requestId, IHTTPGateway::TResult&& result) {
    // The body is extracted once up front and reused for both parsing and error reporting.
    const TString body = result.Content.Extract();

    // Reports a failure to the parent actor, attaching request id, body and the raw gateway result.
    const auto reportError = [&](const TString& message) {
        actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(message, requestId, body, result)));
    };

    if (result.Issues) {
        reportError(TStringBuilder() << "Create upload response issues is not empty, url: " << url);
        return;
    }

    try {
        const TS3Result s3Result(body);
        const auto& rootNode = s3Result.GetRootNode();

        if (s3Result.IsError) {
            reportError(TStringBuilder() << "Create upload operation failed, url: " << url);
            return;
        }

        if (rootNode.Name() != "InitiateMultipartUploadResult") {
            reportError(TStringBuilder() << "Unexpected response on create upload: " << rootNode.Name() << ", url: " << url);
            return;
        }

        // Expected response: read the upload id from the S3 namespace and notify ourselves.
        const NXml::TNamespacesForXPath s3Namespaces(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
        actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(rootNode.Node("s3:UploadId", false, s3Namespaces).Value<TString>())));
    } catch (const std::exception& ex) {
        // Any exception thrown while parsing or inspecting the response is reported as a parse failure.
        reportError(TStringBuilder() << "Failed to parse create upload response: " << ex.what() << ", url: " << url);
    }
}

static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& url, const TString& requestId, IHTTPGateway::TResult&& response) {
const TString body = response.Content.Extract();
if (response.Issues) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload finish response issues is not empty, url: " << url, requestId, body, response)));
return;
}

const auto& str = response.Content.Headers;
const auto headerStr = str.substr(str.rfind("HTTP/"));
if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
} else {
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "PartUpload error, request id: [" << requestId << "], ", std::move(response.Issues));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload failed, url: " << url, requestId, body, response)));
}
}

static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) {
if (!result.Issues) {
try {
TS3Result s3Result(std::move(result.Content.Extract()));
const auto& root = s3Result.GetRootNode();
if (s3Result.IsError) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
} else if (root.Name() != "CompleteMultipartUploadResult")
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]")));
else
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]")));
const TString body = result.Content.Extract();
if (result.Issues) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload finish response issues is not empty, url: " << url, requestId, body, result)));
return;
}

try {
const TS3Result s3Result(body);
const auto& root = s3Result.GetRootNode();
if (s3Result.IsError) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload operation failed, url: " << url, requestId, body, result)));
} else if (root.Name() != "CompleteMultipartUploadResult") {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name() << ", url: " << url, requestId, body, result)));
} else {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
}
} else {
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Multipart error, request id: [" << requestId << "], ", std::move(result.Issues));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what() << ", url: " << url, requestId, body, result)));
}
}

Expand All @@ -341,23 +337,17 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
}

static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) {
if (!result.Issues) {
if (result.Content.HttpResponseCode >= 300) {
TString errorText = result.Content.Extract();
TString errorCode;
TString message;
if (ParseS3ErrorResponse(errorText, errorCode, message)) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]")));
} else {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]")));
}
} else {
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
}
const TString body = result.Content.Extract();
if (result.Issues) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload finish response issues is not empty, url: " << url, requestId, body, result)));
return;
}

if (result.Content.HttpResponseCode >= 300) {
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload operation failed, url: " << url, requestId, body, result)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в этом случае ошибка не проглотится? если CurlResponseCode == CURLE_OK, а Issues пустые

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут дело в том, что в случае успеха приходит HTTP-ответ с пустым body, поэтому так валидируется

} else {
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "UploadFinish error, request id: [" << requestId << "], ", std::move(result.Issues));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
}
}

Expand Down Expand Up @@ -387,7 +377,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId,
IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
std::move(part),
std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1),
std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, Url, RequestId, std::placeholders::_1),
true,
RetryPolicy);
}
Expand Down Expand Up @@ -444,7 +434,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
}

void FailOnException() {
Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage()));
Send(ParentId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << "Unexpected exception: " << CurrentExceptionMessage())}));
SafeAbortMultipartUpload();
}

Expand Down Expand Up @@ -614,18 +604,16 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
}

void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString());

NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode;
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode);
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
statusCode = NDqProto::StatusIds::INTERNAL_ERROR;
result->Get()->Issues.AddIssue("Got upload error with unspecified error code.");
}
auto status = result->Get()->Status;
auto issues = std::move(result->Get()->Issues);
LOG_W("TS3WriteActor", "TEvUploadError, status: " << NDqProto::StatusIds::StatusCode_Name(status) << ", issues: " << issues.ToOneLineString());

if (status == NDqProto::StatusIds::UNSPECIFIED) {
status = NDqProto::StatusIds::INTERNAL_ERROR;
issues.AddIssue("Got upload error with unspecified error code.");
}

Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);
Callbacks->OnAsyncOutputError(OutputIndex, issues, status);
}

void FinishIfNeeded() {
Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/providers/s3/common/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ char* UrlEscape(char* to, const char* from) {

}

TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
TIssues AddParentIssue(const TString& prefix, TIssues&& issues) {
if (!issues) {
return TIssues{};
}
Expand All @@ -41,6 +41,10 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
return TIssues{result};
}

// Convenience overload for a TStringBuilder prefix: converts the prefix to a TString
// and delegates to the TString overload.
TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
    const TString parentMessage(prefix);
    return AddParentIssue(parentMessage, std::move(issues));
}

TString UrlEscapeRet(const TStringBuf from) {
TString to;
to.ReserveAndResize(CgiEscapeBufLen(from.size()));
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/common/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

namespace NYql::NS3Util {

TIssues AddParentIssue(const TString& prefix, TIssues&& issues);
TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues);

// Like UrlEscape with forceEscape = true
Expand Down
Loading