Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/cli/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { buildCommand } from './CommandInterface';
import { createBucket } from './create-bucket';
import { deleteBucket } from './delete-bucket';
import { downloadFile } from './download-file';
import { downloadFileMultipart } from './download-file-multipart';
import { getDownloadLinks } from './get-download-links';
import { getFileInfo } from './get-filo-info';
import { renameFile } from './rename-file';
Expand Down Expand Up @@ -106,6 +107,15 @@ export const getDownloadLinksCommand = buildCommand({
getDownloadLinks(bucketId, fileIdsSeparatedByCommas.split(',')).finally(notifyProgramFinished('get-download-links'));
});

export const downloadFileMultipartCommand = buildCommand({
version: '0.0.1',
command: 'download-file-multipart <fileId> <fileSize> <path>',
description: 'Downloads a file by parts',
options: [],
}).action((fileId: string, fileSize: string, path: string) => {
downloadFileMultipart(fileId, parseInt(fileSize), path).finally(notifyProgramFinished('download-file-multipart'));
});

// export const downloadFolderZippedCommand = buildCommand({
// version: '0.0.1',
// name: 'download-folder-zip',
Expand Down
52 changes: 52 additions & 0 deletions src/cli/download-file-multipart.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { createWriteStream } from 'fs';
import { pipeline, Readable } from 'stream';

import { logger } from '../lib/utils/logger';
import { getEnvironment } from './CommandInterface';

export async function downloadFileMultipart(fileId: string, fileSize: number, path: string) {
logger.info('Downloading file %s', fileId);

const network = getEnvironment();
const bucketId = process.env.BUCKET_ID;

const destination = createWriteStream(path);

try {
await new Promise((resolve, reject) => {
const state = network.downloadMultipartFile(
fileId,
fileSize,
bucketId as string,
{
progressCallback: (progress: number) => {
logger.info('Progress: %s %', (progress * 100).toFixed(2));
},
finishedCallback: (err: Error | null, downloadStream: Readable | null) => {
if (err) {
return reject(err);
}

pipeline(downloadStream as Readable, destination, (err) => {
if (err) {
return reject(err);
}
resolve(null);
});
},
},
);

process.on('SIGINT', () => {
network.downloadCancel(state);
});
});
logger.info('File downloaded on path %s', path);

process.exit(0);
} catch (err) {
logger.error('Error downloading file %s', err.message);

process.exit(1);
}
}
1 change: 1 addition & 0 deletions src/cli/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ program.addCommand(commands.getFileInfoCommand);
program.addCommand(commands.createBucketCommand);
program.addCommand(commands.deleteBucketCommand);
program.addCommand(commands.getDownloadLinksCommand);
program.addCommand(commands.downloadFileMultipartCommand);
// program.addCommand(commands.downloadFolderZippedCommand);

program.parse(process.argv);
66 changes: 66 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { HashStream } from './lib/utils/streams';
import { downloadFileV2 } from './lib/core/download/downloadV2';
import { FileVersionOneError } from '@internxt/sdk/dist/network/download';
import { uploadFileMultipart, uploadFileV2 } from './lib/core/upload/uploadV2';
import { downloadFileMultipart } from './lib/core/download/multipart';

type GetBucketsCallback = (err: Error | null, result: any) => void;

Expand Down Expand Up @@ -334,6 +335,71 @@ export class Environment {
return downloadState;
};

downloadMultipartFile(
fileId: string,
fileSize: number,
bucketId: string,
opts: DownloadOptions,
) {
const abortController = new AbortController();
const downloadState = new ActionState(ActionTypes.Download);

downloadState.once(Events.Download.Abort, () => {
abortController.abort();
});

if (!this.config.encryptionKey) {
opts.finishedCallback(Error(ENCRYPTION_KEY_NOT_PROVIDED), null);

return downloadState;
}

if (!bucketId) {
opts.finishedCallback(Error(BUCKET_ID_NOT_PROVIDED), null);

return downloadState;
}

if (!fileId) {
opts.finishedCallback(Error('File id not provided'), null);

return downloadState;
}

if (!this.config.bridgeUrl) {
opts.finishedCallback(Error('Missing bridge url'), null);

return downloadState;
}

const [downloadPromise, stream] = downloadFileMultipart(
fileId,
fileSize,
bucketId,
this.config.encryptionKey,
this.config.bridgeUrl,
{
user: this.config.bridgeUser,
pass: this.config.bridgePass
},
opts.progressCallback,
() => {
opts.finishedCallback(null, stream);
},
abortController.signal,
);

downloadPromise.catch((err) => {
if (err instanceof FileVersionOneError) {
opts.finishedCallback(new Error('Invalid method for v1 files'), null)
} else {
opts.finishedCallback(err, null);
}
});

return downloadState;
}

downloadCancel(state: ActionState): void {
state.stop();
}
Expand Down
146 changes: 146 additions & 0 deletions src/lib/core/download/multipart.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { request } from 'undici';
import { createDecipheriv, randomBytes } from 'crypto';
import { Writable, Readable, PassThrough } from 'stream'
import { pipeline } from 'stream/promises'
import { validateMnemonic } from 'bip39';
import { ALGORITHMS, DecryptFileFunction, DownloadFileFunction, Network } from '@internxt/sdk/dist/network';
import { downloadFile, FileVersionOneError } from '@internxt/sdk/dist/network/download';

import { Events as ProgressEvents, HashStream, ProgressNotifier } from '../../utils/streams';
import { DownloadProgressCallback } from '.';
import { GenerateFileKey, sha256 } from '../../utils/crypto';
import Errors from './errors';

async function downloadPartStream(
downloadUrl: string,
from: number,
to: number,
signal?: AbortSignal
): Promise<Readable> {
const { statusCode, body } = await request(downloadUrl, {
signal,
method: 'GET',
headers: {
Range: `bytes=${from}-${to}`,
}
});

if (statusCode !== 206) {
throw new Error(`Error al descargar el chunk: ${statusCode}`);
}

return body;
}

export function downloadFileMultipart(
fileId: string,
fileSize: number,
bucketId: string,
mnemonic: string,
bridgeUrl: string,
creds: { pass: string, user: string },
notifyProgress: DownloadProgressCallback,
onV2Confirmed: () => void,
signal?: AbortSignal
): [Promise<void>, PassThrough] {
const partLength = 50 * 1024 * 1024;
const outStream = new PassThrough();
const network = Network.client(bridgeUrl, {
clientName: 'inxt-js',
clientVersion: '1.0'
}, {
bridgeUser: creds.user,
userId: sha256(Buffer.from(creds.pass)).toString('hex')
});

let downloadUrl = ''

const ranges: { start: number, end: number }[] = [];

for (let start = 0; start < fileSize; start += partLength) {
const end = Math.min(start + partLength - 1, fileSize - 1);
ranges.push({ start, end });
}


const downloadFileStep: DownloadFileFunction = async (downloadables) => {
onV2Confirmed();

for (const downloadable of downloadables.sort((dA, dB) => dA.index - dB.index)) {
downloadUrl = downloadable.url
}
};

const decryptFileStep: DecryptFileFunction = async (algorithm, key, iv, fileSize) => {
if (algorithm !== ALGORITHMS.AES256CTR.type) {
throw Errors.downloadUnknownAlgorithmError;
}

const decipher = createDecipheriv('aes-256-ctr', key as Buffer, iv as Buffer);
const progress = new ProgressNotifier(fileSize, 2000, { emitClose: false });

progress.on(ProgressEvents.Progress, (progress: number) => {
notifyProgress(progress, null, null);
});

const hasher = new HashStream();

const pipelinePromise = pipeline(
hasher,
decipher,
progress,
outStream,
signal ? { signal } : undefined
);

for (const range of ranges) {
const partStream = await downloadPartStream(downloadUrl, range.start, range.end, signal);
for await (const chunk of partStream) {
if (!hasher.write(chunk)) {
await new Promise((resolve) => hasher.once('drain', resolve));
}
}
}

hasher.end();

await pipelinePromise;

// TODO: Enforce this one
// const calculatedHash = hasher.getHash().toString('hex');
// const expectedHash = fileEncryptedSlice.hash;

// if (calculatedHash !== expectedHash) {
// throw Errors.downloadHashMismatchError;
// }

await new Promise((res) => progress.end(res));
};

const downloadPromise = downloadFile(
fileId,
bucketId,
mnemonic,
network,
{
validateMnemonic: (mnemonic) => {
return validateMnemonic(mnemonic);
},
algorithm: ALGORITHMS.AES256CTR,
randomBytes,
generateFileKey: (mnemonic, bucketId, index) => {
return GenerateFileKey(mnemonic, bucketId, index as Buffer | string);
}
},
Buffer.from,
downloadFileStep,
decryptFileStep
).catch((err) => {
if (err instanceof FileVersionOneError) {
throw err;
}
outStream.emit('error', err);
});

return [downloadPromise, outStream];
}