Streamlining Data Integration
This article demonstrates how to integrate Salesforce and Oracle data into Amazon Redshift using Python and stored procedures, following the ELT approach.
Integrating data from multiple sources like Salesforce and Oracle into Amazon Redshift is crucial for organizations looking to centralize their analytics. This article demonstrates how to connect to Salesforce and Oracle, extract data using SOQL and SQL queries, load it into Redshift staging tables, and perform transformations using Redshift stored procedures, all orchestrated through Python scripts.
Prerequisites
- Salesforce: Access to Salesforce with the necessary API permissions.
- Oracle: Access to an Oracle database with the necessary query permissions.
- Amazon Redshift: An existing Redshift cluster.
- Python: Installed with the necessary libraries (simple_salesforce, cx_Oracle, boto3, psycopg2).
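These libraries can typically be installed from PyPI (pandas is also used in the snippets below, and psycopg2-binary is a convenient way to get psycopg2):
pip install simple-salesforce cx_Oracle psycopg2-binary boto3 pandas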
Connecting to Salesforce and Extracting Data
First, let's connect to Salesforce and extract data using SOQL.
from simple_salesforce import Salesforce
import pandas as pd
# Salesforce credentials
sf = Salesforce(username='your_username',
                password='your_password',
                security_token='your_security_token')
# SOQL query to fetch data from Salesforce
query = "SELECT Id, Name, AccountNumber FROM Account"
response = sf.query_all(query)
# Convert response to a DataFrame
data_sf = pd.DataFrame(response['records']).drop(columns='attributes')
print(data_sf.head())
- Library Import: We import Salesforce from simple_salesforce for easy Salesforce API interaction and pandas for data manipulation.
- Salesforce Connection: We establish a connection to Salesforce using the provided credentials. Rather than hardcoding the username, password, and security token, pass them in from a config file or environment variables; a minimal sketch using environment variables follows this list.
- SOQL Query: We execute a SOQL (Salesforce Object Query Language) query to retrieve specific fields (Id, Name, AccountNumber) from the Account object.
- Data Conversion: The response is converted into a pandas DataFrame for easier manipulation and analysis.
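As an example of keeping credentials out of the script, the same connection can read them from environment variables. A minimal sketch, where the variable names SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN are illustrative rather than required by the library:
import os
from simple_salesforce import Salesforce
# Read Salesforce credentials from environment variables instead of hardcoding them
sf = Salesforce(username=os.environ['SF_USERNAME'],
                password=os.environ['SF_PASSWORD'],
                security_token=os.environ['SF_SECURITY_TOKEN'])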
Connecting to Oracle and Extracting Data
Next, let's connect to Oracle and extract data using SQL.
import cx_Oracle
# Oracle credentials and connection details
oracle_dsn = cx_Oracle.makedsn("your_oracle_host", "your_oracle_port", service_name="your_service_name")
conn_oracle = cx_Oracle.connect(user="your_username", password="your_password", dsn=oracle_dsn)
# SQL query to fetch data from Oracle
sql_query = "SELECT ID, NAME, ACCOUNT_NUMBER FROM ACCOUNTS"
data_oracle = pd.read_sql(sql_query, con=conn_oracle)
print(data_oracle.head())
# Close Oracle connection
conn_oracle.close()
- Library Import: We import cx_Oracle for Oracle database connections.
- Oracle Connection: We establish a connection to Oracle using the provided credentials.
- SQL Query: We execute a SQL query to retrieve specific fields (ID, NAME, ACCOUNT_NUMBER) from the ACCOUNTS table.
- Data Conversion: The result is converted into a pandas DataFrame for easier manipulation and analysis; for very large tables, see the chunked read sketched after this list.
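If the ACCOUNTS table is large, the query result can be fetched in chunks to keep memory usage bounded instead of pulling everything into one DataFrame at once. A minimal sketch reusing the query above, run before conn_oracle.close() (the chunk size of 50,000 rows is an arbitrary illustration):
# Fetch the Oracle result set in chunks to limit memory usage
chunks = pd.read_sql(sql_query, con=conn_oracle, chunksize=50000)
data_oracle = pd.concat(chunks, ignore_index=True)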
Loading Data Into Redshift Staging Tables
Now, we load the extracted data from Salesforce and Oracle into Redshift staging tables.
import boto3
import psycopg2
from io import StringIO
# Redshift credentials and connection details
redshift_host = 'your_redshift_host'
redshift_db = 'your_database'
redshift_user = 'your_user'
redshift_password = 'your_password'
redshift_port = 5439
# Connect to Redshift
conn_redshift = psycopg2.connect(
    host=redshift_host,
    dbname=redshift_db,
    user=redshift_user,
    password=redshift_password,
    port=redshift_port
)
cur_redshift = conn_redshift.cursor()
# Create staging tables (if they don't exist)
create_sf_table_query = """
CREATE TABLE IF NOT EXISTS staging_account_sf (
Id VARCHAR(18),
Name VARCHAR(255),
AccountNumber VARCHAR(40)
);
"""
create_oracle_table_query = """
CREATE TABLE IF NOT EXISTS staging_account_oracle (
ID VARCHAR(18),
NAME VARCHAR(255),
ACCOUNT_NUMBER VARCHAR(40)
);
"""
cur_redshift.execute(create_sf_table_query)
cur_redshift.execute(create_oracle_table_query)
conn_redshift.commit()
# Load Salesforce data into staging table
csv_buffer_sf = StringIO()
data_sf.to_csv(csv_buffer_sf, index=False, header=False)
csv_buffer_sf.seek(0)
cur_redshift.copy_from(csv_buffer_sf, 'staging_account_sf', sep=',')
conn_redshift.commit()
# Load Oracle data into staging table
csv_buffer_oracle = StringIO()
data_oracle.to_csv(csv_buffer_oracle, index=False, header=False)
csv_buffer_oracle.seek(0)
cur_redshift.copy_from(csv_buffer_oracle, 'staging_account_oracle', sep=',')
conn_redshift.commit()
- Library Import: We import boto3 for AWS interactions, psycopg2 for PostgreSQL/Redshift connections, and StringIO for in-memory file operations.
- Redshift Connection: We establish a connection to Redshift using the provided credentials.
- Create Staging Tables: We create staging tables (staging_account_sf for Salesforce data and staging_account_oracle for Oracle data) if they don't already exist.
- Data Loading: The DataFrames are converted to CSV and loaded into the respective staging tables with copy_from, which streams the rows over the database connection using the PostgreSQL COPY protocol. For larger volumes, the more common Redshift pattern is to stage the files in Amazon S3 and load them with Redshift's COPY command, as sketched after this list.
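The boto3 import comes into play when the CSVs are staged in Amazon S3 first, which is the load path Redshift's COPY command is built around and generally scales better than streaming rows over the database connection. A minimal sketch for the Salesforce file, assuming a bucket named your-staging-bucket and an IAM role attached to the cluster (both identifiers are placeholders):
import boto3

# Upload the in-memory CSV to S3 (bucket and key are placeholders)
s3 = boto3.client('s3')
s3.put_object(Bucket='your-staging-bucket',
              Key='staging/account_sf.csv',
              Body=csv_buffer_sf.getvalue().encode('utf-8'))

# Load the S3 file into the staging table with Redshift's COPY command
# (the IAM role ARN is a placeholder)
copy_query = """
    COPY staging_account_sf
    FROM 's3://your-staging-bucket/staging/account_sf.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/your-redshift-role'
    FORMAT AS CSV;
"""
cur_redshift.execute(copy_query)
conn_redshift.commit()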
Executing Stored Procedures for ELT
Once the data is in the staging tables, we can call stored procedures in Redshift to transform the data and load it into the final tables.
# Call stored procedure for transformation
stored_procedure_query = "CALL transform_data_procedure();"
cur_redshift.execute(stored_procedure_query)
conn_redshift.commit()
# Verify data in the final table
verify_query = "SELECT * FROM final_account_table LIMIT 10;"
cur_redshift.execute(verify_query)
for row in cur_redshift.fetchall():
    print(row)
# Close the connection
cur_redshift.close()
conn_redshift.close()
- Stored Procedure Call: We call a stored procedure (transform_data_procedure) in Redshift that performs the necessary transformations and loads the data into the final tables. This keeps the transform step of the ELT (Extract, Load, Transform) pattern inside the database, leveraging Redshift's processing power; a hypothetical sketch of such a procedure follows this list.
- Data Verification: We run a query to verify that the data has been correctly transformed and loaded into the final table (final_account_table).
- Close Connection: Finally, we close the cursor and the database connection to clean up resources.
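The stored procedure itself is assumed to already exist in Redshift and its body is not shown in this article. As a hypothetical illustration of the kind of transform logic it could encapsulate, the following creates a procedure that rebuilds final_account_table from the two staging tables; it would be run once ahead of the pipeline, and the target table, its columns, and the merge rule are invented for this example:
create_procedure_query = """
CREATE OR REPLACE PROCEDURE transform_data_procedure()
AS $$
BEGIN
    -- Illustrative only: rebuild the final table from the staging data
    TRUNCATE final_account_table;

    INSERT INTO final_account_table (id, name, account_number, source_system)
    SELECT Id, Name, AccountNumber, 'salesforce' FROM staging_account_sf
    UNION ALL
    SELECT ID, NAME, ACCOUNT_NUMBER, 'oracle' FROM staging_account_oracle;
END;
$$ LANGUAGE plpgsql;
"""
cur_redshift.execute(create_procedure_query)
conn_redshift.commit()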
Conclusion
This script demonstrates a complete workflow for extracting data from Salesforce and Oracle, loading it into Amazon Redshift, and performing ELT operations using stored procedures. This approach leverages the strengths of each component: Salesforce and Oracle for CRM and relational data, Python for orchestration, and Redshift for scalable data transformations.
By centralizing data in Redshift, organizations can perform more comprehensive analyses and derive valuable insights from their Salesforce and Oracle data, enabling better decision-making and operational efficiency.