Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/venv
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Positional arguments:
- S3_BUCKET - Name of a bucket.
- S3_VISIBILITY - Either private or public visibility. Available values: private/public-read.
- S3_PATH - Path to store file at.
- SOURCE_FILE_PATH - Path to file to upload.
- SOURCE_FILE_PATH - Path to file or directory to upload.

Optional arguments:
- --s3_root - Root s3 bucket directory.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
boto3
tqdm
134 changes: 89 additions & 45 deletions uploader.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,116 @@
from os import path
import boto3
import argparse
import os
import mimetypes
from pathlib import Path

import boto3
from tqdm import tqdm

BASE_DIR = Path(__file__).resolve().parent
ALLOWED_VISIBILITIES = ['private', 'public-read']

client = None

def path_object(value):
abspath = BASE_DIR.joinpath(value)
if abspath.exists():
return abspath
else:
raise argparse.ArgumentTypeError(f'Unable to resolve path `{abspath}`. Path argument must be a valid path.')

def init_arguments(args_parser=None):
if not args_parser:
return

args_parser.add_argument('s3_access_key', help='S3 storage access key.', type=str)
args_parser.add_argument('s3_secret', help='S3 storage secret key.', type=str)
args_parser.add_argument('s3_endpoint', help='Endpoint of s3 storage.', type=str)
args_parser.add_argument('s3_region', help='Region of s3 storage.', type=str)
args_parser.add_argument('s3_bucket', help='Name of a bucket.', type=str)
args_parser.add_argument(
's3_visibility',
help='Either private or public visibility. Available values: {}.'.format('/'.join(ALLOWED_VISIBILITIES)),
type=str
)
args_parser.add_argument('s3_path', help='Path to store file at.', type=str)
args_parser.add_argument('path', help='Path to file to upload.', type=str)
def parse_args():
args_parser = argparse.ArgumentParser()
args_parser.add_argument('--s3_access_key', help='S3 storage access key.', required=True)
args_parser.add_argument('--s3_secret', help='S3 storage secret key.', required=True)
args_parser.add_argument('--s3_endpoint', help='Endpoint of s3 storage.', required=True)
args_parser.add_argument('--s3_region', help='Region of s3 storage.', required=True)
args_parser.add_argument('--s3_bucket', help='Name of a bucket.', required=True)
args_parser.add_argument('--s3_path', help='Path to store file at.', required=True)
args_parser.add_argument('--path', help='Path to file or directory to upload.', required=True, type=path_object)
args_parser.add_argument('--s3_root', help='Prefix (Root s3 bucket directory).', required=False, default=None)
args_parser.add_argument('--s3_visibility',
help=f"File visibility level. Can be one of: {'/'.join(ALLOWED_VISIBILITIES)}.",
choices=ALLOWED_VISIBILITIES,
default=ALLOWED_VISIBILITIES[0]
)

args_parser.add_argument('--s3_root', help='Root s3 bucket directory.', type=str, required=False)
return args_parser.parse_args()


def get_s3_client(args):
client = boto3.client(
's3',
region_name=args.s3_region,
endpoint_url=args.s3_endpoint,
aws_access_key_id=args.s3_access_key,
aws_secret_access_key=args.s3_secret
)
return client


def upload_file(input_arguments):
with open(input_arguments.path) as file:
store_at = input_arguments.s3_path.strip('/')
client = get_s3_client(input_arguments)
bytes_total = os.stat(input_arguments.path).st_size
bytes_transferred = 0

with tqdm(total=bytes_total, unit='B', unit_scale=True, unit_divisor=1024) as progress_bar:
prefix = input_arguments.s3_root
store_at = input_arguments.s3_path.strip('/')
if prefix:
store_at = "{}/{}".format(prefix, store_at)
store_at = f"{prefix}/{store_at}"

store_at = "{}/{}".format(store_at, path.basename(input_arguments.path))
store_at = f"{store_at}/{os.path.basename(input_arguments.path)}"
mimetype, _ = mimetypes.guess_type(input_arguments.path)

client.put_object(
if mimetype is None:
raise Exception(f"Failed to guess mimetype of `{input_arguments.path}`")

def bytes_count(size):
nonlocal bytes_transferred, progress_bar
bytes_transferred += size
progress_bar.update(bytes_transferred)

client.upload_file(
Filename=str(input_arguments.path),
Bucket=input_arguments.s3_bucket,
Key=store_at,
ACL=input_arguments.s3_visibility,
Body=file.read(),
Callback=bytes_count,
ExtraArgs={'ACL': input_arguments.s3_visibility, 'ContentType': mimetype}
)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
init_arguments(args_parser=parser)
def upload_dir(input_arguments):
client = get_s3_client(input_arguments)
prefix = input_arguments.s3_root
store_at = input_arguments.s3_path.strip('/')
if prefix:
store_at = f"{prefix}/{store_at}"

args = parser.parse_args()
files = [path for path in input_arguments.path.rglob('*') if path.is_file()]

if not path.isfile(args.path):
print('Path argument must be a valid path to a file.')
exit(0)
for filepath in tqdm(files, unit='files'):
key = f"{store_at}/{filepath.relative_to(input_arguments.path).as_posix()}"
mimetype, _ = mimetypes.guess_type(filepath)

if args.s3_visibility not in ALLOWED_VISIBILITIES:
print('Visibility should be one of: {}.'.format(','.join(ALLOWED_VISIBILITIES)))
exit(0)
if mimetype is None:
raise Exception(f"Failed to guess mimetype of `{filepath}`")

client = boto3.client(
's3',
region_name=args.s3_region,
endpoint_url=args.s3_endpoint,
aws_access_key_id=args.s3_access_key,
aws_secret_access_key=args.s3_secret
)
client.upload_file(
Filename=str(filepath),
Bucket=input_arguments.s3_bucket,
Key=key,
ExtraArgs={'ACL': input_arguments.s3_visibility, 'ContentType': mimetype}
)

upload_file(args)

print('Successful file "{}" upload.'.format(path.basename(args.path)))
def main():
args = parse_args()
if args.path.is_dir():
upload_dir(args)
else:
upload_file(args)
print('Done.')


if __name__ == '__main__':
main()