diff --git a/scripts/plugins/s3-cache.ts b/scripts/plugins/s3-cache.ts
index 969d343d9..897114c47 100644
--- a/scripts/plugins/s3-cache.ts
+++ b/scripts/plugins/s3-cache.ts
@@ -1,15 +1,21 @@
-import { S3, S3Bucket, S3Object } from "https://deno.land/x/s3@0.5.0/mod.ts";
+import * as tar from "https://deno.land/std@0.153.0/archive/tar.ts";
 import { parse } from "https://deno.land/std@0.153.0/flags/mod.ts";
-import { ensureFile } from "https://deno.land/std@0.153.0/fs/ensure_file.ts";
 import { ensureDir } from "https://deno.land/std@0.153.0/fs/ensure_dir.ts";
-import { copy } from "https://deno.land/std@0.153.0/streams/conversion.ts";
-import { readerFromStreamReader } from "https://deno.land/std@0.153.0/streams/conversion.ts";
+import { ensureFile } from "https://deno.land/std@0.153.0/fs/ensure_file.ts";
 import { walk } from "https://deno.land/std@0.153.0/fs/walk.ts";
-import { Buffer } from "https://deno.land/std@0.153.0/io/buffer.ts";
-import * as tar from "https://deno.land/std@0.153.0/archive/tar.ts";
+import {
+  copy,
+  readableStreamFromReader,
+  readerFromStreamReader,
+} from "https://deno.land/std@0.153.0/streams/conversion.ts";
+import { S3, S3Bucket, S3Object } from "https://deno.land/x/s3@0.5.0/mod.ts";
+import { ClientOptions } from "https://deno.land/x/s3_lite_client@0.2.0/client.ts";
+import { ServerError } from "https://deno.land/x/s3_lite_client@0.2.0/errors.ts";
+import { S3Client } from "https://deno.land/x/s3_lite_client@0.2.0/mod.ts";
 import * as transform from "https://deno.land/x/transform@v0.4.0/mod.ts";
 
 const DEFAULT_KEEP_COUNT = 0;
+const UPLOAD_PART_BYTES = 64 * 1024 * 1024;
 /**
  * CLI args
  * --op=backup/restore
@@ -22,9 +28,7 @@ const DEFAULT_KEEP_COUNT = 0;
 await main();
 
 // ---------------------implement-------------------
-
 async function main() {
-  const bucket = getBucket();
   const args = parse(Deno.args);
 
   if ("op" in args) {
@@ -32,12 +36,15 @@ async function main() {
     const op = args["op"];
     const key = args["key"];
     const keyPrefix = args["key-prefix"];
+
+    const bucket = getBucket();
     switch (op) {
      case "restore":
        await restoreToDir(bucket, path, key!, args["key-prefix"]);
        break;
      case "remove":
        await bucket.deleteObject(key!);
+       console.log(`deleted ${key}`);
        break;
      case "shrink":
        {
@@ -63,7 +70,6 @@ async function main() {
         });
       }
       break;
-
     default:
       throw new Error(`not supported operation: ${args["op"]}`);
   }
@@ -75,20 +81,41 @@ async function main() {
 function getBucket() {
   const bucketName = Deno.env.get("BUCKET_NAME")!;
   // Create a S3 instance.
+  const port = Number(Deno.env.get("BUCKET_PORT") || "80");
+  let endpointURL = `http://${Deno.env.get("BUCKET_HOST")!}`;
+  if (port != 80) {
+    endpointURL += `:${port}`;
+  }
+
   const s3 = new S3({
     accessKeyID: Deno.env.get("AWS_ACCESS_KEY_ID")!,
     secretKey: Deno.env.get("AWS_SECRET_ACCESS_KEY")!,
     region: Deno.env.get("BUCKET_REGION") || "ci",
-    endpointURL: `http://${Deno.env.get("BUCKET_HOST")}:${
-      Deno.env.get("BUCKET_PORT")
-    }`!,
+    endpointURL,
   });
 
   return s3.getBucket(bucketName);
 }
 
+function getBucketForBigUpload() {
+  const params: ClientOptions = {
+    endPoint: Deno.env.get("BUCKET_HOST")!,
+    region: Deno.env.get("BUCKET_REGION") || "ci",
+    accessKey: Deno.env.get("AWS_ACCESS_KEY_ID")!,
+    secretKey: Deno.env.get("AWS_SECRET_ACCESS_KEY")!,
+    bucket: Deno.env.get("BUCKET_NAME")!,
+    useSSL: false,
+  };
+
+  const port = Number(Deno.env.get("BUCKET_PORT") || "80");
+  if (port != 80) params.port = port;
+
+  return new S3Client(params);
+}
+
 async function save(
   bucket: S3Bucket,
+  putBucket: S3Client,
   path: string,
   key: string,
   filter?: string,
@@ -98,17 +125,48 @@ async function save(
   const ret = await bucket.headObject(key);
   if (ret) {
     console.debug("object existed, skip");
-    return;
+    return key;
   }
 
-  const { GzEncoder } = transform.Transformers;
-  const to = new tar.Tar();
-  const matchRegs = filter ? [new RegExp(filter)] : undefined;
+  // first clean space, then push new one.
+  if (cleanPrefix && cleanKeepCount) {
+    await cleanOld(bucket, cleanPrefix, cleanKeepCount);
+  }
 
   const cwd = Deno.cwd();
   Deno.chdir(path);
+  const stream = await newBackupReadableStream(filter);
+
+  await putBucket.putObject(key, stream, {
+    metadata: {
+      contentType: "application/x-tar",
+      contentEncoding: "gzip",
+      cacheControl: "public, no-transform",
+    },
+    partSize: UPLOAD_PART_BYTES,
+  })
+    .catch((e: ServerError) => {
+      const { code, statusCode, cause, bucketName, key, resource } = e;
+      console.error({ code, statusCode, cause, bucketName, key, resource });
+      throw e;
+    })
+    .finally(() => Deno.chdir(cwd));
+
+  const newRet = await bucket.headObject(key).catch((e: ServerError) => {
+    const { code, statusCode, cause, bucketName, key, resource } = e;
+    console.error({ code, statusCode, cause, bucketName, key, resource });
+    console.trace(e);
+    throw e;
+  });
+  console.debug(`uploaded item:`);
+  console.debug(newRet);
+}
+
+async function newBackupReadableStream(filter?: string) {
+  const { GzEncoder } = transform.Transformers;
+  const to = new tar.Tar();
+  const matchRegs = filter ? [new RegExp(filter)] : undefined;
 
-  console.debug(`start tar at: ${Date.now()}`);
   for await (const entry of walk("./", { match: matchRegs })) {
     if (!entry.isFile) {
       continue;
@@ -116,26 +174,9 @@ async function save(
     await to.append(entry.path, { filePath: entry.path });
   }
 
-  console.debug(`end tar at: ${Date.now()}`);
-
-  console.debug(`start pipeline to buffer at: ${Date.now()}`);
-  const reader = await transform.pipeline(to.getReader(), new GzEncoder());
-  const buf = new Buffer();
-  await reader.to(buf).finally(() => Deno.chdir(cwd));
-  console.debug(`end pipeline to buffer at: ${Date.now()}`);
-
-  // first clean space, then push new one.
-  if (cleanPrefix && cleanKeepCount) {
-    await cleanOld(bucket, cleanPrefix, cleanKeepCount);
-  }
-  console.debug(`start put object at: ${Date.now()}`);
-  await bucket.putObject(key, buf.bytes(), {
-    contentType: "application/x-tar",
-    contentEncoding: "gzip",
-    cacheControl: "public, no-transform",
-  });
-  console.debug(`end put object at: ${Date.now()}`);
+  const reader = transform.pipeline(to.getReader(), new GzEncoder());
+  return readableStreamFromReader(reader);
 }
 
 async function restoreToDir(
@@ -157,14 +198,12 @@ async function restore(
   keyPrefix?: string,
 ) {
   const restoreKey = await getRestoreKey(bucket, key, keyPrefix);
-  console.debug(restoreKey);
-
   if (!restoreKey) {
     console.log("cache missed");
     return;
   }
 
-  console.log(`key: ${restoreKey}`);
+  console.log(`will restore from key: ${restoreKey}`);
   const ret = await bucket.getObject(restoreKey);
   if (!ret) {
     console.error(`get content failed for key: ${restoreKey}`);
@@ -194,10 +233,10 @@
 ) {
   const ret = await bucket.headObject(key);
   if (ret) {
-    console.debug("key hit");
+    console.debug("key exists");
     return key;
   }
-  console.debug("key miss");
+  console.debug("key does not exist");
 
   // restore from other objects.
   if (!keyPrefix) {
@@ -238,7 +277,7 @@ async function cleanOld(
   const list = await listObjectsByModifiedTime(bucket, keyPrefix);
 
   if (!list) {
-    console.log(`none objects founds by prefix ${keyPrefix}`);
+    console.log(`no objects found by prefix: ${keyPrefix}`);
     return;
   }
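
Note for reviewers, not part of the patch: below is a minimal, self-contained sketch of the streaming upload technique this diff switches to (tar the tree, gzip it on the fly, and hand the resulting ReadableStream to the s3_lite_client, which uploads it in fixed-size parts). The endpoint, port, credentials, bucket, object key, and the unfiltered walk of "./" are placeholders, not values the script actually uses.

import { Tar } from "https://deno.land/std@0.153.0/archive/tar.ts";
import { walk } from "https://deno.land/std@0.153.0/fs/walk.ts";
import { readableStreamFromReader } from "https://deno.land/std@0.153.0/streams/conversion.ts";
import { S3Client } from "https://deno.land/x/s3_lite_client@0.2.0/mod.ts";
import * as transform from "https://deno.land/x/transform@v0.4.0/mod.ts";

// Placeholder client configuration; the real script reads these from env vars.
const client = new S3Client({
  endPoint: "localhost",
  port: 9000,
  useSSL: false,
  region: "ci",
  accessKey: "minioadmin",
  secretKey: "minioadmin",
  bucket: "ci-cache",
});

// Pack the current directory into a tar archive...
const archive = new Tar();
for await (const entry of walk("./")) {
  if (entry.isFile) {
    await archive.append(entry.path, { filePath: entry.path });
  }
}

// ...gzip it lazily, expose it as a ReadableStream, and upload it in
// 64 MiB parts so the whole tarball never has to fit in memory.
const gz = transform.pipeline(
  archive.getReader(),
  new transform.Transformers.GzEncoder(),
);
await client.putObject(
  "example-cache.tar.gz",
  readableStreamFromReader(gz),
  { partSize: 64 * 1024 * 1024 },
);

This mirrors what save() and newBackupReadableStream() do after the patch, minus the head-object existence check, the cleanup of old keys, and the error logging.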