Aller au contenu

Exemples d'implémentation

Cette section présente deux projets complets et fonctionnels : une API REST CRUD avec FastAPI et SQLAlchemy, puis un pipeline de traitement de données avec polars expose via FastAPI. Les deux exemples utilisent des pratiques de production récentes (Python 3.12+, Pydantic v2, SQLAlchemy 2.x).


Projet 1 — API REST CRUD avec FastAPI

Structure du projet

api/
├── main.py          # Point d'entree FastAPI
├── database.py      # Configuration SQLAlchemy
├── models.py        # Modeles ORM
├── schemas.py       # Schemas Pydantic
└── requirements.txt

database.py — Configuration SQLAlchemy async

# pip install fastapi uvicorn[standard] sqlalchemy aiosqlite pydantic

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "sqlite+aiosqlite:///./items.db"

engine = create_async_engine(DATABASE_URL, echo=False)

SessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    pass


async def get_db():
    """Dependency FastAPI — fournit une session de base de donnees."""
    async with SessionLocal() as session:
        yield session

models.py — Modèle ORM

from sqlalchemy import String, Float, Boolean
from sqlalchemy.orm import Mapped, mapped_column
from database import Base


class Item(Base):
    __tablename__ = "items"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    nom: Mapped[str] = mapped_column(String(200), nullable=False)
    description: Mapped[str | None] = mapped_column(String(500), nullable=True)
    prix: Mapped[float] = mapped_column(Float, nullable=False)
    disponible: Mapped[bool] = mapped_column(Boolean, default=True)

schémas.py — Validation Pydantic v2

from pydantic import BaseModel, Field, ConfigDict


class ItemBase(BaseModel):
    nom: str = Field(..., min_length=1, max_length=200)
    description: str | None = Field(None, max_length=500)
    prix: float = Field(..., gt=0, description="Prix en euros, strictement positif")
    disponible: bool = True


class ItemCreate(ItemBase):
    """Schema pour la creation d'un item (sans id)."""
    pass


class ItemUpdate(BaseModel):
    """Schema pour la mise a jour partielle (tous les champs optionnels)."""
    nom: str | None = Field(None, min_length=1, max_length=200)
    description: str | None = None
    prix: float | None = Field(None, gt=0)
    disponible: bool | None = None


class ItemResponse(ItemBase):
    """Schema de reponse incluant l'identifiant."""
    id: int

    model_config = ConfigDict(from_attributes=True)

main.py — Endpoints CRUD

from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import engine, Base, get_db
from models import Item
from schemas import ItemCreate, ItemUpdate, ItemResponse


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Cree les tables au demarrage de l'application."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield


app = FastAPI(
    title="Items API",
    version="1.0.0",
    lifespan=lifespan,
)


@app.get("/items", response_model=list[ItemResponse])
async def lister_items(
    disponible: bool | None = None,
    db: AsyncSession = Depends(get_db),
) -> list[Item]:
    """Liste tous les items, avec filtre optionnel sur la disponibilite."""
    query = select(Item)
    if disponible is not None:
        query = query.where(Item.disponible == disponible)
    result = await db.execute(query)
    return list(result.scalars().all())


@app.post("/items", response_model=ItemResponse, status_code=201)
async def creer_item(
    data: ItemCreate,
    db: AsyncSession = Depends(get_db),
) -> Item:
    """Cree un nouvel item en base de donnees."""
    item = Item(**data.model_dump())
    db.add(item)
    await db.commit()
    await db.refresh(item)
    return item


@app.get("/items/{item_id}", response_model=ItemResponse)
async def lire_item(item_id: int, db: AsyncSession = Depends(get_db)) -> Item:
    """Retourne un item par son identifiant."""
    item = await db.get(Item, item_id)
    if item is None:
        raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")
    return item


@app.put("/items/{item_id}", response_model=ItemResponse)
async def mettre_a_jour_item(
    item_id: int,
    data: ItemUpdate,
    db: AsyncSession = Depends(get_db),
) -> Item:
    """Mise a jour partielle d'un item (PATCH semantics via PUT)."""
    item = await db.get(Item, item_id)
    if item is None:
        raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")

    # Applique uniquement les champs fournis
    for champ, valeur in data.model_dump(exclude_none=True).items():
        setattr(item, champ, valeur)

    await db.commit()
    await db.refresh(item)
    return item


@app.delete("/items/{item_id}", status_code=204)
async def supprimer_item(item_id: int, db: AsyncSession = Depends(get_db)) -> None:
    """Supprime un item de la base de donnees."""
    item = await db.get(Item, item_id)
    if item is None:
        raise HTTPException(status_code=404, detail=f"Item {item_id} non trouve")
    await db.delete(item)
    await db.commit()
# Lancement
uvicorn main:app --reload
# Documentation interactive : http://localhost:8000/docs

Projet 2 — Pipeline de données avec polars

Ce second projet illustre un pipeline de traitement de données CSV avec polars, expose via un endpoint FastAPI.

Structure

pipeline/
├── pipeline.py      # Logique de traitement polars
├── main.py          # Exposition via FastAPI
└── ventes.csv       # Donnees d'exemple

ventes.csv (exemple de données)

date,region,produit,quantite,prix_unitaire
2024-01-15,Nord,Widget A,10,25.50
2024-01-15,Sud,Widget B,5,42.00
2024-01-16,Nord,Widget A,8,25.50
2024-01-16,Est,Widget C,12,18.75
2024-01-17,Sud,Widget A,15,25.50
2024-01-17,Nord,Widget C,3,18.75

pipeline.py — Traitement avec polars

# pip install polars fastapi uvicorn

import polars as pl
from pathlib import Path
from typing import Any


def charger_ventes(chemin: str | Path) -> pl.DataFrame:
    """Charge le CSV de ventes avec inference de types."""
    return pl.read_csv(
        chemin,
        try_parse_dates=True,  # Convertit automatiquement les colonnes date
    )


def enrichir_ventes(df: pl.DataFrame) -> pl.DataFrame:
    """Ajoute des colonnes calculees au dataframe."""
    return df.with_columns(
        # Calcul du chiffre d'affaires par ligne
        (pl.col("quantite") * pl.col("prix_unitaire")).alias("ca"),
        # Extraction du mois pour agregation mensuelle
        pl.col("date").dt.month().alias("mois"),
    )


def agreger_par_region(df: pl.DataFrame) -> pl.DataFrame:
    """Retourne le CA total et la quantite vendue par region."""
    return (
        df.group_by("region")
        .agg(
            pl.col("ca").sum().alias("ca_total"),
            pl.col("quantite").sum().alias("quantite_totale"),
            pl.col("produit").n_unique().alias("nb_produits_distincts"),
        )
        .sort("ca_total", descending=True)
    )


def agreger_par_produit(df: pl.DataFrame) -> pl.DataFrame:
    """Retourne les statistiques de ventes par produit."""
    return (
        df.group_by("produit")
        .agg(
            pl.col("ca").sum().alias("ca_total"),
            pl.col("quantite").sum().alias("quantite_totale"),
            pl.col("prix_unitaire").mean().alias("prix_moyen"),
        )
        .sort("ca_total", descending=True)
    )


def executer_pipeline(chemin_csv: str | Path) -> dict[str, Any]:
    """
    Execute le pipeline complet de traitement des ventes.

    Retourne un dictionnaire avec les agregations par region et par produit.
    """
    df_brut = charger_ventes(chemin_csv)
    df_enrichi = enrichir_ventes(df_brut)

    return {
        "nb_lignes": len(df_enrichi),
        "ca_global": float(df_enrichi["ca"].sum()),
        "par_region": agreger_par_region(df_enrichi).to_dicts(),
        "par_produit": agreger_par_produit(df_enrichi).to_dicts(),
    }

main.py — Exposition via FastAPI

from fastapi import FastAPI, HTTPException, UploadFile, File
from pathlib import Path
import tempfile
import os

from pipeline import executer_pipeline

app = FastAPI(title="Pipeline Ventes API", version="1.0.0")

# Chemin vers le CSV par defaut
CSV_PAR_DEFAUT = Path(__file__).parent / "ventes.csv"


@app.get("/analyse/ventes")
async def analyser_ventes_par_defaut() -> dict:
    """
    Execute le pipeline sur le fichier CSV par defaut.
    Retourne les agregations par region et par produit.
    """
    if not CSV_PAR_DEFAUT.exists():
        raise HTTPException(
            status_code=404,
            detail="Fichier de donnees par defaut introuvable",
        )
    return executer_pipeline(CSV_PAR_DEFAUT)


@app.post("/analyse/upload")
async def analyser_csv_uploade(fichier: UploadFile = File(...)) -> dict:
    """
    Accepte un CSV uploade et execute le pipeline dessus.
    Le fichier est supprime apres traitement.
    """
    if not fichier.filename or not fichier.filename.endswith(".csv"):
        raise HTTPException(status_code=400, detail="Le fichier doit etre un CSV")

    # Ecriture dans un fichier temporaire
    with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as tmp:
        contenu = await fichier.read()
        tmp.write(contenu)
        chemin_tmp = tmp.name

    try:
        return executer_pipeline(chemin_tmp)
    except Exception as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Erreur de traitement du CSV : {exc}",
        ) from exc
    finally:
        os.unlink(chemin_tmp)

Polars vs pandas

Polars est significativement plus rapide que pandas pour les gros volumes de données (>100k lignes) grâce à son moteur Rust et son exécution parallelisee. L'API est similaire mais les expressions Polars sont lazily evaluated — preferez pl.LazyFrame avec .collect() pour les gros datasets.

# Test du pipeline
uvicorn main:app --reload
curl http://localhost:8000/analyse/ventes