Recently I was working on the implementation of a Python tool that uploads hundreds of large files to AWS S3.
The tool requirements are:
- Ability to upload very large files
- Set metadata for each uploaded object if provided
- Upload a single file as a set of parts
- Retry mechanism (if transmission of any part fails, the tool retransmits that part without affecting the other parts)
- Ability to upload object parts in parallel for multi-threaded performance
When I started thinking about the design, I searched the internet for code samples and approaches that might satisfy these requirements, but surprisingly there was not much information that clearly described how to achieve this…
So after I successfully implemented the tool and confirmed that it works, I decided to share the code with you. Hopefully it will save some of you time.
Ok, so the good news is that the boto3 library has almost everything we need, apart from a progress indicator, but we do need to configure a few things to make it work smoothly.
First I’ll put the whole code here and explain things after that.
import boto3
import logging
from pathlib import Path
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
from hurry.filesize import size, si
import errno
import sys
import os
import threading
import ntpath
log = logging.getLogger('s3_uploader')
log.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s: - %(levelname)s: %(message)s", "%H:%M:%S")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)
AWS_ACCESS_KEY = "XXXXXXXXXXXXX"
AWS_SECRET_ACCESS_KEY = "YYYYYYYYYYYYY"
AWS_SESSION_TOKEN = "ZZZZZZZZZZZZZ"
MP_THRESHOLD = 1
MP_CONCURRENCY = 5
MAX_RETRY_COUNT = 3
s3_client = None
class ProgressPercentage(object):
    def __init__(self, filename):
        self._filename = filename
        # self._size = float(os.path.getsize(filename))
        self._size = float(Path(filename).stat().st_size)
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # boto3 invokes the callback from multiple transfer threads, hence the lock
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write("\r%s %s / %s (%.2f%%)" % (ntpath.basename(self._filename), size(self._seen_so_far), size(self._size), percentage))
            sys.stdout.flush()
def login():
    global s3_client
    session = boto3.session.Session(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN)
    s3_client = session.client('s3')
def upload_file_multipart(file, bucket, object_path, metadata=None):
    log.info("Uploading [" + file + "] to [" + bucket + "] bucket ...")
    log.info("S3 path: [ s3://" + bucket + "/" + object_path + " ]")

    # Multipart transfers occur when the file size exceeds the value of the multipart_threshold attribute
    if not Path(file).is_file():
        log.error("File [" + file + "] does not exist!")
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file)

    if object_path is None:
        log.error("object_path is null!")
        raise ValueError("S3 object must be set!")

    GB = 1024 ** 3
    mp_threshold = MP_THRESHOLD * GB
    concurrency = MP_CONCURRENCY
    transfer_config = TransferConfig(multipart_threshold=mp_threshold, use_threads=True, max_concurrency=concurrency)

    login_attempt = False
    retry = MAX_RETRY_COUNT

    while retry > 0:
        try:
            s3_client.upload_file(file, bucket, object_path, Config=transfer_config, ExtraArgs=metadata, Callback=ProgressPercentage(file))
            sys.stdout.write('\n')
            log.info("File [" + file + "] uploaded successfully")
            log.info("Object name: [" + object_path + "]")
            return  # success, do not fall through to the "failed to upload" check below

        except ClientError as e:
            log.error("Failed to upload object!")
            log.exception(e)
            if e.response['Error']['Code'] == 'ExpiredToken':
                log.warning('Login token expired')
                retry -= 1
                log.debug("retry = " + str(retry))
                login_attempt = True
                login()
            else:
                log.error("Unhandled error code:")
                log.debug(e.response['Error']['Code'])
                raise

        except boto3.exceptions.S3UploadFailedError as e:
            log.error("Failed to upload object!")
            log.exception(e)
            if 'ExpiredToken' in str(e):
                log.warning('Login token expired')
                log.info("Handling...")
                retry -= 1
                log.debug("retry = " + str(retry))
                login_attempt = True
                login()
            else:
                log.error("Unknown error!")
                raise

        except Exception as e:
            log.error("Unknown exception occurred!")
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            message = template.format(type(e).__name__, e.args)
            log.debug(message)
            log.exception(e)
            raise

    if login_attempt:
        raise Exception("Tried to login " + str(MAX_RETRY_COUNT) + " times, but failed to upload!")
def main(args):
    login()

    metadata = {}
    metadata['Metadata'] = {'name': 'movie.mkv', 'type': 'mkv', 'size': '2.32GB'}

    upload_file_multipart("movie.mkv", "dev-test-files-uploader", "movie.mkv", metadata=metadata)
    log.info("Upload finished!")


if __name__ == "__main__":
    main(sys.argv[1:])
Now I’ll explain the code.
login() – First we log in to AWS and create the S3 client. The original solution connects to AWS API Gateway and then to AWS Cognito, which returns the credentials, but here I simplified the code and pretend we already have AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN, so I create a session from them and instantiate the S3 client.
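For reference, here is a rough sketch of how the Cognito-based part of the original setup could look. The identity pool ID and region are placeholders, and a real deployment would typically go through API Gateway and pass Logins for an authenticated identity, so treat this only as an illustration:

# Hypothetical Cognito login sketch -- pool ID and region are placeholders
def login_via_cognito(identity_pool_id="eu-west-1:00000000-0000-0000-0000-000000000000", region="eu-west-1"):
    cognito = boto3.client('cognito-identity', region_name=region)
    identity = cognito.get_id(IdentityPoolId=identity_pool_id)
    creds = cognito.get_credentials_for_identity(IdentityId=identity['IdentityId'])['Credentials']
    session = boto3.session.Session(
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretKey'],
        aws_session_token=creds['SessionToken'],
        region_name=region,
    )
    return session.client('s3')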
Then I compose the metadata. It will be attached to the uploaded object on S3, and you can even see it in the AWS Console.
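If you want to double-check the metadata after the upload, you can read it back with head_object (a quick sketch using the bucket and key from the example, assuming s3_client has already been created):

# Read the user-defined metadata back from S3
response = s3_client.head_object(Bucket="dev-test-files-uploader", Key="movie.mkv")
print(response['Metadata'])   # -> {'name': 'movie.mkv', 'type': 'mkv', 'size': '2.32GB'}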
Ok, and finally we get to the upload_file_multipart() function, which does all the magic. The most important thing here is the transfer_config object, where we set the multipart_threshold, the use_threads parameter and the max_concurrency value.
So, by this line:
transfer_config = TransferConfig(multipart_threshold=mp_threshold, use_threads=True, max_concurrency=concurrency)
I configure multipart uploads to kick in only for files larger than 1GB, switch on the use_threads parameter and set the maximum concurrency to 5 threads.
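TransferConfig has a few more knobs that may be worth tuning; for example, multipart_chunksize controls the size of each part. A small sketch (the 256 MB value is just an illustration, not what the tool uses):

MB = 1024 ** 2
transfer_config = TransferConfig(
    multipart_threshold=1024 ** 3,   # only files above 1 GB are uploaded in parts
    multipart_chunksize=256 * MB,    # size of each part (illustrative value)
    max_concurrency=5,               # up to 5 parts in flight at once
    use_threads=True,                # enable the threaded transfer manager
)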
Now let’s look at this line:
s3_client.upload_file(file, bucket, object_path, Config=transfer_config, ExtraArgs=metadata, Callback=ProgressPercentage(file))
You can see that I pass our transfer_config and metadata, and also set the Callback to the progress indicator.
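ExtraArgs is not limited to Metadata; it accepts other allowed upload arguments as well, such as ContentType or ServerSideEncryption. For example (a sketch, these values are not part of the tool):

# Illustrative ExtraArgs -- Metadata plus a couple of other allowed upload arguments
extra_args = {
    'Metadata': {'name': 'movie.mkv', 'type': 'mkv', 'size': '2.32GB'},
    'ContentType': 'video/x-matroska',
    'ServerSideEncryption': 'AES256',
}
s3_client.upload_file("movie.mkv", "dev-test-files-uploader", "movie.mkv", ExtraArgs=extra_args)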
All this logic runs in a while loop wrapped in try/except blocks, so if the connection is interrupted or the token expires, one of the exception handlers catches it, tries to reconnect up to 3 times and, on success, automatically resubmits the upload.
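One small improvement you may want on top of this is a pause between retries, so a briefly unavailable network has time to recover. A minimal backoff sketch, reusing the variables from the function above (the delays are arbitrary and not part of the tool):

import time

# Hypothetical backoff between retry attempts -- the delay doubles after each failure
delay = 2
for attempt in range(MAX_RETRY_COUNT):
    try:
        s3_client.upload_file(file, bucket, object_path, Config=transfer_config)
        break
    except ClientError:
        log.warning("Attempt %d failed, retrying in %d seconds", attempt + 1, delay)
        time.sleep(delay)
        delay *= 2
else:
    raise Exception("All retry attempts failed")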
When you run the code with a relatively large file (mine is about 2GB), you will see the percentage indicator updating in real time while the file uploads:
20:28:21: - INFO: Uploading [movie.mkv] to [dev-test-files-uploader] bucket...
20:28:21: - INFO: S3 path: [ s3://dev-test-files-uploader/movie.mkv ]
movie.mkv 59M / 2G (2.67%)
Ok, that is actually it. I hope this post will help someone who needs to implement something similar.
If you have any questions please write them in the comments section.
So how does this retry only the failed part? From what I see, in case of failure it tries to upload the whole thing from scratch.
You are right, the logic for retrying only the failed parts has to be implemented by yourself; boto3 doesn’t handle this.
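For readers who do need per-part retries, a rough sketch using the low-level multipart API could look like this. It is a simplified illustration (fixed 100 MB parts, a per-part retry count, no progress callback), not a drop-in replacement for the tool above:

# Sketch: upload a file part by part, retrying only the part that failed
PART_SIZE = 100 * 1024 ** 2   # 100 MB per part (illustrative)

def upload_with_part_retry(file, bucket, key, retries_per_part=3):
    mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu['UploadId']
    parts = []
    try:
        with open(file, 'rb') as f:
            part_number = 1
            while True:
                data = f.read(PART_SIZE)
                if not data:
                    break
                for attempt in range(retries_per_part):
                    try:
                        resp = s3_client.upload_part(Bucket=bucket, Key=key, UploadId=upload_id,
                                                     PartNumber=part_number, Body=data)
                        parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
                        break
                    except ClientError:
                        if attempt == retries_per_part - 1:
                            raise
                part_number += 1
        s3_client.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                                            MultipartUpload={'Parts': parts})
    except Exception:
        # Abort so the unfinished parts do not keep accumulating storage costs
        s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise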