Einfaches Design von Kotlin Flow

Flow von Grant Tarrant

In einer früheren "Cold Flows, Hot Channels" -Geschichte¹ habe ich kalte und heiße Datenströme definiert und einen Anwendungsfall für Kotlin Flows - kalte asynchrone Datenströme - gezeigt. Lassen Sie uns nun einen Blick hinter die Kulissen werfen, ihr Design untersuchen und sehen, wie eine Kombination aus Sprachfunktionen und einer Bibliothek eine leistungsstarke Abstraktion mit einfachem Design ermöglicht.

Ein Flow in Kotlin wird durch eine Schnittstelle² dargestellt:

Interface Flow  {
    Fun Collect aussetzen (Sammler: FlowCollector )
}

Alles, was zu einem Flow gehört, ist eine einzelne Sammelfunktion, die eine Instanz der FlowCollector-Schnittstelle mit einer einzelnen Emit-Methode akzeptiert:

Schnittstelle FlowCollector  {
    Fun-Emit aussetzen (Wert: T)
}

Ein Sendername sollte dem Leser von „Cold Flows, Hot Channels“ bekannt vorkommen. In der Tat habe ich hier ein Beispiel für die folgende Ablaufdefinition gezeigt:

Werte: Durchfluss  = Durchfluss {
    für (i in 1..10) {
        Verzögerung (100)
        emit (i) // <- emit wird hier aufgerufen
    }
}

Eine Signatur des Flow Builders verwendet auch eine FlowCollector-Schnittstelle als Empfänger³, sodass wir direkt vom Körper des entsprechenden Lambda aus senden können:

fun  flow (Block: suspend FlowCollector . () -> Unit): Flow 

Für eine einfache Verwendung eines Flusses, wenn der Fluss gesammelt wird, wie folgt:

ints.collect {println (it)} // dauert 1 Sekunde und gibt 10 Zoll aus

Was passiert, ist, dass eine Instanz von FlowCollector basierend auf dem Lambda erstellt wird, das an die Funktion {…} übergeben wird, und genau diese Instanz dann an den Flow-Body {…} übergeben wird.

Somit ist eine Wechselwirkung zwischen einem Strömungserzeuger und einem Strömungssammler die eines einfachen Funktionsaufrufs - eines Aufrufs einer Emissionsfunktion. Wenn wir diesen Funktionsaufruf mental einbinden, können wir sofort verstehen, was passiert, wenn wir diesen Code ausführen - das entspricht:

für (i in 1..10) {
    Verzögerung (100)
    println (i) // <- emit wurde hier aufgerufen
}

Betreiber

Ein Flowbuilder und ein Operator für Erfassungsterminals sind alles, was wir wissen müssen, um Operatoren zu schreiben, die Flows auf verschiedene Arten transformieren. Ein einfacher Kartenoperator, der eine bestimmte Transformation auf jeden ausgegebenen Wert anwendet, kann beispielsweise folgendermaßen implementiert werden:

fun  Flow  .map (Transformation: suspend (Wert: T) -> R) = flow {
    sammeln {ausgeben (transformieren (es))}
}

Mit diesem Operator können wir nun ints.map {it * it} ausführen, um einen Fluss mit Quadraten der ursprünglichen Ganzzahlen zu definieren. Elemente fließen weiterhin über Funktionsaufrufe vom Emitter zum Kollektor. Dazwischen gibt es einfach noch eine Funktion.

Tatsächlich definiert die Bibliothek kotlinx.coroutines bereits eine Karte und eine Vielzahl anderer universeller Operatoren als Erweiterungen für den Flow-Typ. Bei diesem Entwurf ist es wichtig, dass es recht einfach ist, domänenspezifische Operatoren zu definieren. Es gibt keinen Unterschied zwischen "eingebauten" und "benutzerdefinierten" Operatoren - alle Operatoren sind erstklassig.

Gegendruck

Rückstau in der Softwareentwicklung ist definiert als die Fähigkeit eines Datenkonsumenten, der mit eingehenden Daten nicht Schritt halten kann, ein Signal an den Datenproduzenten zu senden, um die Rate der Datenelemente zu verlangsamen.

Das Design herkömmlicher reaktiver Ströme beinhaltet einen Rückkanal, um bei Bedarf mehr Daten von den Herstellern anzufordern. Die Verwaltung dieses Anforderungsprotokolls führt selbst für einfache Bediener zu notorisch schwierigen Implementierungen. Wir sehen keine dieser Komplexitäten im Design von Kotlin-Flüssen oder in der Implementierung von Operatoren für diese, aber Kotlin-Flüsse unterstützen den Gegendruck. Woher?

Ein transparentes Gegendruckmanagement wird in Kotlin-Flüssen durch die Verwendung von Kotlin-Suspendierungsfunktionen erreicht. Möglicherweise haben Sie bemerkt, dass alle Funktionen und Funktionstypen in Kotlin Flow Design mit dem Modifikator suspend gekennzeichnet sind. Diese Funktionen können die Ausführung des Aufrufers unterbrechen, ohne einen Thread zu blockieren. Wenn der Kollektor des Flusses überfordert ist, kann er den Emitter einfach aussetzen und später wieder aufnehmen, wenn er bereit ist, weitere Elemente aufzunehmen.

Dies ist vergleichbar mit dem Gegendruckmanagement in traditionellen Thread-basierten synchronen Daten-Pipelines, bei denen ein langsamer Verbraucher automatisch einen Gegendruck auf den Produzenten ausübt, indem er den Thread des Produzenten blockiert. Das Unterbrechen von Funktionen führt über einen einzelnen Thread hinaus in den Bereich der asynchronen Programmierung, indem der Gegendruck über die Threads hinweg transparent verwaltet wird, ohne sie zu blockieren. Aber das soll in einer anderen Geschichte erzählt werden.

Weiterführende Literatur und Fußnoten

  1. ^ Kalte Flüsse, heiße Kanäle
  2. ^ Flow und verwandte Typen und Funktionen sind ab Version 1.2.1 der kotlinx.coroutines-Bibliothek noch in der Vorschau. Lesen Sie hier mehr.
  3. ^ Funktionstypen in Kotlin
  4. ^ Dies ist eine leichte Vereinfachung. Es werden keine zusätzlichen Überprüfungen berücksichtigt, um die Kontexterhaltung sicherzustellen, aber dieses Thema fällt nicht in den Geltungsbereich dieser Geschichte. Weitere Details im Ausführungskontext von Kotlin Flows.
  5. ^ Sie können diesen Code über Kotlin Playground hier ausführen.
  6. ^ Extension-orientiertes Design
  7. ^ Reaktive Ströme
  8. ^ Implementieren von Operatoren für [RxJava] 2.0
  9. ^ Fäden blockieren, Koroutinen aussetzen