Merge pull request #2 from humanlongevity/multithreaded_s3_put
Enable parallel s3 put_multipart
jpiper committed Dec 25, 2015
2 parents: 9899975 + 7cd6c6c, commit 2ec6b4a
Showing 1 changed file with 26 additions and 13 deletions.
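The commit adds a `processes` argument to `S3Client.put_multipart` so the individual parts of a multipart upload are sent to S3 in parallel by a pool of worker processes. A minimal usage sketch of the new argument (the credentials, bucket, and file paths below are hypothetical, and `S3Client` is assumed to be set up as usual for luigi's s3 module):

    from luigi.s3 import S3Client

    # Hypothetical credentials and paths, for illustration only.
    client = S3Client(aws_access_key_id='AKIA...', aws_secret_access_key='...')

    # Upload a large local file in 64 MB parts using 4 worker processes.
    client.put_multipart('/data/big_file.bin',             # hypothetical source file
                         's3://my-bucket/big_file.bin',    # hypothetical destination
                         part_size=67108864,               # 64 MB per part (the default)
                         processes=4)                      # new argument in this commit

If `processes` is not given, it defaults to `multiprocessing.cpu_count()`, so existing callers keep working but now upload parts in parallel.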
39 changes: 26 additions & 13 deletions luigi/s3.py
@@ -25,7 +25,7 @@
 import logging
 import os
 import os.path
-from multiprocessing.pool import ThreadPool
+from multiprocessing.pool import ThreadPool, Pool, cpu_count
 
 try:
     from urlparse import urlsplit
@@ -62,6 +62,17 @@
 S3_DIRECTORY_MARKER_SUFFIX_1 = '/'
 
 
+# This needs to be a module-level function, not a method: methods aren't picklable, so they can't be sent to the worker processes.
+def _put_part(destination_s3_path, i, local_path, mp, num_parts, part_size, source_size):
+    offset = part_size * i
+    bytes = min(part_size, source_size - offset)
+    with open(local_path, 'rb') as fp:
+        part_num = i + 1
+        logger.info('Uploading part %s/%s to %s', part_num, num_parts, destination_s3_path)
+        fp.seek(offset)
+        mp.upload_part_from_file(fp, part_num=part_num, size=bytes)
+
+
 class InvalidDeleteException(FileSystemException):
     pass
 
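The helper above is defined at module level because multiprocessing.Pool pickles the callable it dispatches, and bound methods are not picklable under Python 2. A standalone sketch of the same dispatch pattern (illustrative names only, not part of the commit):

    from multiprocessing import Pool

    # A module-level function is picklable, so Pool can send it to worker processes.
    def upload_part(part_num, num_parts):
        # Stand-in for the real per-part upload work.
        return 'uploaded part %d/%d' % (part_num, num_parts)

    if __name__ == '__main__':
        pool = Pool(processes=4)
        results = [pool.apply_async(upload_part, (i + 1, 10)) for i in range(10)]
        pool.close()
        # r.get() re-raises any exception that occurred in the worker process.
        for r in results:
            print(r.get())
        pool.join()

This mirrors the apply_async / close / get / join sequence used in put_multipart further down.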
@@ -198,14 +209,15 @@ def put_string(self, content, destination_s3_path, **kwargs):
         s3_key.key = key
         s3_key.set_contents_from_string(content, **kwargs)
 
-    def put_multipart(self, local_path, destination_s3_path, part_size=67108864, **kwargs):
+    def put_multipart(self, local_path, destination_s3_path, part_size=67108864, processes=cpu_count(), **kwargs):
         """
-        Put an object stored locally to an S3 path
-        using S3 multi-part upload (for files > 5GB).
+        Put an object stored locally to an S3 path using S3 multi-part upload (for files > 5GB).
 
         :param local_path: Path to source local file
         :param destination_s3_path: URL for target S3 location
         :param part_size: Part size in bytes. Default: 67108864 (64MB), must be >= 5MB and <= 5 GB.
+        :param processes: Number of processes to use to upload file to S3. Default: multiprocessing.cpu_count()
         :param kwargs: Keyword arguments are passed to the boto function `initiate_multipart_upload`
         """
         # calculate number of parts to upload
@@ -225,22 +237,23 @@ def put_multipart(self, local_path, destination_s3_path, part_size=67108864, **kwargs):
         # use modulo to avoid float precision issues
         # for exactly-sized fits
         num_parts = (source_size + part_size - 1) // part_size
+
+        pool = Pool(processes)
         mp = None
+        results = []
         try:
             mp = s3_bucket.initiate_multipart_upload(key, **kwargs)
 
             for i in range(num_parts):
-                # upload a part at a time to S3
-                offset = part_size * i
-                bytes = min(part_size, source_size - offset)
-                with open(local_path, 'rb') as fp:
-                    part_num = i + 1
-                    logger.info('Uploading part %s/%s to %s', part_num, num_parts, destination_s3_path)
-                    fp.seek(offset)
-                    mp.upload_part_from_file(fp, part_num=part_num, size=bytes)
-
+                results.append(pool.apply_async(_put_part, (destination_s3_path, i, local_path, mp,
+                                                            num_parts, part_size, source_size)))
             # finish the upload, making the file available in S3
             logger.info('Waiting for upload of %s to finish', destination_s3_path)
+            pool.close()
+            # Iterate through the results so any exception raised in a worker process is re-raised here
+            for r in results:
+                r.get()
+            pool.join()
             mp.complete_upload()
         except BaseException:
             if mp:
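For reference, the part layout in put_multipart follows from the ceiling division num_parts = (source_size + part_size - 1) // part_size: part i starts at offset part_size * i and carries at most part_size bytes. A small worked example with made-up sizes:

    part_size = 67108864               # 64 MB, the default
    source_size = 157286400            # a hypothetical 150 MB file

    num_parts = (source_size + part_size - 1) // part_size    # -> 3

    for i in range(num_parts):
        offset = part_size * i
        nbytes = min(part_size, source_size - offset)
        print('part %d: offset=%d size=%d' % (i + 1, offset, nbytes))
    # part 1: offset=0         size=67108864
    # part 2: offset=67108864  size=67108864
    # part 3: offset=134217728 size=23068672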
