
Betrugserkennung in Echtzeit
Ein praktischer Leitfaden zu Confluent Flink
Einführung in Confluent Flink
Für datengetriebene Unternehmen, die wettbewerbsfähig bleiben möchten, sind Echtzeit-Datenverarbeitung und -analyse unentbehrlich geworden.
Die Event-Streaming-Plattform Apache Kafka spielt, seit ihrer Einführung in der Branche im Jahr 2011, eine wichtige Rolle aufgrund ihrer Fähigkeit Hochdurchsatz, fehlertolerante und skalierbare Datenströme zu bewältigen.
Bei komplexeren ereignisgesteuerten Anwendungen, Echtzeit-Analysen und kontinuierlichen Datenpipelines hat sich Apache Flink als eine der bevorzugten Technologien herausgestellt Speziell für die Stateful-Stream-Verarbeitung entworfen, glänzt Flink bei der Handhabung zeitbasierter Operationen und Fensterung, was es besonders gut für Szenarien geeignet macht, die präzise Ereigniszeit-Handhabung und komplexe Zustandsverwaltung erfordern.
Die Kombination beider Technologien ergibt eine leistungsstarke Lösung für skalierbare Datenverarbeitungspipelines in Echtzeit, die sowohl Event-Streaming als auch komplexe Verarbeitung mit minimaler Latenz und hoher Zuverlässigkeit bewältigen.
Confluent Cloud, gegründet von den Schöpfern von Apache Kafka im Jahr 2014, basiert auf genau dieser Kombination. Mit Kafka als Kernkomponente bietet es nahtlose Integration mit Flink an und ergänzt es mit den Vorteilen des serverlosen Cloud-Computings.
Die jüngste Funktion von Confluent, die Integration von KI-Modellinferenz in Confluent Cloud für Apache Flink, verspricht ein Allround-Datenwerkzeug von Daten-Inferenz und -verarbeitung bis hin zur Datenanalyse mithilfe von ML-Modellen.
In diesem Blog-Beitrag ist unser Ziel, eine praxisnahe Anleitung für diejenigen zu bieten, die Confluent Flink ausprobieren möchten.
Hands-On: Unsere Erfahrung mit Confluent Flink
Wir haben versucht, eine Echtzeit Betrugserkennung mithilfe von Confluent Kafka, Flink und Flink AI Remote Model Inference zu simulieren. Als Datensatz haben wir Kaggle IEEE-CIS Fraud Detection Dataset gewählt.
Nachfolgend möchten wir unsere Erfahrung teilen und Ihnen Schritt-für-Schritt zeigen, wie man diesen Anwendungsfall implementieren kann. Sie finden unseren Code auf Github. Lassen Sie uns anfangen!
Schritt 1: Erstellung einer Umgebung und eines Cluster in Confluent Cloud
Als Erstes haben wir für unsere Anwendung eine neue Umgebung und einen Cluster auf der Plattform Confluent Cloud erstellt. Eine detaillierte Anleitung zur Verwaltung von Clustern in Confluent finden Sie im Artikel diesem.
Schritt 2: Erstellung einer Tabelle mit Schema
Da wir vorhaben Flink für Datenumwandlungen zu verwenden, müssen wir eine Flink-Tabelle in Confluent Cloud erstellen.
Flink-Tabellen sind Dynamische Tabellen, welche die Manipulation und Verarbeitung von Streaming-Daten erleichtern. Jede Flink-Tabelle in Confluent Cloud wird von einem Kafka-Topic unterstützt.
Wir können unsere Flink-Tabellen in Confluent's Stream Processing-Arbeitsbereichen definieren und transformieren. Klicken Sie auf "Stream Processing" im Seitennavigationsmenü, wählen Sie dann Ihre Umgebung und "Create workspace“ aus. Es ist wichtig zu beachten, dass der Flink-Compute-Pool dieselbe Cloud und dieselbe Region wie der Kafka-Cluster aufweisen muss. Andernfalls kann Flink nicht auf die in dem Kafka-Cluster gespeicherten Daten zugreifen.
Lassen Sie uns nun die Tabellen "Identities" und "Transactions" erstellen.
Später werden wir einen Interval Join (Zeitbereichsverbund) anwenden, um beide Tabellen zu verknüpfen. Derzeit erlaubt Confluent Flink keine Interval Joins auf veränderlichen Tabellen. Daher haben wir unsere Tabellen als append-only definiert. Dazu verwenden wir die Eigenschaft "Changelog", wobei wir uns auf die Tatsache verlassen, dass „ein Stream grundsätzlich das Änderungsprotokoll einer Tabelle ist“.


Schritt 3: Erstellung Servicekonto und API-Schlüssel
Unsere lokalen Kafka-Producer lesen die CSV-Dateien aus dem Kaggle-Dataset und schreiben jede Aufzeichnung in die Kafka-Topics, die unseren Flink-Tabellen unterliegen.
Damit das funktioniert, brauchen wir
- Ein Servicekonto mit API-Referenzen für jeden Producer
- Ein definiertes Avro-Schema, das unserer Tabellenstruktur entspricht
Um Referenzen zu erstellen, gehen Sie zum Cluster und klicken Sie „API Keys“ in der linken Seitenleiste und klick „Create key“. Erstellen Sie dann ein Servicekonto und vergeben Sie einen Namen. Ordnen Sie es zu den Transaktions-Topics hinzu:


Laden Sie die generierten Schlüssel hinunter und speichern Sie diesen. Wiederholen Sie das Vorgehen für das Identities-Topic.
Die letzten fehlenden Referenzen sind die API-Schlüssel für das Schemaregister.
Gehen Sie zu Environment. In der rechten Seitenleiste scrollen Sie nach unten zu Stream Governance API. Kopieren Sie den angezeigten Endpunkt.
Unter Credentials klicken Sie auf „Add key“ und im nächsten Fenster auf „Create key“:


Laden Sie die generierten Schlüssel herunter und speichern diese.
Schritt 4: Daten aufnehmen und verarbeiten
Wir haben zwei Producer-Skripte erstellt, welche die Confluent Kafka Python Library verwenden. Diese Skripte lesen die Transactions und Identities Datensätze und nehmen die Events in Kafka auf. Sie können die Skripte im GitHub Repositoy finden. Um sie lokal auszuführen, müssen Bootstrap Server URL, Topic-Referenzen und Schema-Register angepasst werden. Dann müssen beide Skripte parallel ausgeführt werden.
Wenn unsere Producer-Skripte Daten in unsere Kafka-Topics schreiben, ist der nächste Schritt beide Datensätze in einer neuen Tabelle zu verbinden. Da wir Streams zusammenführen, führen wir sie mittels Interval Join zusammen, um sicherzustellen, dass nur Events im relevanten Zeitfenster zusammengeführt werden. Im Gegensatz dazu würde ein Regular Join zu übermäßigem Speicherverbrauch führen, aufgrund eines sich ständig erweiternden Zustands.

Zusätzlich möchten wir einige neue Funktionen als Teil unserer Datenaufbereitung konstruieren. Wir extrahieren die Zeitmerkmale der Transaction und fügen sie der Tabelle als zusätzliche Spalten hinzu:

Jetzt sind wir bereit für den nächsten Schritt: Die Echtzeitanalyse unserer Streaming-Daten mithilfe von Fern-KI-Modell-Inferenz.
Schritt 5: Analyse der Daten mithilfe eines Remote-ML-Modells
Für unser Anwendungsfall haben wir ein einfaches Betrugserkennungsmodell auf dem oben erwähnten Kaggle IEEE-CIS Fraud Detection Dataset trainiert und in Azure Machine Learning bereitgestellt. Die von Confluent unterstützten KI-Modellanbieter (zum Zeitpunkt der Artikelerstellung) sind das AWS Bedrock und Sagemaker, Azure OpenAI und Azure ML, OpenAI und Google Cloud VertexAI.
Eine weitreichendere Erklärung, wie ein Modell in Azure ML eingesetzt werden kann, würde den Umfang dieses Artikels überschreiten. Sie finden die Dokumentation hier: Allerdings werden wir einige Details darüber abdecken, wie die Eingabedaten, die von Confluent Flink an Azure ML gesendet werden, geparst werden.
Wir verbinden den Endpunkt unseres Modells mithilfe von Confluent CLI. Wenn Sie diesen noch nicht eingerichtet habt, dann finden Sie hier eine Anleitung. Die Erstellung einer Verbindung zu einem Remote-Modell ist hier dokumentiert Confluent docs. So haben wir es gemacht:
Als erstes kopieren Sie den Namen des Endpunktes und die URL. Dann melden Sie sich bei Azure CLI an und erhaltet den API-Schlüssel Ihres Endpunktes:

Jetzt melden Sie sich bei Confluent CLI an und erstellen die Verbindung:

Als nächstes erstellen Sie ein Modellobjekt mithilfe von Flink. Sie müssen die Namen und Typen der Eingabefunktion angeben, den Ausgabenamen und -typen und einige wenige Optionen, die mit dem Modellanbieter verbunden sind:

Jetzt kann das Modell verwendet werden, um Vorhersagen über Ihre Eingabedaten-Stream zu machen:

Der Azure ML-Endpunkt erfordert ein Scoring-Skript, das das Laden des Modells, das Parsen der Eingabe, die Datenübergabe an das Modell und die Rückgabe der Vorhersage umfasst. Da Confluent Flink die Modelleingabe als JSON-Objekt sendet, wobei jeder Schlüssel einen Funktionsnamen und einen entsprechender Wert repräsentiert, muss das Scoring-Skript zunächst JSON parsen, bevor die Daten an das Modell übergeben werden. Das von uns verwendete AzureML-Scoring-Skript ist auf unserem GitHub verfügbar. Darüber hinaus enthält das Repository die Docker-Datei für die Azure ML-Umgebung und die ONNX-Modell-Datei des trainierten Modells.
Während dieser Artikel sich auf die Kernintegration konzentriert, ist es erwähnenswert, dass Flink AI auch eine Funktion für die Modellversionierung anbietet. Mehr Details findet ihr hier In der Confluent-Dokumentation.
Fazit
Confluent Cloud bietet eine umfassende Lösung für datenzentrierte Anwendungsfälle, wobei die neue Flink AI-Funktion ML-Modelle reibungslos in eine moderne Datenpipeline integriert sind. In der Vergangenheit haben wir ein ähnliches Projekt mit Terraform und Apache Flink auf einem in AWS gehosteten Kubernetes-Cluster bereitgestellt. Die Einrichtung eines produktionsbereiten Flink und Kafka-Clusters erforderte erheblichen Aufwand. Im Gegensatz dazu ist das Einrichten von Confluent Flink angenehm einfach und erfordert wesentlich weniger Arbeitsaufwand.
Wir denken, dass Confluent Flink das Potenzial hat, die Zukunft von Datenstreaming und Echtzeit-Analytics erheblich zu vereinfachen. Insgesamt ist Confluent Cloud eine leistungsstarke All-in-One-Datenplattform mit großem Potenzial für datenzentrierte Unternehmen. Wir sind sehr gespannt auf weitere Entwicklungen.