Migrate MySQL Table Data to MongoDB Collections With Python
Get your MySQL data where it needs to go.
Introduction
MySQL is a relational database management system (RDBMS) that stores data in normalized tables, whereas MongoDB is a NoSQL database that stores information as schema-less documents grouped into collections. Because the two represent data so differently, migrating MySQL table data to MongoDB collections might sound like a mammoth task. Fortunately, Python makes it a breeze with its strong connectivity and data handling capabilities.
In this article, I will walk you through the steps required to migrate your MySQL table data to MongoDB collections using a simple Python script. The script was developed using Python 3.9.5 on Windows. However, it should work on any platform with Python 3.6 or later, since it uses f-strings.
Step 1: Install the Required Modules
The first step is to install the modules required to connect to your MySQL and MongoDB database instances. We will use mysql.connector, the official module for connecting to a MySQL database. For MongoDB, we will use pymongo, the recommended module for connecting to MongoDB from Python.
Run the following pip commands to install the required modules, if they're not already installed. Note that the official MySQL connector is published on PyPI as mysql-connector-python.
pip install mysql-connector-python
pip install pymongo
pip is the package installer for Python packages, or modules.
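As a quick sanity check after installation, the following sketch reports whether the two modules can be imported. The helper name is_importable is an assumption made for illustration; it is not part of either library.

```python
import importlib


def is_importable(module_name):
    """Return True if the named module can be imported in this environment."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


# Report the status of the two modules this article relies on.
for name in ("mysql.connector", "pymongo"):
    status = "available" if is_importable(name) else "missing"
    print(f"{name}: {status}")
```

If either module is reported as missing, rerun the corresponding pip command before continuing.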
Step 2: Read Data From MySQL Table
The next step is to read data from the source MySQL table and prepare it in a format that can be loaded into the target MongoDB database. MongoDB is a NoSQL database that stores data as JSON-like documents, so it is a good idea to read the source data in a JSON-like form: a list of Python dictionaries, one per row. Fortunately, Python has strong data handling capabilities that make this easy.
import mysql.connector
mysqldb = mysql.connector.connect(
    host="localhost", database="employees", user="root", password=""
)
mycursor = mysqldb.cursor(dictionary=True)
mycursor.execute("SELECT * from categories;")
myresult = mycursor.fetchall()
print(myresult)
When the script completes without any errors, you should see output similar to the following (reformatted here for readability):
[
{
"id":4,
"name":"Medicine",
"description":"<p>Medicine<br></p>",
"created_at":"",
"updated_at":""
},
{
"id":6,
"name":"Food",
"description":"<p>Food</p>",
"created_at":"",
"updated_at":""
},
{
"id":8,
"name":"Groceries",
"description":"<p>Groceries<br></p>",
"created_at":"",
"updated_at":""
},
{
"id":9,
"name":"Cakes & Bakes",
"description":"<p>Cakes & Bakes<br></p>",
"created_at":"",
"updated_at":""
}
]
Note that each row is returned as a dictionary of column names and values, since we passed the dictionary=True argument to the cursor. Otherwise, each row would be a plain tuple of values. A list of dictionaries maps directly onto JSON-style documents, so the source data is now ready to be migrated to a MongoDB collection.
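Strictly speaking, fetchall() returns Python objects rather than JSON text. If an actual JSON string is needed, for example to save the rows to a file, a minimal sketch follows. The sample rows are hypothetical stand-ins shaped like the output above, and default=str is one simple (lossy) way to handle values such as dates that the json module cannot serialize on its own.

```python
import datetime
import json

# Hypothetical rows, shaped like the result of a dictionary=True cursor.
sample_rows = [
    {"id": 4, "name": "Medicine", "created_at": datetime.datetime(2021, 1, 1, 9, 30)},
    {"id": 6, "name": "Food", "created_at": None},
]

# default=str converts values json cannot serialize natively into strings.
json_text = json.dumps(sample_rows, default=str, indent=2)
print(json_text)
```

This conversion is not needed for the migration itself, because pymongo accepts the list of dictionaries directly.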
Step 3: Write to MongoDB Collections
Once you have the source data in JSON format, the next step is to insert the data into a MongoDB collection. A collection is a set of documents and is the NoSQL equivalent of a table (or relation) in an RDBMS. We do that by calling the insert_many() method of the collection class, which returns a result object whose inserted_ids attribute lists the object ids of the inserted documents. Note that this method raises an exception when an empty list is passed as the argument, hence the length check before the method call.
import pymongo
mongodb_host = "mongodb://localhost:27017/"
mongodb_dbname = "mymongodb"
myclient = pymongo.MongoClient(mongodb_host)
mydb = myclient[mongodb_dbname]
mycol = mydb["categories"]
if len(myresult) > 0:
    x = mycol.insert_many(myresult)  # myresult comes from the MySQL cursor
    print(len(x.inserted_ids))
After this step, you can check your MongoDB instance to verify that the database and collection have been created and the documents inserted. Note that MongoDB is schema-less, which means that you don't have to define a schema before inserting documents; each document simply carries its own fields. MongoDB also creates the database and collection referenced in the code if they do not already exist.
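One practical wrinkle: MySQL columns such as DECIMAL, DATE, and TIME arrive in Python as decimal.Decimal, datetime.date, and datetime.timedelta values, which pymongo does not encode by default. The sketch below shows one possible way to normalize a row before calling insert_many(). The names normalize_value and normalize_row, and the chosen conversions (Decimal to float, date to a midnight datetime, timedelta to seconds), are assumptions made for illustration; pick conversions that suit your data.

```python
import datetime
import decimal


def normalize_value(value):
    """Convert one MySQL value into a type that can be stored in a document."""
    if isinstance(value, decimal.Decimal):
        return float(value)  # assumption: float precision is acceptable
    # datetime.datetime is a subclass of datetime.date, so check it first.
    if isinstance(value, datetime.datetime):
        return value
    if isinstance(value, datetime.date):
        return datetime.datetime.combine(value, datetime.time.min)
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()
    return value


def normalize_row(row):
    """Return a copy of a row dictionary with every value normalized."""
    return {key: normalize_value(value) for key, value in row.items()}


# Hypothetical row, as a dictionary=True cursor might return it.
row = {"price": decimal.Decimal("9.50"), "sold_on": datetime.date(2021, 5, 1)}
print(normalize_row(row))
```

With a helper like this, the insert call could become mycol.insert_many([normalize_row(r) for r in myresult]).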
Step 4: Putting Things Together
Here is the complete script to read a table from MySQL and insert it into a collection in MongoDB.
import mysql.connector
import pymongo
delete_existing_documents = True
mysql_host="localhost"
mysql_database="mydatabase"
mysql_schema = "myschema"
mysql_user="myuser"
mysql_password="********"
mongodb_host = "mongodb://localhost:27017/"
mongodb_dbname = "mymongodb"
mysqldb = mysql.connector.connect(
    host=mysql_host,
    database=mysql_database,
    user=mysql_user,
    password=mysql_password
)
mycursor = mysqldb.cursor(dictionary=True)
mycursor.execute("SELECT * from categories;")
myresult = mycursor.fetchall()
myclient = pymongo.MongoClient(mongodb_host)
mydb = myclient[mongodb_dbname]
mycol = mydb["categories"]
if len(myresult) > 0:
    x = mycol.insert_many(myresult)  # myresult comes from the MySQL cursor
    print(len(x.inserted_ids))
Step 5: Enhance the Script to Load All Tables in a MySQL Schema
The script reads a table from MySQL and loads the results into a MongoDB collection. The next step is to iterate through the list of all tables in the source database and load each one into a new MongoDB collection. We can do this by querying the information_schema.tables metadata table, which gives us the list of tables in a given schema (in MySQL, a schema is the same thing as a database). We can then iterate through the result and call the above script to migrate the data of each table.
# Iterate through the list of tables in the schema
table_list_cursor = mysqldb.cursor()
table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name;", (mysql_schema,))
tables = table_list_cursor.fetchall()
for table in tables:
    # Each row is a tuple; table[0] holds the table name.
    # Execute the migration script for table[0] here.
    pass
You can do this by abstracting the migration logic into a function.
# Function migrate_table
def migrate_table(db, col_name):
    mycursor = db.cursor(dictionary=True)
    mycursor.execute("SELECT * FROM " + col_name + ";")
    myresult = mycursor.fetchall()
    mycol = mydb[col_name]
    if delete_existing_documents:
        # Delete all documents in the collection
        mycol.delete_many({})
    # Insert the documents
    if len(myresult) > 0:
        x = mycol.insert_many(myresult)
        return len(x.inserted_ids)
    else:
        return 0
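Because the table name is concatenated into the SQL text, names that contain special characters or clash with reserved words can break the query. A minimal sketch of a quoting helper follows. quote_identifier and build_select_all are hypothetical helpers, not part of mysql.connector; they rely on MySQL's rule that identifiers are quoted with backticks and that an embedded backtick is escaped by doubling it.

```python
def quote_identifier(name):
    """Quote a MySQL identifier with backticks, doubling any embedded backtick."""
    if not name:
        raise ValueError("identifier must be a non-empty string")
    return "`" + name.replace("`", "``") + "`"


def build_select_all(table_name):
    """Build a SELECT statement for a table name read from information_schema."""
    return "SELECT * FROM " + quote_identifier(table_name) + ";"


# 'order' is a reserved word in MySQL, so it must be quoted to be used as a name.
print(build_select_all("order"))
print(build_select_all("odd`name"))
```

Inside migrate_table, the query string could then be built with build_select_all(col_name) instead of plain concatenation.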
Step 6: Output Script Progress and Make It Readable
The progress of the script is communicated through print statements. Use color coding, via ANSI escape codes, to make the output easily readable. For example, you can print success statements in green and failed statements in red.
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
print(f"{bcolors.HEADER}This is a header{bcolors.ENDC}")
print(f"{bcolors.OKBLUE}This prints in blue{bcolors.ENDC}")
print(f"{bcolors.OKGREEN}This message is green{bcolors.ENDC}")
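To avoid repeating the escape codes in every print call, a small helper can wrap a message in a color. This is only a sketch: the colorize function and its enabled flag are assumptions made for illustration, and the flag lets you switch colors off when the output is redirected to a file.

```python
import sys

# Same ANSI codes as in the bcolors class, repeated so this snippet runs alone.
OKGREEN = "\033[92m"
FAIL = "\033[91m"
ENDC = "\033[0m"


def colorize(message, color, enabled=True):
    """Wrap message in an ANSI color code, or return it unchanged if disabled."""
    if not enabled:
        return message
    return f"{color}{message}{ENDC}"


# Only emit colors when writing to an interactive terminal.
use_color = sys.stdout.isatty()
print(colorize("3 documents inserted.", OKGREEN, use_color))
print(colorize("Migration failed.", FAIL, use_color))
```

Keeping the reset code inside the helper also makes it harder to forget ENDC and leave the terminal stuck in one color.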
The Final Outcome
[Screenshot: The source MySQL database]
[Screenshot: The target MongoDB database after the migration]
[Screenshot: The Python script and output in VSCode]
The Complete Script
import mysql.connector
import pymongo
import datetime
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
begin_time = datetime.datetime.now()
print(f"{bcolors.HEADER}Script started at: {begin_time} {bcolors.ENDC}")
delete_existing_documents = True
mysql_host="localhost"
mysql_database="mydatabase"
mysql_schema = "myschema"  # placeholder; in MySQL a schema is a database, so this normally matches mysql_database
mysql_user="root"
mysql_password=""
mongodb_host = "mongodb://localhost:27017/"
mongodb_dbname = "mymongodb"
print(f"{bcolors.HEADER}Initializing database connections...{bcolors.ENDC}")
print("")
#MySQL connection
print(f"{bcolors.HEADER}Connecting to MySQL server...{bcolors.ENDC}")
mysqldb = mysql.connector.connect(
    host=mysql_host,
    database=mysql_database,
    user=mysql_user,
    password=mysql_password
)
print(f"{bcolors.HEADER}Connection to MySQL Server succeeded.{bcolors.ENDC}")
#MongoDB connection
print(f"{bcolors.HEADER}Connecting to MongoDB server...{bcolors.ENDC}")
myclient = pymongo.MongoClient(mongodb_host)
mydb = myclient[mongodb_dbname]
print(f"{bcolors.HEADER}Connection to MongoDB Server succeeded.{bcolors.ENDC}")
print(f"{bcolors.HEADER}Database connections initialized successfully.{bcolors.ENDC}")
#Start migration
print(f"{bcolors.HEADER}Migration started...{bcolors.ENDC}")
dblist = myclient.list_database_names()
if mongodb_dbname in dblist:
    print(f"{bcolors.OKBLUE}The database exists.{bcolors.ENDC}")
else:
    print(f"{bcolors.WARNING}The database does not exist, it is being created.{bcolors.ENDC}")
# Function migrate_table
def migrate_table(db, col_name):
    mycursor = db.cursor(dictionary=True)
    mycursor.execute("SELECT * FROM " + col_name + ";")
    myresult = mycursor.fetchall()
    mycol = mydb[col_name]
    if delete_existing_documents:
        # Delete all documents in the collection
        mycol.delete_many({})
    # Insert the documents
    if len(myresult) > 0:
        x = mycol.insert_many(myresult)
        return len(x.inserted_ids)
    else:
        return 0
# Iterate through the list of tables in the schema
# LIMIT 15 restricts this run to the first 15 tables; remove it to migrate every table
table_list_cursor = mysqldb.cursor()
table_list_cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name LIMIT 15;", (mysql_schema,))
tables = table_list_cursor.fetchall()
total_count = len(tables)
success_count = 0
fail_count = 0
for table in tables:
    try:
        print(f"{bcolors.OKCYAN}Processing table: {table[0]}...{bcolors.ENDC}")
        inserted_count = migrate_table(mysqldb, table[0])
        print(f"{bcolors.OKGREEN}Processing table: {table[0]} completed. {inserted_count} documents inserted.{bcolors.ENDC}")
        success_count += 1
    except Exception as e:
        print(f"{bcolors.FAIL} {e} {bcolors.ENDC}")
        fail_count += 1
print("")
print("Migration completed.")
print(f"{bcolors.OKGREEN}{success_count} of {total_count} tables migrated successfully.{bcolors.ENDC}")
if fail_count > 0:
    print(f"{bcolors.FAIL}Migration of {fail_count} tables failed. See errors above.{bcolors.ENDC}")
end_time = datetime.datetime.now()
print(f"{bcolors.HEADER}Script completed at: {end_time} {bcolors.ENDC}")
print(f"{bcolors.HEADER}Total execution time: {end_time-begin_time} {bcolors.ENDC}")
A Word of Caution
This script works well for small to medium-sized MySQL databases with up to a few hundred tables, each holding a few thousand rows. Performance may suffer for large databases with millions of rows, because fetchall() loads every row of a table into memory before anything is inserted. Before running the actual migration, try it out on a subset by adding a LIMIT clause to the table list query and to the per-table SELECT query.
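One way to reduce memory use on large tables is to read and insert rows in batches instead of calling fetchall(). The sketch below assumes a DB-API cursor created with dictionary=True (which provides fetchmany()) and a pymongo collection. The function name migrate_in_batches and the batch size of 1000 are illustrative assumptions, not tuned values.

```python
def migrate_in_batches(cursor, collection, query, batch_size=1000):
    """Copy query results into a collection one batch at a time.

    cursor is expected to be a DB-API cursor that returns rows as
    dictionaries, and collection a pymongo collection.
    Returns the total number of documents inserted.
    """
    cursor.execute(query)
    inserted = 0
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break  # no more rows; insert_many() rejects an empty list
        result = collection.insert_many(rows)
        inserted += len(result.inserted_ids)
    return inserted
```

For example, migrate_in_batches(mysqldb.cursor(dictionary=True), mydb["categories"], "SELECT * FROM categories;") would copy the categories table while holding at most one batch of rows in memory.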
Download
Download the entire script from GitHub here:
Published at DZone with permission of Shameel Ahmed. See the original article here.