Skip to content

Commit ecdf9d1

Browse files
committed
Use error types considered spurious, implement jitter
1 parent 6193107 commit ecdf9d1

File tree

1 file changed

+190
-148
lines changed

1 file changed

+190
-148
lines changed

src/Package/Fetch.zig

Lines changed: 190 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,7 @@ latest_commit: ?git.Oid,
9393
/// the root source file.
9494
module: ?*Package.Module,
9595

96-
/// The number of times an HTTP request will retry if it fails
97-
retry_count: u16 = 3,
98-
99-
/// The delay in milliseconds between HTTP request retries
100-
retry_delay_ms: u32 = 500,
96+
retry: Retry = .{},
10197

10298
pub const LazyStatus = enum {
10399
/// Not lazy.
@@ -331,6 +327,84 @@ pub const RunError = error{
331327
FetchFailed,
332328
};
333329

330+
pub const Retry = struct {
331+
/// The number of failed attempts that have been done so far.
332+
///
333+
/// Starts at 0, and increases by one each time an attempt fails.
334+
cur_retries: u16 = 0,
335+
/// The maximum number of times the operation should be retried.
336+
///
337+
/// 0 means it should never retry.
338+
max_retries: u16 = 3,
339+
/// Hard cap of how many milliseconds to wait between retries.
340+
const MAX_RETRY_SLEEP_MS = 10 * 1000;
341+
/// Minimum time a jittered retry delay could be
342+
const MIN_RETRY_JITTER_MS = 500;
343+
/// Maximum time a jittered retry delay could be
344+
const MAX_RETRY_JITTER_MS = 1500;
345+
346+
fn retryDelayMs(r: *Retry) i64 {
347+
if (r.cur_retries == 0) {
348+
return std.crypto.random.intRangeAtMost(i64, MIN_RETRY_JITTER_MS, MAX_RETRY_JITTER_MS);
349+
} else {
350+
return @min(r.cur_retries * 3 * 1000, MAX_RETRY_SLEEP_MS);
351+
}
352+
}
353+
354+
fn callWithRetries(
355+
r: *Retry,
356+
io: Io,
357+
comptime FunctionType: type,
358+
callback: FunctionType,
359+
args: anytype,
360+
) @typeInfo(FunctionType).@"fn".return_type.? {
361+
while (true) {
362+
return @call(.auto, callback, args) catch |err| {
363+
if (maybeSpurious(err) and r.cur_retries < r.max_retries) {
364+
const delay = Io.Duration.fromMilliseconds(r.retryDelayMs());
365+
try io.sleep(delay, .awake);
366+
r.cur_retries += 1;
367+
continue;
368+
}
369+
return err;
370+
};
371+
}
372+
}
373+
};
374+
375+
const BadHttpStatus = error{
376+
MaybeSpurious,
377+
NonSpurious,
378+
};
379+
380+
fn maybeSpurious(err: anyerror) bool {
381+
return switch (err) {
382+
// tcp errors
383+
std.http.Client.ConnectTcpError.Timeout,
384+
std.http.Client.ConnectTcpError.ConnectionResetByPeer,
385+
std.http.Client.ConnectTcpError.ConnectionRefused,
386+
std.http.Client.ConnectTcpError.NetworkUnreachable,
387+
std.http.Client.ConnectTcpError.AddressInUse,
388+
std.http.Client.ConnectTcpError.NameServerFailure,
389+
std.http.Client.ConnectTcpError.SystemResources,
390+
391+
// io errors
392+
std.http.Client.Request.ReceiveHeadError.WriteFailed,
393+
std.http.Client.Request.ReceiveHeadError.ReadFailed,
394+
395+
// http errors
396+
std.http.Client.Request.ReceiveHeadError.HttpRequestTruncated,
397+
std.http.Client.Request.ReceiveHeadError.HttpConnectionClosing,
398+
std.http.Client.Request.ReceiveHeadError.HttpChunkTruncated,
399+
std.http.Client.Request.ReceiveHeadError.HttpChunkInvalid,
400+
BadHttpStatus.MaybeSpurious,
401+
402+
Allocator.Error.OutOfMemory,
403+
=> true,
404+
else => false,
405+
};
406+
}
407+
334408
pub fn run(f: *Fetch) RunError!void {
335409
const io = f.io;
336410
const eb = &f.error_bundle;
@@ -999,10 +1073,10 @@ const init_resource_buffer_size = git.Packet.max_data_length;
9991073

10001074
fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void {
10011075
const io = f.io;
1002-
const arena = f.arena.allocator();
10031076
const eb = &f.error_bundle;
10041077

10051078
if (ascii.eqlIgnoreCase(uri.scheme, "file")) {
1079+
const arena = f.arena.allocator();
10061080
const path = try uri.path.toRawMaybeAlloc(arena);
10071081
const file = f.parent_package_root.openFile(path, .{}) catch |err| {
10081082
return f.fail(f.location_tok, try eb.printString("unable to open '{f}{s}': {t}", .{
@@ -1013,162 +1087,130 @@ fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u
10131087
return;
10141088
}
10151089

1016-
const http_client = f.job_queue.http_client;
1017-
10181090
if (ascii.eqlIgnoreCase(uri.scheme, "http") or
10191091
ascii.eqlIgnoreCase(uri.scheme, "https"))
10201092
{
1021-
var retries_left = f.retry_count;
1022-
resource.* = .{ .http_request = .{
1023-
.request = while (true) {
1024-
break http_client.request(.GET, uri, .{}) catch |err| {
1025-
if (retries_left > 0) {
1026-
std.Thread.sleep(std.time.ns_per_ms * f.retry_delay_ms);
1027-
retries_left -= 1;
1028-
continue;
1029-
}
1030-
return f.fail(f.location_tok, try eb.printString("unable to connect to server: {t}", .{err}));
1031-
};
1032-
},
1033-
.response = undefined,
1034-
.transfer_buffer = reader_buffer,
1035-
.decompress_buffer = &.{},
1036-
.decompress = undefined,
1037-
} };
1038-
const request = &resource.http_request.request;
1039-
errdefer request.deinit();
1040-
1041-
while (true) {
1042-
request.sendBodiless() catch |err| {
1043-
if (retries_left > 0) {
1044-
std.Thread.sleep(std.time.ns_per_ms * f.retry_delay_ms);
1045-
retries_left -= 1;
1046-
continue;
1047-
}
1048-
return f.fail(f.location_tok, try eb.printString("HTTP request failed: {t}", .{err}));
1049-
};
1050-
1051-
var redirect_buffer: [1024]u8 = undefined;
1052-
const response = &resource.http_request.response;
1053-
response.* = request.receiveHead(&redirect_buffer) catch |err| switch (err) {
1054-
error.ReadFailed => {
1055-
return f.fail(f.location_tok, try eb.printString("HTTP response read failure: {t}", .{
1056-
request.connection.?.getReadError().?,
1057-
}));
1058-
},
1059-
else => |e| return f.fail(f.location_tok, try eb.printString("invalid HTTP response: {t}", .{e})),
1060-
};
1061-
1062-
if (response.head.status != .ok) {
1063-
if (retries_left > 0) {
1064-
std.Thread.sleep(std.time.ns_per_ms * f.retry_delay_ms);
1065-
retries_left -= 1;
1066-
continue;
1067-
}
1068-
return f.fail(f.location_tok, try eb.printString(
1069-
"bad HTTP response code: '{d} {s}'",
1070-
.{ response.head.status, response.head.status.phrase() orelse "" },
1071-
));
1072-
}
1073-
1074-
resource.http_request.decompress_buffer = try arena.alloc(u8, response.head.content_encoding.minBufferCapacity());
1075-
return;
1076-
}
1093+
f.retry.callWithRetries(f.io, @TypeOf(initHttpResource), initHttpResource, .{ f, uri, resource, reader_buffer }) catch |err| {
1094+
const status = resource.http_request.response.head.status;
1095+
return f.fail(f.location_tok, switch (err) {
1096+
BadHttpStatus.MaybeSpurious, BadHttpStatus.NonSpurious => try eb.printString("Failed with bad HTTP status code: {d} -> {}", .{ status, status }),
1097+
else => try eb.printString("Failed to fetch HTTP resource {s}: {t}", .{ uri.path.percent_encoded, err }),
1098+
});
1099+
};
1100+
return;
10771101
}
10781102

10791103
if (ascii.eqlIgnoreCase(uri.scheme, "git+http") or
10801104
ascii.eqlIgnoreCase(uri.scheme, "git+https"))
10811105
{
1082-
var retries_left = f.retry_count;
1083-
var transport_uri = uri;
1084-
transport_uri.scheme = uri.scheme["git+".len..];
1085-
var session = while (true) {
1086-
break git.Session.init(arena, http_client, transport_uri, reader_buffer) catch |err| {
1087-
if (retries_left > 0) {
1088-
std.Thread.sleep(std.time.ns_per_ms * f.retry_delay_ms);
1089-
retries_left -= 1;
1090-
continue;
1091-
}
1092-
return f.fail(
1093-
f.location_tok,
1094-
try eb.printString("unable to discover remote git server capabilities: {t}", .{err}),
1095-
);
1096-
};
1106+
f.retry.callWithRetries(f.io, @TypeOf(initGitResource), initGitResource, .{ f, uri, resource, reader_buffer }) catch |err| {
1107+
return f.fail(f.location_tok, try eb.printString("Failed to fetch git resource {s}: {t}", .{ uri.path.percent_encoded, err }));
10971108
};
1109+
return;
1110+
}
10981111

1099-
const want_oid = want_oid: {
1100-
const want_ref =
1101-
if (uri.fragment) |fragment| try fragment.toRawMaybeAlloc(arena) else "HEAD";
1102-
if (git.Oid.parseAny(want_ref)) |oid| break :want_oid oid else |_| {}
1103-
1104-
const want_ref_head = try std.fmt.allocPrint(arena, "refs/heads/{s}", .{want_ref});
1105-
const want_ref_tag = try std.fmt.allocPrint(arena, "refs/tags/{s}", .{want_ref});
1106-
1107-
var ref_iterator: git.Session.RefIterator = undefined;
1108-
session.listRefs(&ref_iterator, .{
1109-
.ref_prefixes = &.{ want_ref, want_ref_head, want_ref_tag },
1110-
.include_peeled = true,
1111-
.buffer = reader_buffer,
1112-
}) catch |err| return f.fail(f.location_tok, try eb.printString("unable to list refs: {t}", .{err}));
1113-
defer ref_iterator.deinit();
1114-
while (ref_iterator.next() catch |err| {
1115-
return f.fail(f.location_tok, try eb.printString(
1116-
"unable to iterate refs: {s}",
1117-
.{@errorName(err)},
1118-
));
1119-
}) |ref| {
1120-
if (std.mem.eql(u8, ref.name, want_ref) or
1121-
std.mem.eql(u8, ref.name, want_ref_head) or
1122-
std.mem.eql(u8, ref.name, want_ref_tag))
1123-
{
1124-
break :want_oid ref.peeled orelse ref.oid;
1125-
}
1126-
}
1127-
return f.fail(f.location_tok, try eb.printString("ref not found: {s}", .{want_ref}));
1128-
};
1129-
if (f.use_latest_commit) {
1130-
f.latest_commit = want_oid;
1131-
} else if (uri.fragment == null) {
1132-
const notes_len = 1;
1133-
try eb.addRootErrorMessage(.{
1134-
.msg = try eb.addString("url field is missing an explicit ref"),
1135-
.src_loc = try f.srcLoc(f.location_tok),
1136-
.notes_len = notes_len,
1137-
});
1138-
const notes_start = try eb.reserveNotes(notes_len);
1139-
eb.extra.items[notes_start] = @intFromEnum(try eb.addErrorMessage(.{
1140-
.msg = try eb.printString("try .url = \"{f}#{f}\",", .{
1141-
uri.fmt(.{ .scheme = true, .authority = true, .path = true }),
1142-
want_oid,
1143-
}),
1144-
}));
1145-
return error.FetchFailed;
1146-
}
1112+
return f.fail(f.location_tok, try eb.printString("unsupported URL scheme: {s}", .{uri.scheme}));
1113+
}
11471114

1148-
var want_oid_buf: [git.Oid.max_formatted_length]u8 = undefined;
1149-
_ = std.fmt.bufPrint(&want_oid_buf, "{f}", .{want_oid}) catch unreachable;
1150-
resource.* = .{ .git = .{
1151-
.session = session,
1152-
.fetch_stream = undefined,
1153-
.want_oid = want_oid,
1154-
} };
1155-
const fetch_stream = &resource.git.fetch_stream;
1156-
while (true) {
1157-
break session.fetch(fetch_stream, &.{&want_oid_buf}, reader_buffer) catch |err| {
1158-
if (retries_left > 0) {
1159-
std.Thread.sleep(std.time.ns_per_ms * f.retry_delay_ms);
1160-
retries_left -= 1;
1161-
continue;
1162-
}
1163-
return f.fail(f.location_tok, try eb.printString("unable to create fetch stream: {t}", .{err}));
1164-
};
1165-
}
1166-
errdefer fetch_stream.deinit(fetch_stream);
1115+
fn initHttpResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) !void {
1116+
const arena = f.arena.allocator();
1117+
const http_client = f.job_queue.http_client;
11671118

1168-
return;
1119+
resource.* = .{ .http_request = .{
1120+
.request = try http_client.request(.GET, uri, .{}),
1121+
.response = undefined,
1122+
.transfer_buffer = reader_buffer,
1123+
.decompress_buffer = &.{},
1124+
.decompress = undefined,
1125+
} };
1126+
const request = &resource.http_request.request;
1127+
errdefer request.deinit();
1128+
1129+
try request.sendBodiless();
1130+
1131+
var redirect_buffer: [1024]u8 = undefined;
1132+
const response = &resource.http_request.response;
1133+
response.* = try request.receiveHead(&redirect_buffer);
1134+
const status = response.head.status;
1135+
1136+
if (@intFromEnum(status) >= 500 or status == .too_many_requests or status == .not_found) {
1137+
return BadHttpStatus.MaybeSpurious;
1138+
} else if (status != .ok) {
1139+
return BadHttpStatus.NonSpurious;
11691140
}
11701141

1171-
return f.fail(f.location_tok, try eb.printString("unsupported URL scheme: {s}", .{uri.scheme}));
1142+
resource.http_request.decompress_buffer = try arena.alloc(u8, response.head.content_encoding.minBufferCapacity());
1143+
}
1144+
1145+
fn initGitResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) !void {
1146+
const eb = &f.error_bundle;
1147+
1148+
const arena = f.arena.allocator();
1149+
const http_client = f.job_queue.http_client;
1150+
1151+
var transport_uri = uri;
1152+
transport_uri.scheme = uri.scheme["git+".len..];
1153+
var session = try git.Session.init(arena, http_client, transport_uri, reader_buffer);
1154+
1155+
const want_oid = want_oid: {
1156+
const want_ref =
1157+
if (uri.fragment) |fragment| try fragment.toRawMaybeAlloc(arena) else "HEAD";
1158+
if (git.Oid.parseAny(want_ref)) |oid| break :want_oid oid else |_| {}
1159+
1160+
const want_ref_head = try std.fmt.allocPrint(arena, "refs/heads/{s}", .{want_ref});
1161+
const want_ref_tag = try std.fmt.allocPrint(arena, "refs/tags/{s}", .{want_ref});
1162+
1163+
var ref_iterator: git.Session.RefIterator = undefined;
1164+
session.listRefs(&ref_iterator, .{
1165+
.ref_prefixes = &.{ want_ref, want_ref_head, want_ref_tag },
1166+
.include_peeled = true,
1167+
.buffer = reader_buffer,
1168+
}) catch |err| return f.fail(f.location_tok, try eb.printString("unable to list refs: {t}", .{err}));
1169+
defer ref_iterator.deinit();
1170+
while (ref_iterator.next() catch |err| {
1171+
return f.fail(f.location_tok, try eb.printString(
1172+
"unable to iterate refs: {s}",
1173+
.{@errorName(err)},
1174+
));
1175+
}) |ref| {
1176+
if (std.mem.eql(u8, ref.name, want_ref) or
1177+
std.mem.eql(u8, ref.name, want_ref_head) or
1178+
std.mem.eql(u8, ref.name, want_ref_tag))
1179+
{
1180+
break :want_oid ref.peeled orelse ref.oid;
1181+
}
1182+
}
1183+
return f.fail(f.location_tok, try eb.printString("ref not found: {s}", .{want_ref}));
1184+
};
1185+
if (f.use_latest_commit) {
1186+
f.latest_commit = want_oid;
1187+
} else if (uri.fragment == null) {
1188+
const notes_len = 1;
1189+
try eb.addRootErrorMessage(.{
1190+
.msg = try eb.addString("url field is missing an explicit ref"),
1191+
.src_loc = try f.srcLoc(f.location_tok),
1192+
.notes_len = notes_len,
1193+
});
1194+
const notes_start = try eb.reserveNotes(notes_len);
1195+
eb.extra.items[notes_start] = @intFromEnum(try eb.addErrorMessage(.{
1196+
.msg = try eb.printString("try .url = \"{f}#{f}\",", .{
1197+
uri.fmt(.{ .scheme = true, .authority = true, .path = true }),
1198+
want_oid,
1199+
}),
1200+
}));
1201+
return error.FetchFailed;
1202+
}
1203+
1204+
var want_oid_buf: [git.Oid.max_formatted_length]u8 = undefined;
1205+
_ = std.fmt.bufPrint(&want_oid_buf, "{f}", .{want_oid}) catch unreachable;
1206+
resource.* = .{ .git = .{
1207+
.session = session,
1208+
.fetch_stream = undefined,
1209+
.want_oid = want_oid,
1210+
} };
1211+
const fetch_stream = &resource.git.fetch_stream;
1212+
try session.fetch(fetch_stream, &.{&want_oid_buf}, reader_buffer);
1213+
errdefer fetch_stream.deinit(fetch_stream);
11721214
}
11731215

11741216
fn unpackResource(

0 commit comments

Comments
 (0)