Aller au contenu

Intégration SI

Les données IoT n'ont de valeur que si elles enrichissent les systèmes décisionnels de l'entreprise — connecter le pipeline terrain aux ERP, MES et datawarehouses est l'étape finale qui crée la valeur métier.


Cartographie des systèmes à intégrer

Un site industriel numérisé fait coexister plusieurs couches logicielles, chacune avec ses propres standards et protocoles :

Système Rôle Protocoles d'échange
SCADA / DCS Supervision temps réel des process OPC-UA, Modbus, propriétaire
MES (Manufacturing Execution System) Gestion de la production (ordres, qualité, traçabilité) REST, SOAP, OPC-UA
ERP (SAP, Oracle, Odoo) Gestion des ressources (stocks, achats, RH, finance) REST, SOAP, IDoc (SAP), EDI
CMMS (Gestion maintenance) Ordres de travail, historique interventions REST, SOAP
Datawarehouse / Data lake Analytique, reporting, ML SQL, JDBC, Delta Lake, Parquet
BI (Power BI, Tableau, Metabase) Rapports et analyses métier SQL, OData, REST

API REST et gRPC vers le SI

Exposition des données IoT en REST

La passerelle API expose les données du pipeline IoT (jumeaux, séries agrégées, événements) vers les systèmes tiers. Elle gère l'authentification, le rate limiting et la transformation de format.

flowchart LR
    subgraph "Pipeline IoT"
        DITTO[Eclipse Ditto\njumeaux numériques]
        TSDB[(TimescaleDB\nséries agrégées)]
        KAFKA[Kafka\névénements]
    end

    subgraph "API Gateway"
        GW[Kong / Traefik\nRate limiting\nJWT / OAuth2\nTransformation]
    end

    subgraph "Systèmes SI"
        MES[MES\nOrdres de fabrication]
        ERP[ERP SAP\nGestion stocks]
        CMMS[CMMS\nMaintenance]
        DWH[(Datawarehouse\nClickHouse)]
        BI[Power BI\nTableau]
    end

    DITTO -->|REST| GW
    TSDB -->|JDBC| GW
    KAFKA -->|Consumer| GW
    GW -->|REST / webhook| MES
    GW -->|REST / IDoc| ERP
    GW -->|REST| CMMS
    GW -->|JDBC / Bulk insert| DWH
    DWH -->|SQL| BI

Exemple d'API REST — endpoint état équipement

# FastAPI — exposition de l'état d'un équipement IoT vers le MES
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime
import httpx

app = FastAPI(title="IoT Integration API", version="1.0")

class EquipmentStatus(BaseModel):
    device_id: str
    state: str
    bearing_temp: float | None
    flow_rate: float | None
    last_updated: datetime
    predicted_failure_date: datetime | None

@app.get("/api/v1/equipment/{device_id}/status",
         response_model=EquipmentStatus,
         summary="État courant d'un équipement")
async def get_equipment_status(
    device_id: str,
    ditto_client: httpx.AsyncClient = Depends(get_ditto_client)
):
    # Récupérer l'état depuis Eclipse Ditto
    resp = await ditto_client.get(
        f"/api/2/things/industrial:{device_id}"
    )
    if resp.status_code == 404:
        raise HTTPException(status_code=404, detail="Équipement inconnu")

    thing = resp.json()
    features = thing.get("features", {})

    return EquipmentStatus(
        device_id=device_id,
        state=features.get("operationalState", {})
                       .get("properties", {})
                       .get("state", "UNKNOWN"),
        bearing_temp=features.get("telemetry", {})
                              .get("properties", {})
                              .get("bearingTemperature"),
        flow_rate=features.get("telemetry", {})
                           .get("properties", {})
                           .get("flowRate"),
        last_updated=thing.get("_modified"),
        predicted_failure_date=features.get("predictive", {})
                                        .get("properties", {})
                                        .get("predictedFailureDate")
    )

gRPC pour les flux à haute fréquence

REST est adapté aux requêtes ponctuelles. Pour les flux continus (abonnement aux événements, streaming de séries), gRPC offre une latence et un débit bien supérieurs grâce au multiplexage HTTP/2.

// iot_integration.proto
syntax = "proto3";

service IotIntegration {
  // Requête unique
  rpc GetEquipmentStatus (EquipmentRequest) returns (EquipmentStatus);
  // Stream serveur → client
  rpc StreamTelemetry (TelemetryStreamRequest) returns (stream TelemetryPoint);
  // Bidirectionnel : commandes + confirmations
  rpc CommandStream (stream Command) returns (stream CommandResult);
}

message TelemetryStreamRequest {
  repeated string device_ids = 1;
  repeated string measures    = 2;
  int32 interval_ms           = 3;
}

message TelemetryPoint {
  string device_id  = 1;
  string measure    = 2;
  double value      = 3;
  int64 timestamp   = 4;  // Unix ms
}

Connecteurs ERP et MES

Intégration avec SAP via REST/OData

SAP S/4HANA expose une API OData (basée sur REST + format Atom/JSON) pour la plupart de ses modules.

# Créer automatiquement un ordre de maintenance SAP
# depuis une alerte IoT de défaillance imminente
import requests

SAP_BASE = "https://sap.example.com/sap/opu/odata/sap/PM_ORDER_SRV"

def create_maintenance_order(device_id: str, failure_desc: str, planned_date: str):
    payload = {
        "OrderType": "PM01",          # Ordre de maintenance préventif
        "FunctionalLocation": device_id,
        "ShortText": f"Maintenance prédictive — {device_id}",
        "LongText": failure_desc,
        "BasicStartDate": f"/Date({planned_date})/",
        "Priority": "2",              # Urgent
        "MainWorkCenter": "MAINT-ELEC"
    }
    resp = requests.post(
        f"{SAP_BASE}/MaintenanceOrders",
        json=payload,
        auth=("svc_iot", SAP_SERVICE_PASSWORD),
        headers={"Accept": "application/json"}
    )
    resp.raise_for_status()
    return resp.json()["d"]["OrderId"]

Intégration MES — remontée de KPIs de production

# Kafka Consumer → MES via webhook
from confluent_kafka import Consumer
import requests, json

consumer = Consumer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9092',
    'group.id': 'mes-integration',
    'auto.offset.reset': 'latest'
})
consumer.subscribe(['iot.aggregated.production_kpis'])

MES_WEBHOOK = "https://mes.example.com/api/v2/production/realtime"

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    kpi = json.loads(msg.value())
    # Transformer en format MES
    mes_payload = {
        "line":         kpi["production_line"],
        "timestamp":    kpi["bucket"],
        "actual_rate":  kpi["actual_rate"],
        "good_parts":   kpi["good_parts"],
        "scrap_parts":  kpi["scrap_parts"],
        "downtime_min": kpi["downtime_seconds"] / 60
    }
    requests.post(MES_WEBHOOK, json=mes_payload,
                  headers={"X-API-Key": MES_API_KEY})

Données IoT vers le datawarehouse

Le datawarehouse reçoit les données IoT agrégées pour l'analytique long terme. Deux patterns coexistent :

Pattern ETL — Kafka Connect vers ClickHouse

{
  "name": "iot-clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "4",
    "topics": "iot.aggregated.1hour",
    "hostname": "clickhouse.example.com",
    "port": "8443",
    "ssl": "true",
    "database": "iot_warehouse",
    "table": "telemetry_hourly",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "insert.method": "insert",
    "batch.size": "10000"
  }
}

Pattern CDC — Change Data Capture depuis TimescaleDB

graph LR
    TS["TimescaleDB<br/>(source OLTP)"] -->|"Debezium<br/>WAL streaming"| KF["Kafka<br/>(CDC topic)"] -->|"Kafka Connect<br/>Sink connector"| CH["ClickHouse<br/>(DWH OLAP)"]

Le CDC (Change Data Capture) capture chaque INSERT/UPDATE/DELETE dans le WAL PostgreSQL via Debezium et les réplique en temps quasi réel vers le datawarehouse. Avantage : pas de requête polling sur la source, latence < 1 seconde.


Tableau interopérabilité des standards industriels

Standard Couche Usage principal Maturité
OPC-UA Terrain → cloud Interopérabilité automates, SCADA, MES Très élevée
MQTT + Sparkplug B Terrain → broker IoT SCADA-like structuré Élevée
ISA-95 (IEC 62264) MES / ERP Modèle de données production Référence mondiale
IFC / STEP Industrie construction Modèles 3D équipements Élevée
AAS (Asset Admin Shell) Industrie 4.0 Passeport numérique équipement Émergente
FIWARE NGSI-LD Smart cities / IoT Context broker sémantique Modérée
OpenTelemetry Observabilité Métriques, traces, logs uniformes Élevée

Ce qu'il faut retenir

  • L'intégration SI est la dernière étape du pipeline mais souvent la plus complexe : chaque système a ses standards, ses APIs et ses contraintes de sécurité.
  • REST couvre les requêtes ponctuelles, gRPC les flux continus, Kafka Connect les réplications volumineuses.
  • Le CDC via Debezium est la solution la plus robuste pour alimenter un datawarehouse sans impacter la source.

Pour aller plus loin : Sécurité IoT — avant d'exposer ce pipeline au réseau, sécuriser chaque couche de l'architecture est indispensable.