Python S3 Multipart File Upload with Metadata and Progress Indicator

Recently I was working on a Python tool that uploads hundreds of large files to AWS S3. 

The tool requirements are:

  • Ability to upload very large files
  • Set metadata for each uploaded object if provided
  • Upload a single file as a set of parts
  • Retry mechanism (if transmission of any part fails, the tool will retransmit that part without affecting other parts)
  • Ability to upload object parts in parallel for multi-threaded performance

When I started thinking about the design I searched the internet for code samples and approaches that might satisfy these requirements, but surprisingly there isn't much information that clearly describes how to achieve this…

So after I implemented the tool and verified that it works well, I decided to share the code with you. Hopefully it will save some of you time.

Ok, so the good news is that the boto3 library has almost everything we need, except a progress indicator, but we do need to configure a few things to make it work smoothly.

First I’ll show the whole code and then explain it.

import boto3
import logging
from pathlib import Path
from botocore.exceptions import ClientError
from boto3.s3.transfer import TransferConfig
from hurry.filesize import size, si
import errno
import sys
import os
import threading
import ntpath


log = logging.getLogger('s3_uploader')
log.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s: - %(levelname)s: %(message)s", "%H:%M:%S")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
log.addHandler(stream_handler)

AWS_ACCESS_KEY = "XXXXXXXXXXXXX"
AWS_SECRET_ACCESS_KEY = "YYYYYYYYYYYYY"
AWS_SESSION_TOKEN = "ZZZZZZZZZZZZZ"

MP_THRESHOLD = 1
MP_CONCURRENCY = 5
MAX_RETRY_COUNT = 3

s3_client = None


class ProgressPercentage(object):
    def __init__(self, filename):
        self._filename = filename
        # self._size = float(os.path.getsize(filename))
        self._size = float(Path(filename).stat().st_size)
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write("\r%s  %s / %s  (%.2f%%)" % (ntpath.basename(self._filename), size(self._seen_so_far), size(self._size), percentage))
            sys.stdout.flush()


def login():
    # Create a boto3 session from the credentials and instantiate the S3 client.
    global s3_client
    session = boto3.session.Session(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN)
    s3_client = session.client('s3')


def upload_file_multipart(file, bucket, object_path, metadata=None):
    log.info("Uploading [" + file + "] to [" + bucket + "] bucket ...")
    log.info("S3 path: [ s3://" + bucket + "/" + object_path + " ]")
    # Multipart transfers occur when the file size exceeds the value of the multipart_threshold attribute
    if not Path(file).is_file():
        log.error("File [" + file + "] does not exist!")
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), file)

    if object_path is None:
        log.error("object_path is null!")
        raise ValueError("S3 object must be set!")

    GB = 1024 ** 3
    mp_threshold = MP_THRESHOLD*GB
    concurrency = MP_CONCURRENCY
    transfer_config = TransferConfig(multipart_threshold=mp_threshold, use_threads=True, max_concurrency=concurrency)

    login_attempt = False
    retry = MAX_RETRY_COUNT
    
    while retry > 0:
        try:
            s3_client.upload_file(file, bucket, object_path, Config=transfer_config, ExtraArgs=metadata, Callback=ProgressPercentage(file))
            sys.stdout.write('\n')
            log.info("File [" + file + "] uploaded successfully")
            log.info("Object name: [" + object_path + "]")
            login_attempt = False  # upload succeeded, so no need to raise for earlier re-logins
            retry = 0
        
        except ClientError as e:
            log.error("Failed to upload object!")
            log.exception(e)
            if e.response['Error']['Code'] == 'ExpiredToken':
                log.warning('Login token expired')
                retry -= 1
                log.debug("retry = " + str(retry))
                login_attempt = True
                login()
            else:
                log.error("Unhandled error code:")
                log.debug(e.response['Error']['Code'])
                raise

        except boto3.exceptions.S3UploadFailedError as e:
            log.error("Failed to upload object!")
            log.exception(e)
            if 'ExpiredToken' in str(e):
                log.warning('Login token expired')
                log.info("Handling...")
                retry -= 1
                log.debug("retry = " + str(retry))
                login_attempt = True
                login()
            else:
                log.error("Unknown error!")
                raise

        except Exception as e:
            log.error("Unknown exception occured!")
            template = "An exception of type {0} occurred. Arguments:\n{1!r}"
            message = template.format(type(e).__name__, e.args)
            log.debug(message)
            log.exception(e)
            raise

    if login_attempt:
        raise Exception("Tried to login " + str(MAX_RETRY_COUNT) + " times, but failed to upload!")

def main(args):
    login()
    metadata = {}
    metadata['Metadata'] = {'name': 'movie.mkv', 'type': 'mkv','size': '2.32GB'}
    upload_file_multipart("movie.mkv", "dev-test-files-uploader", "movie.mkv", metadata=metadata)
    log.info("Upload finished!")


if __name__ == "__main__":
    main(sys.argv[1:])

Now I’ll explain the code. 

login() – First we log in to AWS and create the S3 client. The original solution connects to AWS API Gateway and then to AWS Cognito, which returns temporary credentials, but here I simplified the code and pretend we already have AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN, so I create a session from them and instantiate the S3 client.
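
If you would rather not hardcode credentials at all, boto3 can also resolve them from the standard credential chain (environment variables, the shared credentials file, or an IAM role). Here is a minimal sketch of that variant, assuming a profile named "uploader" exists in ~/.aws/credentials (the profile name is just an example):

import boto3

s3_client = None

def login():
    # Let boto3 resolve credentials from the standard chain instead of
    # hardcoded keys; the "uploader" profile name is only an illustration.
    global s3_client
    session = boto3.session.Session(profile_name="uploader")
    s3_client = session.client('s3')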

Then I compose the metadata. This metadata is attached to the uploaded object on S3 and you can even see it in the AWS Console.
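
If you want to verify the metadata programmatically rather than in the AWS Console, a quick sketch reusing the s3_client, bucket and key from the example above could use head_object:

# Fetch the object's metadata without downloading the body.
response = s3_client.head_object(Bucket="dev-test-files-uploader", Key="movie.mkv")

# User-defined metadata comes back under the 'Metadata' key,
# e.g. {'name': 'movie.mkv', 'type': 'mkv', 'size': '2.32GB'}.
print(response['Metadata'])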

Ok, and finally we get to the upload_file_multipart() function, which does all the magic. The most important thing here is the transfer_config object, where we set the multipart_threshold, use_threads and max_concurrency values.

So, by this line:

transfer_config = TransferConfig(multipart_threshold=mp_threshold, use_threads=True, max_concurrency=concurrency)

I configure multipart uploads to kick in only for files larger than 1GB, enable the use_threads parameter, and set the maximum concurrency to 5 threads.
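
If you also want to control how large each part is, TransferConfig accepts a multipart_chunksize parameter as well. The 100MB chunk size below is only an illustration, not a recommendation:

MB = 1024 ** 2
GB = 1024 ** 3

# Parts of roughly 100MB each; a 2GB file is split into about 20 parts,
# uploaded by up to 5 threads in parallel.
transfer_config = TransferConfig(
    multipart_threshold=1 * GB,
    multipart_chunksize=100 * MB,
    max_concurrency=5,
    use_threads=True,
)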

Now let’s look at this line:

s3_client.upload_file(file, bucket, object_path, Config=transfer_config, ExtraArgs=metadata, Callback=ProgressPercentage(file))

You can see that I pass our transfer_config and metadata, and also set the Callback for the progress indicator.
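
Note that ExtraArgs is not limited to metadata; any of the allowed upload arguments can go into the same dictionary. For example, here is a sketch that also sets the content type (the MIME type for .mkv is my assumption, and transfer_config and ProgressPercentage are reused from the code above):

extra_args = {
    'Metadata': {'name': 'movie.mkv', 'type': 'mkv', 'size': '2.32GB'},
    'ContentType': 'video/x-matroska',  # assumed MIME type for an .mkv file
}

s3_client.upload_file("movie.mkv", "dev-test-files-uploader", "movie.mkv",
                      Config=transfer_config,
                      ExtraArgs=extra_args,
                      Callback=ProgressPercentage("movie.mkv"))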

All of this runs in a while loop with try/except blocks, so if the connection is interrupted or the token expires, one of the exception handlers catches it, reconnects up to 3 times, and on success automatically resubmits the upload.
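
If you prefer to keep the retry logic separate from the upload call, the same idea can be expressed as a small helper with exponential backoff. This is only a sketch of an alternative structure; it reuses login(), log, ClientError and MAX_RETRY_COUNT from the script above and only handles the expired-token case:

import time

def with_retries(action, max_attempts=MAX_RETRY_COUNT, base_delay=2):
    # Call action(); on an expired token, re-login and retry with backoff.
    for attempt in range(1, max_attempts + 1):
        try:
            return action()
        except ClientError as e:
            if e.response['Error']['Code'] != 'ExpiredToken' or attempt == max_attempts:
                raise
            log.warning("Token expired, logging in again (attempt %d of %d)", attempt, max_attempts)
            login()
            time.sleep(base_delay ** attempt)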

When you run the code with a relatively large file (mine is about 2GB), you will see the percentage indicator update in real time while the file is uploading: 

20:28:21: - INFO: Uploading [movie.mkv] to [dev-test-files-uploader] bucket...
20:28:21: - INFO: S3 path: [ s3://dev-test-files-uploader/movie.mkv ]
movie.mkv 59M / 2G (2.67%)

Ok, that is actually it. I hope this post helps anyone who needs to implement something similar.

If you have any questions please write them in the comments section.

3 COMMENTS

  1. So how does this retry only the failed part? From what I see, in case of failure it tries to upload the whole thing from scratch.

    • You are right, the retry logic for only the failed parts has to be implemented by yourself; boto3 doesn’t handle this.
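
      For anyone who needs that behaviour, a rough sketch with the low-level client API (create_multipart_upload / upload_part / complete_multipart_upload) could look like the following. The helper name, the 100MB part size and the 3 retries are arbitrary choices of mine, ClientError is the same botocore exception imported above, and cleanup via abort_multipart_upload is left out for brevity:

def upload_with_part_retries(s3, file, bucket, key, part_size=100 * 1024 ** 2, max_retries=3):
    # Start the multipart upload and remember its id.
    mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = mpu['UploadId']
    parts = []

    with open(file, 'rb') as f:
        part_number = 1
        while True:
            data = f.read(part_size)
            if not data:
                break
            # Retry only this part; parts that already uploaded are untouched.
            for attempt in range(1, max_retries + 1):
                try:
                    resp = s3.upload_part(Bucket=bucket, Key=key, PartNumber=part_number,
                                          UploadId=upload_id, Body=data)
                    parts.append({'ETag': resp['ETag'], 'PartNumber': part_number})
                    break
                except ClientError:
                    if attempt == max_retries:
                        raise
            part_number += 1

    # Ask S3 to assemble the uploaded parts into the final object.
    s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                                 MultipartUpload={'Parts': parts})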
