Intégration SI¶
Les données IoT n'ont de valeur que si elles enrichissent les systèmes décisionnels de l'entreprise — connecter le pipeline terrain aux ERP, MES et datawarehouses est l'étape finale qui crée la valeur métier.
Cartographie des systèmes à intégrer¶
Un site industriel numérisé fait coexister plusieurs couches logicielles, chacune avec ses propres standards et protocoles :
| Système | Rôle | Protocoles d'échange |
|---|---|---|
| SCADA / DCS | Supervision temps réel des process | OPC-UA, Modbus, propriétaire |
| MES (Manufacturing Execution System) | Gestion de la production (ordres, qualité, traçabilité) | REST, SOAP, OPC-UA |
| ERP (SAP, Oracle, Odoo) | Gestion des ressources (stocks, achats, RH, finance) | REST, SOAP, IDoc (SAP), EDI |
| CMMS (Gestion maintenance) | Ordres de travail, historique interventions | REST, SOAP |
| Datawarehouse / Data lake | Analytique, reporting, ML | SQL, JDBC, Delta Lake, Parquet |
| BI (Power BI, Tableau, Metabase) | Rapports et analyses métier | SQL, OData, REST |
API REST et gRPC vers le SI¶
Exposition des données IoT en REST¶
La passerelle API expose les données du pipeline IoT (jumeaux, séries agrégées, événements) vers les systèmes tiers. Elle gère l'authentification, le rate limiting et la transformation de format.
flowchart LR
subgraph "Pipeline IoT"
DITTO[Eclipse Ditto\njumeaux numériques]
TSDB[(TimescaleDB\nséries agrégées)]
KAFKA[Kafka\névénements]
end
subgraph "API Gateway"
GW[Kong / Traefik\nRate limiting\nJWT / OAuth2\nTransformation]
end
subgraph "Systèmes SI"
MES[MES\nOrdres de fabrication]
ERP[ERP SAP\nGestion stocks]
CMMS[CMMS\nMaintenance]
DWH[(Datawarehouse\nClickHouse)]
BI[Power BI\nTableau]
end
DITTO -->|REST| GW
TSDB -->|JDBC| GW
KAFKA -->|Consumer| GW
GW -->|REST / webhook| MES
GW -->|REST / IDoc| ERP
GW -->|REST| CMMS
GW -->|JDBC / Bulk insert| DWH
DWH -->|SQL| BI Exemple d'API REST — endpoint état équipement¶
# FastAPI — exposition de l'état d'un équipement IoT vers le MES
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime
import httpx
app = FastAPI(title="IoT Integration API", version="1.0")
class EquipmentStatus(BaseModel):
device_id: str
state: str
bearing_temp: float | None
flow_rate: float | None
last_updated: datetime
predicted_failure_date: datetime | None
@app.get("/api/v1/equipment/{device_id}/status",
response_model=EquipmentStatus,
summary="État courant d'un équipement")
async def get_equipment_status(
device_id: str,
ditto_client: httpx.AsyncClient = Depends(get_ditto_client)
):
# Récupérer l'état depuis Eclipse Ditto
resp = await ditto_client.get(
f"/api/2/things/industrial:{device_id}"
)
if resp.status_code == 404:
raise HTTPException(status_code=404, detail="Équipement inconnu")
thing = resp.json()
features = thing.get("features", {})
return EquipmentStatus(
device_id=device_id,
state=features.get("operationalState", {})
.get("properties", {})
.get("state", "UNKNOWN"),
bearing_temp=features.get("telemetry", {})
.get("properties", {})
.get("bearingTemperature"),
flow_rate=features.get("telemetry", {})
.get("properties", {})
.get("flowRate"),
last_updated=thing.get("_modified"),
predicted_failure_date=features.get("predictive", {})
.get("properties", {})
.get("predictedFailureDate")
)
gRPC pour les flux à haute fréquence¶
REST est adapté aux requêtes ponctuelles. Pour les flux continus (abonnement aux événements, streaming de séries), gRPC offre une latence et un débit bien supérieurs grâce au multiplexage HTTP/2.
// iot_integration.proto
syntax = "proto3";
service IotIntegration {
// Requête unique
rpc GetEquipmentStatus (EquipmentRequest) returns (EquipmentStatus);
// Stream serveur → client
rpc StreamTelemetry (TelemetryStreamRequest) returns (stream TelemetryPoint);
// Bidirectionnel : commandes + confirmations
rpc CommandStream (stream Command) returns (stream CommandResult);
}
message TelemetryStreamRequest {
repeated string device_ids = 1;
repeated string measures = 2;
int32 interval_ms = 3;
}
message TelemetryPoint {
string device_id = 1;
string measure = 2;
double value = 3;
int64 timestamp = 4; // Unix ms
}
Connecteurs ERP et MES¶
Intégration avec SAP via REST/OData¶
SAP S/4HANA expose une API OData (basée sur REST + format Atom/JSON) pour la plupart de ses modules.
# Créer automatiquement un ordre de maintenance SAP
# depuis une alerte IoT de défaillance imminente
import requests
SAP_BASE = "https://sap.example.com/sap/opu/odata/sap/PM_ORDER_SRV"
def create_maintenance_order(device_id: str, failure_desc: str, planned_date: str):
payload = {
"OrderType": "PM01", # Ordre de maintenance préventif
"FunctionalLocation": device_id,
"ShortText": f"Maintenance prédictive — {device_id}",
"LongText": failure_desc,
"BasicStartDate": f"/Date({planned_date})/",
"Priority": "2", # Urgent
"MainWorkCenter": "MAINT-ELEC"
}
resp = requests.post(
f"{SAP_BASE}/MaintenanceOrders",
json=payload,
auth=("svc_iot", SAP_SERVICE_PASSWORD),
headers={"Accept": "application/json"}
)
resp.raise_for_status()
return resp.json()["d"]["OrderId"]
Intégration MES — remontée de KPIs de production¶
# Kafka Consumer → MES via webhook
from confluent_kafka import Consumer
import requests, json
consumer = Consumer({
'bootstrap.servers': 'kafka1:9092,kafka2:9092',
'group.id': 'mes-integration',
'auto.offset.reset': 'latest'
})
consumer.subscribe(['iot.aggregated.production_kpis'])
MES_WEBHOOK = "https://mes.example.com/api/v2/production/realtime"
while True:
msg = consumer.poll(1.0)
if msg is None or msg.error():
continue
kpi = json.loads(msg.value())
# Transformer en format MES
mes_payload = {
"line": kpi["production_line"],
"timestamp": kpi["bucket"],
"actual_rate": kpi["actual_rate"],
"good_parts": kpi["good_parts"],
"scrap_parts": kpi["scrap_parts"],
"downtime_min": kpi["downtime_seconds"] / 60
}
requests.post(MES_WEBHOOK, json=mes_payload,
headers={"X-API-Key": MES_API_KEY})
Données IoT vers le datawarehouse¶
Le datawarehouse reçoit les données IoT agrégées pour l'analytique long terme. Deux patterns coexistent :
Pattern ETL — Kafka Connect vers ClickHouse¶
{
"name": "iot-clickhouse-sink",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"tasks.max": "4",
"topics": "iot.aggregated.1hour",
"hostname": "clickhouse.example.com",
"port": "8443",
"ssl": "true",
"database": "iot_warehouse",
"table": "telemetry_hourly",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"insert.method": "insert",
"batch.size": "10000"
}
}
Pattern CDC — Change Data Capture depuis TimescaleDB¶
graph LR
TS["TimescaleDB<br/>(source OLTP)"] -->|"Debezium<br/>WAL streaming"| KF["Kafka<br/>(CDC topic)"] -->|"Kafka Connect<br/>Sink connector"| CH["ClickHouse<br/>(DWH OLAP)"] Le CDC (Change Data Capture) capture chaque INSERT/UPDATE/DELETE dans le WAL PostgreSQL via Debezium et les réplique en temps quasi réel vers le datawarehouse. Avantage : pas de requête polling sur la source, latence < 1 seconde.
Tableau interopérabilité des standards industriels¶
| Standard | Couche | Usage principal | Maturité |
|---|---|---|---|
| OPC-UA | Terrain → cloud | Interopérabilité automates, SCADA, MES | Très élevée |
| MQTT + Sparkplug B | Terrain → broker | IoT SCADA-like structuré | Élevée |
| ISA-95 (IEC 62264) | MES / ERP | Modèle de données production | Référence mondiale |
| IFC / STEP | Industrie construction | Modèles 3D équipements | Élevée |
| AAS (Asset Admin Shell) | Industrie 4.0 | Passeport numérique équipement | Émergente |
| FIWARE NGSI-LD | Smart cities / IoT | Context broker sémantique | Modérée |
| OpenTelemetry | Observabilité | Métriques, traces, logs uniformes | Élevée |
Ce qu'il faut retenir¶
- L'intégration SI est la dernière étape du pipeline mais souvent la plus complexe : chaque système a ses standards, ses APIs et ses contraintes de sécurité.
- REST couvre les requêtes ponctuelles, gRPC les flux continus, Kafka Connect les réplications volumineuses.
- Le CDC via Debezium est la solution la plus robuste pour alimenter un datawarehouse sans impacter la source.
Pour aller plus loin : Sécurité IoT — avant d'exposer ce pipeline au réseau, sécuriser chaque couche de l'architecture est indispensable.