Skip to content

Commit 22e1472

Browse files
authoredMar 21, 2025··
YQ-4186 refactor errors in s3 write actor (#16020)
1 parent 60e9366 commit 22e1472

File tree

3 files changed

+113
-120
lines changed

3 files changed

+113
-120
lines changed
 

‎ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp

+107-119
Original file line numberDiff line numberDiff line change
@@ -72,40 +72,43 @@ struct TEvPrivate {
7272
};
7373

7474
struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {
75-
76-
TEvUploadError(long httpCode, const TString& s3ErrorCode, const TString& message)
77-
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), S3ErrorCode(s3ErrorCode), Message(message) {
78-
BuildIssues();
79-
}
80-
81-
TEvUploadError(const TString& s3ErrorCode, const TString& message)
82-
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), S3ErrorCode(s3ErrorCode), Message(message) {
83-
BuildIssues();
84-
}
85-
86-
TEvUploadError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message)
87-
: StatusCode(statusCode), HttpCode(0), Message(message) {
88-
BuildIssues();
89-
}
90-
91-
TEvUploadError(long httpCode, const TString& message)
92-
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(httpCode), Message(message) {
93-
BuildIssues();
94-
}
95-
96-
TEvUploadError(TIssues&& issues)
97-
: StatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED), HttpCode(0), Issues(issues) {
98-
// don't build
99-
}
100-
101-
void BuildIssues() {
102-
Issues = ::NYql::NDq::BuildIssues(HttpCode, S3ErrorCode, Message);
75+
static constexpr size_t BODY_MAX_SIZE = 1_KB;
76+
77+
TEvUploadError(NDqProto::StatusIds::StatusCode status, TIssues&& issues)
78+
: Status(status)
79+
, Issues(std::move(issues))
80+
{}
81+
82+
TEvUploadError(const TString& message, const TString& requestId, const TString& responseBody, const IHTTPGateway::TResult& result)
83+
: Status(NDqProto::StatusIds::INTERNAL_ERROR)
84+
{
85+
if (responseBody) {
86+
if (const TS3Result s3Result(responseBody); s3Result.IsError) {
87+
if (s3Result.Parsed) {
88+
Status = StatusFromS3ErrorCode(s3Result.S3ErrorCode);
89+
Issues.AddIssue(TStringBuilder() << "Error code: " << s3Result.S3ErrorCode);
90+
Issues.AddIssue(TStringBuilder() << "Error message: " << s3Result.ErrorMessage);
91+
} else {
92+
Issues.AddIssue(TStringBuilder() << "Failed to parse s3 response: " << s3Result.ErrorMessage);
93+
}
94+
Issues = NS3Util::AddParentIssue("S3 issues", TIssues(Issues));
95+
}
96+
}
97+
Issues.AddIssues(NS3Util::AddParentIssue("Http geteway issues", TIssues(result.Issues)));
98+
if (result.CurlResponseCode != CURLE_OK) {
99+
Issues.AddIssue(TStringBuilder() << "CURL response code: " << curl_easy_strerror(result.CurlResponseCode));
100+
}
101+
if (Status == NDqProto::StatusIds::INTERNAL_ERROR) {
102+
Issues.AddIssues(NS3Util::AddParentIssue("Http request info", {
103+
TIssue(TStringBuilder() << "Response code: " << result.Content.HttpResponseCode),
104+
TIssue(TStringBuilder() << "Headers: " << result.Content.Headers),
105+
TIssue(TStringBuilder() << "Body: \"" << TStringBuf(responseBody).Trunc(BODY_MAX_SIZE) << (responseBody.size() > BODY_MAX_SIZE ? "\"..." : "\""))
106+
}));
107+
}
108+
Issues = NS3Util::AddParentIssue(TStringBuilder() << message << ", s3 request id: [" << requestId << "]", TIssues(Issues));
103109
}
104110

105-
NYql::NDqProto::StatusIds::StatusCode StatusCode;
106-
long HttpCode;
107-
TString S3ErrorCode;
108-
TString Message;
111+
NDqProto::StatusIds::StatusCode Status = NDqProto::StatusIds::UNSPECIFIED;
109112
TIssues Issues;
110113
};
111114

@@ -179,7 +182,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
179182
Gateway->Upload(Url + "?uploads",
180183
IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
181184
"",
182-
std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1),
185+
std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, Url, RequestId, std::placeholders::_1),
183186
false,
184187
RetryPolicy);
185188
}
@@ -264,71 +267,64 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
264267
hFunc(TEvPrivate::TEvUploadFinished, Handle);
265268
)
266269

267-
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& requestId, IHTTPGateway::TResult&& result) {
268-
if (!result.Issues) {
269-
try {
270-
TS3Result s3Result(std::move(result.Content.Extract()));
271-
const auto& root = s3Result.GetRootNode();
272-
if (s3Result.IsError) {
273-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
274-
} else if (root.Name() != "InitiateMultipartUploadResult")
275-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", request id: [" << requestId << "]")));
276-
else {
277-
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
278-
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
279-
}
280-
} catch (const std::exception& ex) {
281-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse create upload response: " << ex.what() << ", request id: [" << requestId << "]")));
282-
}
283-
} else {
284-
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Upload error, request id: [" << requestId << "], ", std::move(result.Issues));
285-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
286-
}
287-
}
288-
289-
static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& requestId, IHTTPGateway::TResult&& response) {
290-
if (!response.Issues) {
291-
const auto& str = response.Content.Headers;
292-
const auto headerStr = str.substr(str.rfind("HTTP/"));
293-
if (const NHttp::THeaders headers(headerStr); headers.Has("Etag"))
294-
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
295-
else {
296-
TS3Result s3Result(std::move(response.Content.Extract()));
297-
if (s3Result.IsError && s3Result.Parsed) {
298-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << "Upload failed: " << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
299-
} else {
300-
constexpr size_t BODY_MAX_SIZE = 1_KB;
301-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR,
302-
TStringBuilder() << "Unexpected response"
303-
<< ". Headers: " << headerStr
304-
<< ". Body: \"" << TStringBuf(s3Result.Body).Trunc(BODY_MAX_SIZE)
305-
<< (s3Result.Body.size() > BODY_MAX_SIZE ? "\"..." : "\"")
306-
<< ". Request id: [" << requestId << "]")));
307-
}
270+
static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& url, const TString& requestId, IHTTPGateway::TResult&& result) {
271+
const TString body = result.Content.Extract();
272+
if (result.Issues) {
273+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload response issues is not empty, url: " << url, requestId, body, result)));
274+
return;
275+
}
276+
277+
try {
278+
const TS3Result s3Result(body);
279+
const auto& root = s3Result.GetRootNode();
280+
if (s3Result.IsError) {
281+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Create upload operation failed, url: " << url, requestId, body, result)));
282+
} else if (root.Name() != "InitiateMultipartUploadResult") {
283+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on create upload: " << root.Name() << ", url: " << url, requestId, body, result)));
284+
} else {
285+
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
286+
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
308287
}
288+
} catch (const std::exception& ex) {
289+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Failed to parse create upload response: " << ex.what() << ", url: " << url, requestId, body, result)));
290+
}
291+
}
292+
293+
static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, const TString& url, const TString& requestId, IHTTPGateway::TResult&& response) {
294+
const TString body = response.Content.Extract();
295+
if (response.Issues) {
296+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload finish response issues is not empty, url: " << url, requestId, body, response)));
297+
return;
298+
}
299+
300+
const auto& str = response.Content.Headers;
301+
const auto headerStr = str.substr(str.rfind("HTTP/"));
302+
if (const NHttp::THeaders headers(headerStr); headers.Has("Etag")) {
303+
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadPartFinished(size, index, TString(headers.Get("Etag")))));
309304
} else {
310-
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "PartUpload error, request id: [" << requestId << "], ", std::move(response.Issues));
311-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
305+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Part " << index << " upload failed, url: " << url, requestId, body, response)));
312306
}
313307
}
314308

315309
static void OnMultipartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) {
316-
if (!result.Issues) {
317-
try {
318-
TS3Result s3Result(std::move(result.Content.Extract()));
319-
const auto& root = s3Result.GetRootNode();
320-
if (s3Result.IsError) {
321-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(s3Result.S3ErrorCode, TStringBuilder{} << s3Result.ErrorMessage << ", request id: [" << requestId << "]")));
322-
} else if (root.Name() != "CompleteMultipartUploadResult")
323-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Unexpected response on finish upload: " << root.Name() << ", request id: [" << requestId << "]")));
324-
else
325-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
326-
} catch (const std::exception& ex) {
327-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TStringBuilder() << "Error on parse finish upload response: " << ex.what() << ", request id: [" << requestId << "]")));
310+
const TString body = result.Content.Extract();
311+
if (result.Issues) {
312+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload finish response issues is not empty, url: " << url, requestId, body, result)));
313+
return;
314+
}
315+
316+
try {
317+
const TS3Result s3Result(body);
318+
const auto& root = s3Result.GetRootNode();
319+
if (s3Result.IsError) {
320+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Multipart upload operation failed, url: " << url, requestId, body, result)));
321+
} else if (root.Name() != "CompleteMultipartUploadResult") {
322+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Unexpected response on finish multipart upload: " << root.Name() << ", url: " << url, requestId, body, result)));
323+
} else {
324+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
328325
}
329-
} else {
330-
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Multipart error, request id: [" << requestId << "], ", std::move(result.Issues));
331-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
326+
} catch (const std::exception& ex) {
327+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Error on parse finish multipart upload response: " << ex.what() << ", url: " << url, requestId, body, result)));
332328
}
333329
}
334330

@@ -341,23 +337,17 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
341337
}
342338

343339
static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, const TString& key, const TString& url, const TString& requestId, ui64 sentSize, IHTTPGateway::TResult&& result) {
344-
if (!result.Issues) {
345-
if (result.Content.HttpResponseCode >= 300) {
346-
TString errorText = result.Content.Extract();
347-
TString errorCode;
348-
TString message;
349-
if (ParseS3ErrorResponse(errorText, errorCode, message)) {
350-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, errorCode, TStringBuilder{} << message << ", request id: [" << requestId << "]")));
351-
} else {
352-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(result.Content.HttpResponseCode, TStringBuilder{} << errorText << ", request id: [" << requestId << "]")));
353-
}
354-
} else {
355-
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
356-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
357-
}
340+
const TString body = result.Content.Extract();
341+
if (result.Issues) {
342+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload finish response issues is not empty, url: " << url, requestId, body, result)));
343+
return;
344+
}
345+
346+
if (result.Content.HttpResponseCode >= 300) {
347+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(TStringBuilder() << "Upload operation failed, url: " << url, requestId, body, result)));
358348
} else {
359-
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "UploadFinish error, request id: [" << requestId << "], ", std::move(result.Issues));
360-
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadError(std::move(issues))));
349+
actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
350+
actorSystem->Send(new IEventHandle(parentId, selfId, new TEvPrivate::TEvUploadFinished(key, url, sentSize)));
361351
}
362352
}
363353

@@ -387,7 +377,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
387377
Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId,
388378
IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
389379
std::move(part),
390-
std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1),
380+
std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, Url, RequestId, std::placeholders::_1),
391381
true,
392382
RetryPolicy);
393383
}
@@ -444,7 +434,7 @@ class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
444434
}
445435

446436
void FailOnException() {
447-
Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage()));
437+
Send(ParentId, new TEvPrivate::TEvUploadError(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << "Unexpected exception: " << CurrentExceptionMessage())}));
448438
SafeAbortMultipartUpload();
449439
}
450440

@@ -614,18 +604,16 @@ class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComput
614604
}
615605

616606
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
617-
LOG_W("TS3WriteActor", "TEvUploadError " << result->Get()->Issues.ToOneLineString());
618-
619-
NDqProto::StatusIds::StatusCode statusCode = result->Get()->StatusCode;
620-
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
621-
statusCode = StatusFromS3ErrorCode(result->Get()->S3ErrorCode);
622-
if (statusCode == NDqProto::StatusIds::UNSPECIFIED) {
623-
statusCode = NDqProto::StatusIds::INTERNAL_ERROR;
624-
result->Get()->Issues.AddIssue("Got upload error with unspecified error code.");
625-
}
607+
auto status = result->Get()->Status;
608+
auto issues = std::move(result->Get()->Issues);
609+
LOG_W("TS3WriteActor", "TEvUploadError, status: " << NDqProto::StatusIds::StatusCode_Name(status) << ", issues: " << issues.ToOneLineString());
610+
611+
if (status == NDqProto::StatusIds::UNSPECIFIED) {
612+
status = NDqProto::StatusIds::INTERNAL_ERROR;
613+
issues.AddIssue("Got upload error with unspecified error code.");
626614
}
627615

628-
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Issues, statusCode);
616+
Callbacks->OnAsyncOutputError(OutputIndex, issues, status);
629617
}
630618

631619
void FinishIfNeeded() {

‎ydb/library/yql/providers/s3/common/util.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ char* UrlEscape(char* to, const char* from) {
3030

3131
}
3232

33-
TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
33+
TIssues AddParentIssue(const TString& prefix, TIssues&& issues) {
3434
if (!issues) {
3535
return TIssues{};
3636
}
@@ -41,6 +41,10 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
4141
return TIssues{result};
4242
}
4343

44+
TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues) {
45+
return AddParentIssue(TString(prefix), std::move(issues));
46+
}
47+
4448
TString UrlEscapeRet(const TStringBuf from) {
4549
TString to;
4650
to.ReserveAndResize(CgiEscapeBufLen(from.size()));

‎ydb/library/yql/providers/s3/common/util.h

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
namespace NYql::NS3Util {
88

9+
TIssues AddParentIssue(const TString& prefix, TIssues&& issues);
910
TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues);
1011

1112
// Like UrlEscape with forceEscape = true

0 commit comments

Comments
 (0)
Please sign in to comment.