Aller au contenu

Traitement et règles

Le stream processing transforme un flux brut de capteurs en décisions opérationnelles — il détecte les anomalies, agrège les KPIs et déclenche les alertes avant même que la donnée touche une base de données.


Pourquoi le traitement en flux ?

Le traitement par lots (batch) était la norme des datawarehouses traditionnels : collecter les données, les stocker, puis les analyser la nuit. Pour l'IoT industriel, ce modèle est inadapté :

  • Une surchauffe détectée 6 heures après les faits ne sert à rien.
  • Un pic de pression doit déclencher une alerte en moins de 5 secondes.
  • Les KPIs de production (OEE, taux de rebut) doivent être visibles en temps réel.

Le stream processing traite chaque événement au moment où il arrive dans le pipeline, avant même son écriture en base.


Kafka Streams

Kafka Streams est une bibliothèque Java/Kotlin embarquée dans l'application. Pas de cluster séparé à déployer : le traitement tourne dans le même processus que le producteur/consommateur.

Avantages pour l'IoT :

  • Déploiement simplifié (jar autonome)
  • State stores locaux pour les agrégations sans base externe
  • Exactement une fois (exactly-once semantics) avec Kafka transactionnel
  • Montée en charge horizontale par ajout de partitions

Exemple : agrégation glissante avec détection de seuil

StreamsBuilder builder = new StreamsBuilder();

KStream<String, TelemetryEvent> raw = builder
    .stream("iot.telemetry",
            Consumed.with(Serdes.String(), telemetrySerde));

// Moyenne glissante sur 5 minutes par device
KTable<Windowed<String>, Double> avgTemp = raw
    .filter((deviceId, evt) -> evt.getMeasure().equals("temperature"))
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        AggregateAccumulator::new,
        (key, value, acc) -> acc.add(value.getValue()),
        Materialized.<String, AggregateAccumulator, WindowStore<Bytes, byte[]>>
            as("temp-avg-store")
            .withValueSerde(accumulatorSerde)
    )
    .mapValues(AggregateAccumulator::mean);

// Alertes : moyenne > 85°C
avgTemp
    .toStream()
    .filter((window, avg) -> avg != null && avg > 85.0)
    .map((window, avg) -> KeyValue.pair(
        window.key(),
        new Alert(window.key(), "TEMP_HIGH", avg, window.window().startTime())
    ))
    .to("iot.events.alerts", Produced.with(Serdes.String(), alertSerde));

Flink est le moteur de stream processing de référence pour les workloads industriels complexes. Il introduit des abstractions plus puissantes que Kafka Streams : event time vs processing time, watermarks, side outputs, et un scheduler distribué.

Event time vs Processing time

Un capteur embarqué peut envoyer ses mesures avec un horodatage interne (event time) différent de l'heure de réception sur le broker (processing time). Pour les calculs de fenêtres, event time est le seul temps physiquement correct.

// DataStream avec watermark sur le timestamp du message
DataStream<TelemetryEvent> stream = env
    .fromSource(kafkaSource, WatermarkStrategy
        .<TelemetryEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((evt, ts) -> evt.getTimestampMs()),
        "Kafka IoT Source");

// Fenêtre glissante 10 min / avance 1 min sur event time
stream
    .keyBy(TelemetryEvent::getDeviceId)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(10), Time.minutes(1)))
    .aggregate(new TemperatureAggregator())
    .addSink(influxSink);

Watermarks et messages tardifs

Les watermarks indiquent à Flink jusqu'où il peut supposer que tous les événements d'une fenêtre sont arrivés. Les messages qui arrivent après le watermark sont considérés comme "late".

gantt
    title Fenêtre Flink — event time vs processing time
    dateFormat HH:mm:ss
    axisFormat %H:%M:%S

    section Événements reçus
    evt A (t=08:00:01) : done, 08:00:02, 1s
    evt B (t=08:00:03) : done, 08:00:04, 1s
    evt C retardé (t=07:59:55) : crit, 08:00:07, 1s

    section Watermark
    Watermark avance à 08:00:00 : milestone, 08:00:06, 0s

    section Fenêtre 07:59-08:00
    Fermeture fenêtre : milestone, 08:00:06, 0s

CEP — Complex Event Processing

Le CEP détecte des patterns temporels complexes sur plusieurs événements. Il va au-delà d'un simple seuil : il reconnaît des séquences.

Exemples de patterns industriels :

  • Température > 80°C suivi de pression > 6 bar dans les 30 secondes → risque d'emballement
  • Trois pannes capteur sur le même équipement en moins de 10 minutes → défaillance imminente
  • Débit anormalement bas pendant plus de 5 minutes alors que la pompe est signalée active → fuite probable
// Pattern : température élevée suivie d'une alarme vibration dans 60s
Pattern<TelemetryEvent, ?> dangerPattern = Pattern
    .<TelemetryEvent>begin("high_temp")
        .where(evt -> evt.getType().equals("temperature") && evt.getValue() > 80)
    .followedBy("high_vibration")
        .where(evt -> evt.getType().equals("vibration") && evt.getValue() > 12)
    .within(Time.seconds(60));

PatternStream<TelemetryEvent> patternStream =
    CEP.pattern(keyedStream, dangerPattern);

patternStream.select(matches -> {
    TelemetryEvent temp = matches.get("high_temp").get(0);
    TelemetryEvent vib  = matches.get("high_vibration").get(0);
    return new CriticalAlert(temp.getDeviceId(), "THERMAL_VIBRATION_COMBO",
                             temp.getValue(), vib.getValue());
}).addSink(alertSink);

Pipeline de traitement complet

flowchart TD
    subgraph "Kafka Topics"
        KT[iot.telemetry\n10M msg/jour]
        KA[iot.events.alerts]
        KAGG[iot.aggregated.1min]
    end

    subgraph "Flink Job"
        PARSE[Parsing\n+ validation schéma]
        ENRICH[Enrichissement\nmétadonnées device]
        WINDOW[Fenêtres glissantes\nevent time + watermarks]
        CEP_EN[Moteur CEP\npatterns multi-événements]
        THRESH[Détection seuils\nsimples]
        AGG[Agrégats\nmin/max/avg/stddev]
    end

    subgraph "Sorties"
        TSDB[(InfluxDB\nséries agrégées)]
        ALERT_SVC[Service alertes\nPagerDuty / Webhook]
        NOTIF[Notifications\nEmail / SMS / Teams]
        DWH[(Datawarehouse\nClickHouse / BigQuery)]
    end

    KT --> PARSE
    PARSE --> ENRICH
    ENRICH --> WINDOW
    ENRICH --> CEP_EN
    ENRICH --> THRESH
    WINDOW --> AGG
    AGG --> KAGG
    AGG --> TSDB
    CEP_EN --> KA
    THRESH --> KA
    KA --> ALERT_SVC
    ALERT_SVC --> NOTIF
    KAGG --> DWH

Moteur de règles métier

Pour les règles qui changent fréquemment (seuils d'alerte, paramètres process), coder la logique en dur dans un job Flink est contre-productif. Un moteur de règles externalisé permet de modifier les seuils sans redéploiement.

Architecture avec règles externalisées

graph TD
    DB["Base de règles (PostgreSQL)<br/>device_id | measure | condition | threshold | action<br/>sensor-042 | temperature | greater_than | 85.0 | ALERT<br/>* | pressure | greater_than | 8.5 | STOP"]
    DB -->|broadcast state| FLINK["Flink RuleEngine<br/>BroadcastStream"]
    TEL["iot.telemetry"] --> FLINK
    FLINK --> ALERTS["iot.events.alerts"]

Flink BroadcastStream permet de diffuser les règles mises à jour vers tous les opérateurs en parallèle sans redémarrer le job.


Métriques clés à surveiller

Métrique Seuil d'alerte typique Outil
Consumer group lag (Kafka) > 10 000 messages Kafka Exporter + Grafana
Processing latency (Flink) > 500 ms (p99) Flink Web UI / Prometheus
Checkpoint duration (Flink) > 30 s Flink metrics
Taux d'erreurs parsing > 0,1 % Custom counter
Débit events/s < 80 % du nominal Prometheus + alerting

Ce qu'il faut retenir

  • Kafka Streams convient aux transformations simples, Flink aux pipelines complexes avec CEP et event time.
  • Le CEP permet de détecter des séquences d'événements impossibles à identifier avec des seuils simples.
  • Les règles métier doivent être externalisées pour permettre les modifications sans redéploiement.

Chapitre suivant : Jumeaux numériques — synchroniser un modèle virtuel avec les équipements physiques pour simuler et anticiper.