# Pandas + SQLAlchemy Integration

## 🤝 Pandas + SQLAlchemy: The ETL Powerhouse
In Data Engineering, SQLAlchemy handles the “Connection and Schema,” while Pandas handles the “Transformation.” Combining them allows you to build robust, type-safe ETL (Extract, Transform, Load) pipelines.
## 🟢 Phase 1: Foundations (Simple I/O)

### 1. Connecting with an Engine
Pandas requires a SQLAlchemy `Engine` to interact with a database.
```python
import pandas as pd
from sqlalchemy import create_engine

# 1. Create the engine
engine = create_engine("postgresql://user:pass@localhost:5432/mydb")

# 2. Extract: Read from SQL to DataFrame
df = pd.read_sql("SELECT * FROM users WHERE active = true", engine)

# 3. Load: Write from DataFrame to SQL
df.to_sql("active_users", engine, if_exists="replace", index=False)
```

## 🟡 Phase 2: Intermediate (Performance & Schema)
### 2. High-Performance Loading (to_sql)
Writing large DataFrames can be slow. Use `chunksize` and `method` to speed it up.
```python
# method="multi" batches many rows into each INSERT statement,
# which is often much faster (results vary by database and driver)
df.to_sql(
    "large_table",
    engine,
    if_exists="append",
    index=False,
    chunksize=1000,
    method="multi",
)
```

### 3. Using SQLAlchemy Models for Schema
Instead of letting Pandas guess the schema (which often results in `TEXT` or `BIGINT` for everything), use SQLAlchemy to define the table first.
```python
from sqlalchemy import Table, Column, Integer, String, MetaData

metadata = MetaData()
user_table = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("username", String(50), nullable=False),
)
metadata.create_all(engine)  # create the table with the exact schema

# Pandas appends into the existing table, preserving its column types
df.to_sql("users", engine, if_exists="append", index=False)
```

## 🟠 Phase 3: Expert (Dynamic Filtering & Types)
### 4. Parameterized Queries (Security)
Never use f-strings for SQL queries! Use SQLAlchemy's parameter binding to prevent SQL injection.
```python
from sqlalchemy import text

query = text("SELECT * FROM sales WHERE region = :region AND date > :date")
df = pd.read_sql(query, engine, params={"region": "North", "date": "2023-01-01"})
```

### 5. Dtype Mapping (Python to DB)
You can specify the exact SQL types for each column during the `to_sql` call.
```python
from sqlalchemy.types import Integer, DateTime, String

dtype_map = {
    "user_id": Integer(),
    "signup_date": DateTime(),
    "bio": String(255),
}
df.to_sql("profiles", engine, dtype=dtype_map, if_exists="replace", index=False)
```

## 🔴 Phase 4: Senior Architect (Upserts & Large Scale)
### 6. The "Upsert" Pattern (Insert or Update)
Pandas `to_sql` does not support "upsert" natively. You must use a staging-table pattern:

1. Load data into a temporary `staging_table`.
2. Run a raw SQL command (via SQLAlchemy) to `MERGE` or `INSERT ... ON CONFLICT` into the main table.
3. Drop the `staging_table`.
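The steps above can be sketched end-to-end. This is a minimal runnable version, assuming an in-memory SQLite engine stands in for the real database (SQLite has supported `INSERT ... ON CONFLICT` since 3.24); the `users`/`stg_users` names mirror the snippet that follows:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for the real database
engine = create_engine("sqlite://")

# Existing target table: id 1 will collide and be updated
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO users VALUES (1, 'old_name')"))

# Incoming batch: id 1 collides (update), id 2 is new (insert)
df = pd.DataFrame({"id": [1, 2], "name": ["new_name", "bob"]})

# 1. Load to staging
df.to_sql("stg_users", engine, if_exists="replace", index=False)

# 2. Upsert from staging, then 3. drop the staging table
with engine.begin() as conn:
    conn.execute(text("""
        INSERT INTO users (id, name)
        SELECT id, name FROM stg_users WHERE true  -- WHERE avoids a SQLite parse ambiguity
        ON CONFLICT (id) DO UPDATE SET name = excluded.name
    """))
    conn.execute(text("DROP TABLE stg_users"))

result = pd.read_sql("SELECT id, name FROM users ORDER BY id", engine)
```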
```python
from sqlalchemy import text

# 1. Load to staging
df.to_sql("stg_users", engine, if_exists="replace", index=False)

# 2. Execute the upsert via SQLAlchemy Core (PostgreSQL syntax)
with engine.begin() as conn:
    conn.execute(text("""
        INSERT INTO users (id, name)
        SELECT id, name FROM stg_users
        ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
    """))
    # 3. Drop the staging table
    conn.execute(text("DROP TABLE stg_users"))
```

### 7. Memory-Efficient Extraction (chunksize)
For massive tables, don’t load everything into RAM. Iterate through the SQL result in chunks.
```python
# read_sql returns an iterator of DataFrames when chunksize is provided
for chunk_df in pd.read_sql("SELECT * FROM huge_table", engine, chunksize=50000):
    # Process each 50k-row chunk in memory
    process_data(chunk_df)
    # Load to another destination
    chunk_df.to_sql("destination", engine, if_exists="append", index=False)