Exemples d'implémentation¶
Cette section présente deux projets complets et fonctionnels : une API REST CRUD avec FastAPI et SQLAlchemy, puis un pipeline de traitement de données avec polars expose via FastAPI. Les deux exemples utilisent des pratiques de production récentes (Python 3.12+, Pydantic v2, SQLAlchemy 2.x).
Projet 1 — API REST CRUD avec FastAPI¶
Structure du projet¶
api/
├── main.py # Point d'entree FastAPI
├── database.py # Configuration SQLAlchemy
├── models.py # Modeles ORM
├── schemas.py # Schemas Pydantic
└── requirements.txt
database.py — Configuration SQLAlchemy async¶
# pip install fastapi uvicorn[standard] sqlalchemy aiosqlite pydantic
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
DATABASE_URL = "sqlite+aiosqlite:///./items.db"
engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
pass
async def get_db():
"""Dependency FastAPI — fournit une session de base de donnees."""
async with SessionLocal() as session:
yield session
models.py — Modèle ORM¶
from sqlalchemy import String, Float, Boolean
from sqlalchemy.orm import Mapped, mapped_column
from database import Base
class Item(Base):
__tablename__ = "items"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
nom: Mapped[str] = mapped_column(String(200), nullable=False)
description: Mapped[str | None] = mapped_column(String(500), nullable=True)
prix: Mapped[float] = mapped_column(Float, nullable=False)
disponible: Mapped[bool] = mapped_column(Boolean, default=True)
schémas.py — Validation Pydantic v2¶
from pydantic import BaseModel, Field, ConfigDict
class ItemBase(BaseModel):
nom: str = Field(..., min_length=1, max_length=200)
description: str | None = Field(None, max_length=500)
prix: float = Field(..., gt=0, description="Prix en euros, strictement positif")
disponible: bool = True
class ItemCreate(ItemBase):
"""Schema pour la creation d'un item (sans id)."""
pass
class ItemUpdate(BaseModel):
"""Schema pour la mise a jour partielle (tous les champs optionnels)."""
nom: str | None = Field(None, min_length=1, max_length=200)
description: str | None = None
prix: float | None = Field(None, gt=0)
disponible: bool | None = None
class ItemResponse(ItemBase):
"""Schema de reponse incluant l'identifiant."""
id: int
model_config = ConfigDict(from_attributes=True)
main.py — Endpoints CRUD¶
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import engine, Base, get_db
from models import Item
from schemas import ItemCreate, ItemUpdate, ItemResponse
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Cree les tables au demarrage de l'application."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
app = FastAPI(
title="Items API",
version="1.0.0",
lifespan=lifespan,
)
@app.get("/items", response_model=list[ItemResponse])
async def lister_items(
disponible: bool | None = None,
db: AsyncSession = Depends(get_db),
) -> list[Item]:
"""Liste tous les items, avec filtre optionnel sur la disponibilite."""
query = select(Item)
if disponible is not None:
query = query.where(Item.disponible == disponible)
result = await db.execute(query)
return list(result.scalars().all())
@app.post("/items", response_model=ItemResponse, status_code=201)
async def creer_item(
data: ItemCreate,
db: AsyncSession = Depends(get_db),
) -> Item:
"""Cree un nouvel item en base de donnees."""
item = Item(**data.model_dump())
db.add(item)
await db.commit()
await db.refresh(item)
return item
@app.get("/items/{item_id}", response_model=ItemResponse)
async def lire_item(item_id: int, db: AsyncSession = Depends(get_db)) -> Item:
"""Retourne un item par son identifiant."""
item = await db.get(Item, item_id)
if item is None:
raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")
return item
@app.put("/items/{item_id}", response_model=ItemResponse)
async def mettre_a_jour_item(
item_id: int,
data: ItemUpdate,
db: AsyncSession = Depends(get_db),
) -> Item:
"""Mise a jour partielle d'un item (PATCH semantics via PUT)."""
item = await db.get(Item, item_id)
if item is None:
raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")
# Applique uniquement les champs fournis
for champ, valeur in data.model_dump(exclude_none=True).items():
setattr(item, champ, valeur)
await db.commit()
await db.refresh(item)
return item
@app.delete("/items/{item_id}", status_code=204)
async def supprimer_item(item_id: int, db: AsyncSession = Depends(get_db)) -> None:
"""Supprime un item de la base de donnees."""
item = await db.get(Item, item_id)
if item is None:
raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")
await db.delete(item)
await db.commit()
Projet 2 — Pipeline de données avec polars¶
Ce second projet illustre un pipeline de traitement de données CSV avec polars, expose via un endpoint FastAPI.
Structure¶
pipeline/
├── pipeline.py # Logique de traitement polars
├── main.py # Exposition via FastAPI
└── ventes.csv # Donnees d'exemple
ventes.csv (exemple de données)¶
date,region,produit,quantite,prix_unitaire
2024-01-15,Nord,Widget A,10,25.50
2024-01-15,Sud,Widget B,5,42.00
2024-01-16,Nord,Widget A,8,25.50
2024-01-16,Est,Widget C,12,18.75
2024-01-17,Sud,Widget A,15,25.50
2024-01-17,Nord,Widget C,3,18.75
pipeline.py — Traitement avec polars¶
# pip install polars fastapi uvicorn
import polars as pl
from pathlib import Path
from typing import Any
def charger_ventes(chemin: str | Path) -> pl.DataFrame:
"""Charge le CSV de ventes avec inference de types."""
return pl.read_csv(
chemin,
try_parse_dates=True, # Convertit automatiquement les colonnes date
)
def enrichir_ventes(df: pl.DataFrame) -> pl.DataFrame:
"""Ajoute des colonnes calculees au dataframe."""
return df.with_columns(
# Calcul du chiffre d'affaires par ligne
(pl.col("quantite") * pl.col("prix_unitaire")).alias("ca"),
# Extraction du mois pour agregation mensuelle
pl.col("date").dt.month().alias("mois"),
)
def agreger_par_region(df: pl.DataFrame) -> pl.DataFrame:
"""Retourne le CA total et la quantite vendue par region."""
return (
df.group_by("region")
.agg(
pl.col("ca").sum().alias("ca_total"),
pl.col("quantite").sum().alias("quantite_totale"),
pl.col("produit").n_unique().alias("nb_produits_distincts"),
)
.sort("ca_total", descending=True)
)
def agreger_par_produit(df: pl.DataFrame) -> pl.DataFrame:
"""Retourne les statistiques de ventes par produit."""
return (
df.group_by("produit")
.agg(
pl.col("ca").sum().alias("ca_total"),
pl.col("quantite").sum().alias("quantite_totale"),
pl.col("prix_unitaire").mean().alias("prix_moyen"),
)
.sort("ca_total", descending=True)
)
def executer_pipeline(chemin_csv: str | Path) -> dict[str, Any]:
"""
Execute le pipeline complet de traitement des ventes.
Retourne un dictionnaire avec les agregations par region et par produit.
"""
df_brut = charger_ventes(chemin_csv)
df_enrichi = enrichir_ventes(df_brut)
return {
"nb_lignes": len(df_enrichi),
"ca_global": float(df_enrichi["ca"].sum()),
"par_region": agreger_par_region(df_enrichi).to_dicts(),
"par_produit": agreger_par_produit(df_enrichi).to_dicts(),
}
main.py — Exposition via FastAPI¶
from fastapi import FastAPI, HTTPException, UploadFile, File
from pathlib import Path
import tempfile
import os
from pipeline import executer_pipeline
app = FastAPI(title="Pipeline Ventes API", version="1.0.0")
# Chemin vers le CSV par defaut
CSV_PAR_DEFAUT = Path(__file__).parent / "ventes.csv"
@app.get("/analyse/ventes")
async def analyser_ventes_par_defaut() -> dict:
"""
Execute le pipeline sur le fichier CSV par defaut.
Retourne les agregations par region et par produit.
"""
if not CSV_PAR_DEFAUT.exists():
raise HTTPException(
status_code=404,
detail="Fichier de donnees par defaut introuvable",
)
return executer_pipeline(CSV_PAR_DEFAUT)
@app.post("/analyse/upload")
async def analyser_csv_uploade(fichier: UploadFile = File(...)) -> dict:
"""
Accepte un CSV uploade et execute le pipeline dessus.
Le fichier est supprime apres traitement.
"""
if not fichier.filename or not fichier.filename.endswith(".csv"):
raise HTTPException(status_code=400, detail="Le fichier doit etre un CSV")
# Ecriture dans un fichier temporaire
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
contenu = await fichier.read()
tmp.write(contenu)
chemin_tmp = tmp.name
try:
return executer_pipeline(chemin_tmp)
except Exception as exc:
raise HTTPException(
status_code=422,
detail=f"Erreur de traitement du CSV : {exc}",
) from exc
finally:
os.unlink(chemin_tmp)
Polars vs pandas
Polars est significativement plus rapide que pandas pour les gros volumes de données (>100k lignes) grâce à son moteur Rust et son exécution parallelisee. L'API est similaire mais les expressions Polars sont lazily evaluated — preferez pl.LazyFrame avec .collect() pour les gros datasets.