Today I’ll show how to use a Pandas DataFrame to transform data from one MySQL AWS RDS instance to another.
Suppose you have an old application that writes data to a MySQL table, and at some point you need to upgrade the application so that the data it writes is slightly different from the old format.
Once development is finished, you need to migrate the old data to the new database.
To simplify the task, suppose that both databases have only one table with slightly different columns.
In general, the table reflects the status of files on AWS S3, with some additional information.
Old DB table columns:
created | updated | project_id | bucket_name | file_name | md5sum | uploader | organism | variety | type | insert | read_length | adapter_seq
New DB table columns:
created | updated | object | bucket | md5sum | uploader | size | metadata
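For concreteness, here is a rough sketch of how the new table might be defined using SQLAlchemy Core. The column types and lengths are my own assumptions (the post only lists the column names), and the JSON column requires MySQL 5.7 or later.

# Sketch only: types and lengths are assumptions, not the real schema.
from sqlalchemy import MetaData, Table, Column, DateTime, String, BigInteger, JSON

meta = MetaData()
my_new_table = Table(
    'my_new_table', meta,
    Column('created', DateTime),
    Column('updated', DateTime),
    Column('object', String(1024)),
    Column('bucket', String(255)),
    Column('md5sum', String(32)),
    Column('uploader', String(255)),
    Column('size', BigInteger),
    Column('metadata', JSON),
)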
As you can see, some columns are exactly the same, while others have been renamed (bucket_name -> bucket, file_name -> object).
Besides that, the new table has a JSON-type column, metadata. We want to aggregate project_id, organism, variety, type, insert, read_length, and adapter_seq into JSON and insert the result into the metadata column of the new table.
Another point: on the new table we also want to record the size of each file, and we want to verify that the old data is correct, since some files may have been deleted from S3 without the database being updated for some reason. So we want to delete the rows that refer to non-existent objects.
This manipulation could be quite painstaking if you did it the straightforward way: read the old table, load the data into objects, manipulate those objects, and then generate an SQL script to finally insert everything into the new table.
Instead, we can use a Pandas DataFrame and perform the transformation itself in just 5-6 lines of code!
First, let’s look at the whole script, and then I’ll explain everything:
import pymysql
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import boto3

OLD_DB_HOST = "old_db_host.us-east-1.rds.amazonaws.com"
OLD_DB_PORT = 3306
OLD_DB_INSTANCE_IDENTIFIER = "s3data"
OLD_DB_USER = "user"
OLD_DB_PASSWORD = "passw"

NEW_DB_HOST = "new_db_host.us-east-1.rds.amazonaws.com"
NEW_DB_PORT = 3306
NEW_DB_INSTANCE_IDENTIFIER = "s3data"
NEW_DB_USER = "user"
NEW_DB_PASSWORD = "passw"


def get_list_of_objects(bucket):
    # list_objects_v2 returns at most 1000 keys per call, so use a paginator
    session = boto3.session.Session(profile_name='dev')
    conn = session.client('s3')
    paginator = conn.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket)
    existing_objects = []
    for page in pages:
        for obj in page.get('Contents', []):
            existing_objects.append((obj['Key'], obj['Size']))
    return existing_objects


def migrate():
    print("\n*********** Loading data from OLD DB ***********\n")
    print("Connecting to [" + OLD_DB_HOST + "] ...")
    conn = pymysql.connect(host=OLD_DB_HOST, user=OLD_DB_USER,
                           password=OLD_DB_PASSWORD, database=OLD_DB_INSTANCE_IDENTIFIER)
    print("Fetching data to DataFrame ...")
    df = pd.read_sql_query("select * from my_old_table", conn)
    print("Closing connection ...")
    conn.close()

    print("\n*********** Transforming data ***********\n")
    print("Renaming columns ...")
    df.rename(columns={'bucket_name': 'bucket', 'file_name': 'object'}, inplace=True)
    print("Adding new column ...")
    df = df.assign(size=np.nan)
    print("Reordering columns ...")
    df = df[['created', 'updated', 'object', 'bucket', 'md5sum', 'uploader', 'size',
             'project_id', 'organism', 'variety', 'type', 'insert', 'read_length', 'adapter_seq']]
    print("Aggregating columns ...")
    # everything from column 7 onward (project_id .. adapter_seq) is collapsed into one JSON string
    df = df[['created', 'updated', 'object', 'bucket', 'md5sum', 'uploader', 'size']].assign(
        metadata=df.iloc[:, 7:].agg(pd.Series.to_json, axis=1))

    print("\n*********** S3 sync and slice ***********\n")
    existing_objects = get_list_of_objects('my_bucket_name')
    # drop rows whose object no longer exists on S3, then fill in the real sizes
    df.drop(df[~df.object.isin([obj[0] for obj in existing_objects])].index, inplace=True)
    df['size'] = df['object'].map(dict(existing_objects))

    print("\n*********** Migrating data ***********\n")
    print("Connecting to [" + NEW_DB_HOST + "] ...")
    # use the PyMySQL driver with SQLAlchemy as well
    engine = create_engine('mysql+pymysql://' + NEW_DB_USER + ':' + NEW_DB_PASSWORD + '@'
                           + NEW_DB_HOST + '/' + NEW_DB_INSTANCE_IDENTIFIER + '?charset=utf8')
    print("Migrating data ...")
    df.to_sql('my_new_table', engine, if_exists='append', index=False)
    print("Closing connection ...")
    engine.dispose()

    print("\n*********** Migration done ***********\n")


migrate()
So what do we have here?
First, we load all the data from the old RDS into a Pandas DataFrame with pd.read_sql_query.
Now that we have the whole table inside our DataFrame, we rename the columns whose names changed (df.rename).
We add a new column, size, and fill it with empty values (df.assign(size=np.nan)).
Then we reorder the columns so that the columns shared with the new table come first; this way, everything from project_id onward can be sliced out positionally in the next step.
Now we need to transform all the columns from project_id to adapter_seq into JSON and put the result into the newly created metadata column (the .agg(pd.Series.to_json, axis=1) call). So the content of the metadata column will look something like this:
{ "project_id" = "proj1", "organism" = "dd3d", "variety" = "d5", "type" = "type3", "insert" = True, "read_length" = 1562, "adapter_seq" = "fj83j" }
Now we want to drop all the rows that contain phantom records, i.e. objects that are no longer present on S3 (the df.drop call). To achieve this, we collect all the existing object names and their sizes in a list of tuples, existing_objects, where the first element is the object name and the second is the object size. I use get_paginator('list_objects_v2') because the regular API call returns only the first 1000 records, and my bucket contains about 11k objects.
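For comparison, this is roughly what the same listing would look like without the paginator, driving the continuation token by hand (the bucket name is a placeholder, as in the script above):

import boto3

s3 = boto3.session.Session(profile_name='dev').client('s3')

existing_objects = []
kwargs = {'Bucket': 'my_bucket_name'}
while True:
    resp = s3.list_objects_v2(**kwargs)
    for obj in resp.get('Contents', []):
        existing_objects.append((obj['Key'], obj['Size']))
    if not resp.get('IsTruncated'):  # no more pages
        break
    kwargs['ContinuationToken'] = resp['NextContinuationToken']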
In the next step I need to inject the size of each S3 object. This can be done with a single line of code: df['size'] = df['object'].map(dict(existing_objects)).
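To make the mapping explicit, here is a tiny sketch of what that single line does (the object names and sizes are invented):

import pandas as pd

existing_objects = [('a.fastq.gz', 1024), ('b.fastq.gz', 2048)]  # (key, size) tuples from S3
df = pd.DataFrame({'object': ['a.fastq.gz', 'b.fastq.gz']})

# dict(existing_objects) turns the tuples into {key: size};
# Series.map then looks each object name up in that dict
df['size'] = df['object'].map(dict(existing_objects))
# the size column now holds 1024 and 2048, respectively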
Now that our DataFrame is adjusted to the new table structure, we can finally save it to the database with df.to_sql.
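One optional tweak, in case the table were much larger: df.to_sql accepts a chunksize parameter, which writes the rows in batches instead of one big statement. For example (the value 1000 is just an illustration):

# same call as in the script above, but writing 1000 rows per batch
df.to_sql('my_new_table', engine, if_exists='append', index=False, chunksize=1000)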
That’s it! Hopefully this post will save some time for those of you who are not yet familiar with Pandas.
Happy coding!