Skip to content

Commit bffbdcf

Browse files
authored
Merge pull request #14390 from NixOS/constant-memory-uploads
libstore: Make HTTP binary cache uploads run in constant memory
2 parents 495d1b8 + cf75079 commit bffbdcf

File tree

11 files changed

+144
-62
lines changed

11 files changed

+144
-62
lines changed

src/libfetchers/git-lfs-fetch.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ std::vector<nlohmann::json> Fetch::fetchUrls(const std::vector<Pointer> & pointe
219219
nlohmann::json oidList = pointerToPayload(pointers);
220220
nlohmann::json data = {{"operation", "download"}};
221221
data["objects"] = oidList;
222-
request.data = data.dump();
222+
auto payload = data.dump();
223+
StringSource source{payload};
224+
request.data = {source};
223225

224226
FileTransferResult result = getFileTransfer()->upload(request);
225227
auto responseString = result.data;

src/libstore/binary-cache-store.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ std::optional<std::string> BinaryCacheStore::getNixCacheInfo()
7979
void BinaryCacheStore::upsertFile(
8080
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
8181
{
82-
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType, sizeHint);
82+
auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique<StringSource>(data); });
83+
upsertFile(path, *source, mimeType, sizeHint);
8384
}
8485

8586
void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
@@ -271,12 +272,19 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
271272

272273
/* Atomically write the NAR file. */
273274
if (repair || !fileExists(narInfo->url)) {
275+
auto source = restartableSourceFromFactory([fnTemp]() {
276+
struct AutoCloseFDSource : AutoCloseFD, FdSource
277+
{
278+
AutoCloseFDSource(AutoCloseFD fd)
279+
: AutoCloseFD(std::move(fd))
280+
, FdSource(get())
281+
{
282+
}
283+
};
284+
return std::make_unique<AutoCloseFDSource>(toDescriptor(open(fnTemp.c_str(), O_RDONLY)));
285+
});
274286
stats.narWrite++;
275-
upsertFile(
276-
narInfo->url,
277-
std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
278-
"application/x-nix-nar",
279-
narInfo->fileSize);
287+
upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize);
280288
} else
281289
stats.narWriteAverted++;
282290

src/libstore/filetransfer.cc

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer
295295
return 0;
296296
}
297297

298-
size_t readOffset = 0;
299-
300-
size_t readCallback(char * buffer, size_t size, size_t nitems)
301-
{
302-
if (readOffset == request.data->length())
303-
return 0;
304-
auto count = std::min(size * nitems, request.data->length() - readOffset);
305-
assert(count);
306-
memcpy(buffer, request.data->data() + readOffset, count);
307-
readOffset += count;
308-
return count;
298+
size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept
299+
try {
300+
auto data = request.data;
301+
return data->source->read(buffer, nitems * size);
302+
} catch (EndOfFile &) {
303+
return 0;
304+
} catch (...) {
305+
return CURL_READFUNC_ABORT;
309306
}
310307

311-
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
308+
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept
312309
{
313310
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
314311
}
@@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer
322319
}
323320
#endif
324321

325-
size_t seekCallback(curl_off_t offset, int origin)
326-
{
322+
size_t seekCallback(curl_off_t offset, int origin) noexcept
323+
try {
324+
auto source = request.data->source;
327325
if (origin == SEEK_SET) {
328-
readOffset = offset;
326+
source->restart();
327+
source->skip(offset);
329328
} else if (origin == SEEK_CUR) {
330-
readOffset += offset;
329+
source->skip(offset);
331330
} else if (origin == SEEK_END) {
332-
readOffset = request.data->length() + offset;
331+
NullSink sink{};
332+
source->drainInto(sink);
333333
}
334334
return CURL_SEEKFUNC_OK;
335+
} catch (...) {
336+
return CURL_SEEKFUNC_FAIL;
335337
}
336338

337-
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
339+
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept
338340
{
339341
return ((TransferItem *) clientp)->seekCallback(offset, origin);
340342
}
@@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer
393395
if (request.data) {
394396
if (request.method == HttpMethod::POST) {
395397
curl_easy_setopt(req, CURLOPT_POST, 1L);
396-
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length());
398+
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint);
397399
} else if (request.method == HttpMethod::PUT) {
398400
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
399-
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
401+
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint);
400402
} else {
401403
unreachable();
402404
}

src/libstore/http-binary-cache-store.cc

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,23 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path)
135135
}
136136

137137
void HttpBinaryCacheStore::upsertFile(
138-
const std::string & path,
139-
std::shared_ptr<std::basic_iostream<char>> istream,
140-
const std::string & mimeType,
141-
uint64_t sizeHint)
138+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
142139
{
143140
auto req = makeRequest(path);
144141
req.method = HttpMethod::PUT;
145-
auto data = StreamToSourceAdapter(istream).drain();
146-
147142
auto compressionMethod = getCompressionMethod(path);
148143

144+
std::string data;
145+
std::optional<StringSource> stringSource{};
146+
149147
if (compressionMethod) {
150-
data = compress(*compressionMethod, data);
148+
StringSink sink{};
149+
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
150+
source.drainInto(*compressionSink);
151+
compressionSink->finish();
152+
data = std::move(sink.s);
151153
req.headers.emplace_back("Content-Encoding", *compressionMethod);
154+
stringSource = StringSource{data};
155+
req.data = {*stringSource};
156+
} else {
157+
req.data = {sizeHint, source};
152158
}
153159

154-
req.data = std::move(data);
155160
req.mimeType = mimeType;
156161

157162
try {

src/libstore/include/nix/store/binary-cache-store.hh

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,7 @@ public:
101101
virtual bool fileExists(const std::string & path) = 0;
102102

103103
virtual void upsertFile(
104-
const std::string & path,
105-
std::shared_ptr<std::basic_iostream<char>> istream,
106-
const std::string & mimeType,
107-
uint64_t sizeHint) = 0;
104+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0;
108105

109106
void upsertFile(
110107
const std::string & path,

src/libstore/include/nix/store/filetransfer.hh

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,26 @@ struct FileTransferRequest
115115
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
116116
ActivityId parentAct;
117117
bool decompress = true;
118-
std::optional<std::string> data;
118+
119+
struct UploadData
120+
{
121+
UploadData(StringSource & s)
122+
: sizeHint(s.s.length())
123+
, source(&s)
124+
{
125+
}
126+
127+
UploadData(std::size_t sizeHint, RestartableSource & source)
128+
: sizeHint(sizeHint)
129+
, source(&source)
130+
{
131+
}
132+
133+
std::size_t sizeHint = 0;
134+
RestartableSource * source = nullptr;
135+
};
136+
137+
std::optional<UploadData> data;
119138
std::string mimeType;
120139
std::function<void(std::string_view data)> dataCallback;
121140
/**

src/libstore/include/nix/store/http-binary-cache-store.hh

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,7 @@ protected:
8181
bool fileExists(const std::string & path) override;
8282

8383
void upsertFile(
84-
const std::string & path,
85-
std::shared_ptr<std::basic_iostream<char>> istream,
86-
const std::string & mimeType,
87-
uint64_t sizeHint) override;
84+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
8885

8986
FileTransferRequest makeRequest(std::string_view path);
9087

src/libstore/local-binary-cache-store.cc

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,12 @@ struct LocalBinaryCacheStore : virtual BinaryCacheStore
5454
bool fileExists(const std::string & path) override;
5555

5656
void upsertFile(
57-
const std::string & path,
58-
std::shared_ptr<std::basic_iostream<char>> istream,
59-
const std::string & mimeType,
60-
uint64_t sizeHint) override
57+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override
6158
{
6259
auto path2 = config->binaryCacheDir + "/" + path;
6360
static std::atomic<int> counter{0};
6461
Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter);
6562
AutoDelete del(tmp, false);
66-
StreamToSourceAdapter source(istream);
6763
writeFile(tmp, source);
6864
std::filesystem::rename(tmp, path2);
6965
del.cancel();

src/libstore/s3-binary-cache-store.cc

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
2121
}
2222

2323
void upsertFile(
24-
const std::string & path,
25-
std::shared_ptr<std::basic_iostream<char>> istream,
26-
const std::string & mimeType,
27-
uint64_t sizeHint) override;
24+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
2825

2926
private:
3027
ref<S3BinaryCacheStoreConfig> s3Config;
@@ -70,12 +67,9 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
7067
};
7168

7269
void S3BinaryCacheStore::upsertFile(
73-
const std::string & path,
74-
std::shared_ptr<std::basic_iostream<char>> istream,
75-
const std::string & mimeType,
76-
uint64_t sizeHint)
70+
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
7771
{
78-
HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint);
72+
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
7973
}
8074

8175
std::string S3BinaryCacheStore::createMultipartUpload(
@@ -92,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload(
9286
req.uri = VerbatimURL(url);
9387

9488
req.method = HttpMethod::POST;
95-
req.data = "";
89+
StringSource payload{std::string_view("")};
90+
req.data = {payload};
9691
req.mimeType = mimeType;
9792

9893
if (contentEncoding) {
@@ -122,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
122117
url.query["partNumber"] = std::to_string(partNumber);
123118
url.query["uploadId"] = uploadId;
124119
req.uri = VerbatimURL(url);
125-
req.data = std::move(data);
120+
StringSource payload{data};
121+
req.data = {payload};
126122
req.mimeType = "application/octet-stream";
127123

128124
auto result = getFileTransfer()->enqueueFileTransfer(req).get();
@@ -169,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload(
169165

170166
debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
171167

172-
req.data = xml;
168+
StringSource payload{xml};
169+
req.data = {payload};
173170
req.mimeType = "text/xml";
174171

175172
getFileTransfer()->enqueueFileTransfer(req).get();

src/libutil/include/nix/util/serialise.hh

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,18 @@ struct StringSink : Sink
230230
void operator()(std::string_view data) override;
231231
};
232232

233+
/**
234+
* Source type that can be restarted.
235+
*/
236+
struct RestartableSource : Source
237+
{
238+
virtual void restart() = 0;
239+
};
240+
233241
/**
234242
* A source that reads data from a string.
235243
*/
236-
struct StringSource : Source
244+
struct StringSource : RestartableSource
237245
{
238246
std::string_view s;
239247
size_t pos;
@@ -257,8 +265,22 @@ struct StringSource : Source
257265
size_t read(char * data, size_t len) override;
258266

259267
void skip(size_t len) override;
268+
269+
void restart() override
270+
{
271+
pos = 0;
272+
}
260273
};
261274

275+
/**
276+
* Create a restartable Source from a factory function.
277+
*
278+
* @param factory Factory function that returns a fresh instance of the Source. Gets
279+
* called for each source restart.
280+
* @pre factory must return an equivalent source for each invocation.
281+
*/
282+
std::unique_ptr<RestartableSource> restartableSourceFromFactory(std::function<std::unique_ptr<Source>()> factory);
283+
262284
/**
263285
* A sink that writes all incoming data to two other sinks.
264286
*/

0 commit comments

Comments
 (0)