# ETL Pipeline Practice - SQL to MongoDB

This repository contains a practice project for building an ETL (Extract, Transform, Load) pipeline that extracts data from a SQL database, transforms it into a readable and loadable format, and loads it into MongoDB using Python.

- Step 1: collect the data from MySQL
- Step 2: transform the data
- Step 3: ingest the data into the new model in a MongoDB database

For this project we are going to use a sample database from mysqltutorial.org named `classicmodels`. You can download this sample from the site or from this repository. With the sample in hand, restore the database using the following command:

```shell
mysql -u <username> -p < mysqlsampledatabase.sql
```

After that, you will be able to query the data using SQL. OK, keeping going... In the following steps we will walk through the purpose of this project. Some of the prerequisites are:
- MySQL installed locally or an instance in the cloud
- MongoDB installed locally or an instance in the cloud
- Python 3.10 installed

## Project scenario

For this project I have MySQL Server 8.0 installed on my Ubuntu machine. For the NoSQL step, however, I used a MongoDB database on MongoDB Atlas, so the code reflects the configuration described above. The IDE is your choice; I particularly enjoy PyCharm.
## ETL Processing - Extract Stage

### Step 1: Set up the Environment

Ensure you have the required libraries installed. You can install them using pip:

```shell
pip install SQLAlchemy pymongo pandas
```

(or install polars if you choose to use it instead of pandas). If you don't have the MySQL connector, run the following command:
```shell
pip install pymysql
```

In this particular project we are going to use the PyMySQL driver. However, there are other drivers available; feel free to swap in a driver of your own choice. For the last stage, to be able to connect to the MongoDB database on Atlas, you also need to install the `"pymongo[srv]"` extra:

```shell
python3.10 -m pip install "pymongo[srv]"
```

### Step 2: Connect to the MySQL Database

Use SQLAlchemy to connect to your MySQL database and fetch the data you want to transform. Replace the placeholders in the code below with your actual database connection details:
Below you will find the connection method of the MySQLConnection class; this piece of code lives in `mysql_connection.py`:

```python
from sqlalchemy import create_engine


def set_mysql_engine(self):
    connection_string = ''.join([
        'mysql+pymysql://', self.user, ':', self.passwd,
        '@', self.host, ':', str(self.port), '/', self.database,
    ])
    self.engine = create_engine(connection_string)
    try:
        # Connect once up front to fail fast on bad credentials
        self.engine.connect()
    except Exception as exc:
        raise ConnectionError('Error during the connection') from exc
```
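Once the engine is set up, the extract step is typically a query read straight into a DataFrame with `pandas.read_sql`. Below is a minimal, hedged sketch of that idea; since there is no `classicmodels` server to hit here, it uses an in-memory SQLite engine and a made-up `offices`-like table as a stand-in for the real MySQL connection string.

```python
import pandas as pd
from sqlalchemy import create_engine

# Stand-in engine: SQLite in memory so the sketch runs anywhere. Swap the URL
# for your real 'mysql+pymysql://user:passwd@host:port/classicmodels' string.
engine = create_engine('sqlite:///:memory:')

# Seed a tiny table shaped loosely like classicmodels' `offices` (sample data)
with engine.begin() as conn:
    conn.exec_driver_sql(
        "CREATE TABLE offices (officeCode TEXT, city TEXT, country TEXT)"
    )
    conn.exec_driver_sql(
        "INSERT INTO offices VALUES "
        "('1', 'San Francisco', 'USA'), ('4', 'Paris', 'France')"
    )

# Extract: read the query result straight into a DataFrame
df = pd.read_sql('SELECT officeCode, city, country FROM offices', engine)
print(df.shape)  # (2, 3)
```

The same `read_sql` call works unchanged against the MySQL engine built by `set_mysql_engine`.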
## ETL Processing - Transformation Stage

### Step 3: Data Transformation and Modeling

Perform any necessary data transformation using pandas or polars (depending on your choice). This might include cleaning, filtering, aggregating, or any other manipulation required to prepare the data for MongoDB insertion.

```python
def transforming_data(data):
    """
    Transformation of the data from tabular to document format

    :param data: dict with the tabular data
    :return: dict based on the JSON document format

    1st step: receive the data and convert it into a dataframe
    2nd step: retrieve the dataframe subset based on the context data
    3rd step: build the new model - document oriented
    4th step: return the document
    """
```

Programming code: `data_transformation.py`

## ETL Processing - Load Stage

### Step 4: Connect to MongoDB

Use PyMongo to establish a connection to your MongoDB server. Replace the placeholders in the code below with your MongoDB connection details:

```python
from pymongo import MongoClient

# General template
client = MongoClient('mongodb://user:password@host:port/')
db = client['your_database_name']        # replace with your desired database name
collection = db['your_collection_name']  # replace with your desired collection name
```

In this project the connection is stored in a class method, like the code below:
```python
def connecting(self):
    # mongodb+srv://pymongo:@cluster0.2nj1fc2.mongodb.net/?retryWrites=true&w=majority
    connection_string = ''.join(['mongodb+srv://', self.user, ':', self.passwd, '@', self.domain])
    return MongoClient(connection_string)
```
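Because the SRV URI is plain string concatenation, it can be sanity-checked without any network access. The sketch below mirrors `connecting()` inside a hypothetical `MongoDBConnection` class; the class name, field names, and the example domain are assumptions for illustration, not the project's actual code:

```python
class MongoDBConnection:
    """Hypothetical stand-in holding the fields used by connecting()."""

    def __init__(self, user, passwd, domain):
        self.user = user
        self.passwd = passwd
        self.domain = domain

    def build_connection_string(self):
        # Same concatenation as connecting(); split out so the URI can be
        # inspected before MongoClient(connection_string) is ever called
        return ''.join(['mongodb+srv://', self.user, ':', self.passwd, '@', self.domain])


conn = MongoDBConnection('pymongo', 'secret', 'cluster0.example.mongodb.net')
print(conn.build_connection_string())
# mongodb+srv://pymongo:secret@cluster0.example.mongodb.net
```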
### Step 5: Data Ingestion into MongoDB

Iterate over the transformed data and insert it into MongoDB. First, create the database and the collection that will store the data documents:

```python
client = instance_mongodb.connecting()
db = client.get_database('dio_analytics')
print('Collections:\n', db.list_collection_names())

collection = db.get_collection('orders')

# `posts` holds the transformed documents produced in Step 3
for doc in posts:
    result = collection.insert_one(doc)
    print(result.inserted_id)
```

### Step 6: Done!

Put everything together into a Python script and you have your data engineering project ready to go. You can run the script whenever you need to transfer data from MySQL to MongoDB. Now you can access MongoDB Atlas and visualize the data that you just inserted into the NoSQL database.

Remember to handle any potential errors, add logging, and optimize the code based on the scale of your data. Please note that the provided steps are just a basic outline; you can expand the project according to your specific requirements and the complexity of your data transformation needs.
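To make the outline concrete, here is a hedged end-to-end sketch under assumptions: `transforming_data` is one illustrative reading of the Step 3 docstring (the column names and sample rows are made up, not from `classicmodels`), and the Atlas load step is left commented out because it needs a live cluster.

```python
import pandas as pd


def transforming_data(data):
    """Illustrative take on the Step 3 docstring: tabular dict -> documents."""
    # 1st step: receive the data and convert it into a dataframe
    df = pd.DataFrame(data)
    # 2nd step: retrieve the dataframe subset relevant to the document model
    subset = df[['orderNumber', 'customerName', 'status']]
    # 3rd step: build the new model - document oriented
    documents = subset.to_dict(orient='records')
    # 4th step: return the documents
    return documents


# Tabular rows as they might come out of the extract stage (made-up sample)
rows = {
    'orderNumber': [10100, 10101],
    'customerName': ['Atelier graphique', 'Signal Gift Stores'],
    'status': ['Shipped', 'Shipped'],
    'internalFlag': [0, 1],  # dropped by the subset step
}

posts = transforming_data(rows)
print(posts[0])
# {'orderNumber': 10100, 'customerName': 'Atelier graphique', 'status': 'Shipped'}

# Load stage (needs a live MongoDB cluster):
# client = instance_mongodb.connecting()
# client.get_database('dio_analytics').get_collection('orders').insert_many(posts)
```

`insert_many` is usually preferable to a loop of `insert_one` calls when the document list is already in memory.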