Neu

msg digital mehr

Gradient gruen

Betrugserkennung in Echtzeit

Ein praktischer Leitfaden zu Confluent Flink

Einführung in Confluent Flink

Für datengetriebene Unternehmen, die wettbewerbsfähig bleiben möchten, sind Echtzeit-Datenverarbeitung und -analyse unentbehrlich geworden.

Die Event-Streaming-Plattform Apache Kafka spielt, seit ihrer Einführung in der Branche im Jahr 2011, eine wichtige Rolle aufgrund ihrer Fähigkeit Hochdurchsatz, fehlertolerante und skalierbare Datenströme zu bewältigen.

Bei komplexeren ereignisgesteuerten Anwendungen, Echtzeit-Analysen und kontinuierlichen Datenpipelines hat sich Apache Flink als eine der bevorzugten Technologien herausgestellt Speziell für die Stateful-Stream-Verarbeitung entworfen, glänzt Flink bei der Handhabung zeitbasierter Operationen und Fensterung, was es besonders gut für Szenarien geeignet macht, die präzise Ereigniszeit-Handhabung und komplexe Zustandsverwaltung erfordern.

Die Kombination beider Technologien ergibt eine leistungsstarke Lösung für skalierbare Datenverarbeitungspipelines in Echtzeit, die sowohl Event-Streaming als auch komplexe Verarbeitung mit minimaler Latenz und hoher Zuverlässigkeit bewältigen.

Confluent Cloud, gegründet von den Schöpfern von Apache Kafka im Jahr 2014, basiert auf genau dieser Kombination. Mit Kafka als Kernkomponente bietet es nahtlose Integration mit Flink an und ergänzt es mit den Vorteilen des serverlosen Cloud-Computings.

Die jüngste Funktion von Confluent, die Integration von KI-Modellinferenz in Confluent Cloud für Apache Flink, verspricht ein Allround-Datenwerkzeug von Daten-Inferenz und -verarbeitung bis hin zur Datenanalyse mithilfe von ML-Modellen.

In diesem Blog-Beitrag ist unser Ziel, eine praxisnahe Anleitung für diejenigen zu bieten, die Confluent Flink ausprobieren möchten.

 

Hands-On: Unsere Erfahrung mit Confluent Flink

Wir haben versucht, eine Echtzeit Betrugserkennung mithilfe von Confluent Kafka, Flink und Flink AI Remote Model Inference zu simulieren. Als Datensatz haben wir Kaggle IEEE-CIS Fraud Detection Dataset gewählt.

Nachfolgend möchten wir unsere Erfahrung teilen und Ihnen Schritt-für-Schritt zeigen, wie man diesen Anwendungsfall implementieren kann. Sie finden unseren Code auf Github. Lassen Sie uns anfangen!

Schritt 1: Erstellung einer Umgebung und eines Cluster in Confluent Cloud

Als Erstes haben wir für unsere Anwendung eine neue Umgebung und einen Cluster auf der Plattform Confluent Cloud erstellt. Eine detaillierte Anleitung zur Verwaltung von Clustern in Confluent finden Sie im Artikel diesem.
 

Schritt 2: Erstellung einer Tabelle mit Schema

Da wir vorhaben Flink für Datenumwandlungen zu verwenden, müssen wir eine Flink-Tabelle in Confluent Cloud erstellen.

Flink-Tabellen sind Dynamische Tabellen, welche die Manipulation und Verarbeitung von Streaming-Daten erleichtern. Jede Flink-Tabelle in Confluent Cloud wird von einem Kafka-Topic unterstützt.

Wir können unsere Flink-Tabellen in Confluent's Stream Processing-Arbeitsbereichen definieren und transformieren. Klicken Sie auf "Stream Processing" im Seitennavigationsmenü, wählen Sie dann Ihre Umgebung und "Create workspace“ aus. Es ist wichtig zu beachten, dass der Flink-Compute-Pool dieselbe Cloud und dieselbe Region wie der Kafka-Cluster aufweisen muss. Andernfalls kann Flink nicht auf die in dem Kafka-Cluster gespeicherten Daten zugreifen.

Lassen Sie uns nun die Tabellen "Identities" und "Transactions" erstellen.

Später werden wir einen Interval Join (Zeitbereichsverbund) anwenden, um beide Tabellen zu verknüpfen. Derzeit erlaubt Confluent Flink keine Interval Joins auf veränderlichen Tabellen. Daher haben wir unsere Tabellen als append-only definiert. Dazu verwenden wir die Eigenschaft "Changelog", wobei wir uns auf die Tatsache verlassen, dass „ein Stream grundsätzlich das Änderungsprotokoll einer Tabelle ist“.

Confluent Cloud UI showing the "streaming-demo" workspace with SQL code to create tables and a left-side navigator listing data catalogs like "marketplace" and "default".

 

 

Confluent Cloud workspace showing SQL code to create a table with multiple FLOAT and STRING fields, a timestamp, primary key, and changelog settings.

Schritt 3: Erstellung Servicekonto und API-Schlüssel

Unsere lokalen Kafka-Producer lesen die CSV-Dateien aus dem Kaggle-Dataset und schreiben jede Aufzeichnung in die Kafka-Topics, die unseren Flink-Tabellen unterliegen.

Damit das funktioniert, brauchen wir

  • Ein Servicekonto mit API-Referenzen für jeden Producer
  • Ein definiertes Avro-Schema, das unserer Tabellenstruktur entspricht

Um Referenzen zu erstellen, gehen Sie zum Cluster und klicken Sie „API Keys“ in der linken Seitenleiste und klick „Create key“. Erstellen Sie dann ein Servicekonto und vergeben Sie einen Namen. Ordnen Sie es zu den Transaktions-Topics hinzu:

Confluent Cloud UI showing the "Create key" screen with fields for service account name and description, and navigation steps for creating an API key.

 

 

Confluent Flink - Blog 4

Laden Sie die generierten Schlüssel hinunter und speichern Sie diesen. Wiederholen Sie das Vorgehen für das Identities-Topic.

Die letzten fehlenden Referenzen sind die API-Schlüssel für das Schemaregister.

Gehen Sie zu Environment. In der rechten Seitenleiste scrollen Sie nach unten zu Stream Governance API. Kopieren Sie den angezeigten Endpunkt.

Unter Credentials klicken Sie auf „Add key“ und im nächsten Fenster auf „Create key“:
 

Stream Governance API panel showing ID, endpoint URL with hidden parts, and a button labeled 'Add key' under the credentials section.

 

 

Create key interface showing access control configuration for a topic named 'transactions' with literal pattern, write operation, and allow permission.

Laden Sie die generierten Schlüssel herunter und speichern diese.

 

Schritt 4: Daten aufnehmen und verarbeiten

Wir haben zwei Producer-Skripte erstellt, welche die Confluent Kafka Python Library verwenden. Diese Skripte lesen die Transactions und Identities Datensätze und nehmen die Events in Kafka auf. Sie können die Skripte im GitHub Repositoy finden. Um sie lokal auszuführen, müssen Bootstrap Server URL, Topic-Referenzen und Schema-Register angepasst werden. Dann müssen beide Skripte parallel ausgeführt werden.

Wenn unsere Producer-Skripte Daten in unsere Kafka-Topics schreiben, ist der nächste Schritt beide Datensätze in einer neuen Tabelle zu verbinden. Da wir Streams zusammenführen, führen wir sie mittels Interval Join zusammen, um sicherzustellen, dass nur Events im relevanten Zeitfenster zusammengeführt werden. Im Gegensatz dazu würde ein Regular Join zu übermäßigem Speicherverbrauch führen, aufgrund eines sich ständig erweiternden Zustands.

SQL code creating the joined_records table using a LEFT JOIN between the transactions and identities tables, matching on TransactionID and a time window of ±60 seconds around the transaction timestamp.

Zusätzlich möchten wir einige neue Funktionen als Teil unserer Datenaufbereitung konstruieren. Wir extrahieren die Zeitmerkmale der Transaction und fügen sie der Tabelle als zusätzliche Spalten hinzu:

SQL code creating the transformed_records table with a WITH clause named transformations, which adds derived columns such as transactionDTDatetime, dayOfYear, dayOfWeek, and month based on the TransactionDT value.

Jetzt sind wir bereit für den nächsten Schritt: Die Echtzeitanalyse unserer Streaming-Daten mithilfe von Fern-KI-Modell-Inferenz.

 

Schritt 5: Analyse der Daten mithilfe eines Remote-ML-Modells

Für unser Anwendungsfall haben wir ein einfaches Betrugserkennungsmodell auf dem oben erwähnten Kaggle IEEE-CIS Fraud Detection Dataset trainiert und in Azure Machine Learning bereitgestellt. Die von Confluent unterstützten KI-Modellanbieter (zum Zeitpunkt der Artikelerstellung) sind das AWS Bedrock und Sagemaker, Azure OpenAI und Azure ML, OpenAI und Google Cloud VertexAI.

Eine weitreichendere Erklärung, wie ein Modell in Azure ML eingesetzt werden kann, würde den Umfang dieses Artikels überschreiten. Sie finden die Dokumentation hier: Allerdings werden wir einige Details darüber abdecken, wie die Eingabedaten, die von Confluent Flink an Azure ML gesendet werden, geparst werden.

Wir verbinden den Endpunkt unseres Modells mithilfe von Confluent CLI. Wenn Sie diesen noch nicht eingerichtet habt, dann finden Sie hier eine Anleitung. Die Erstellung einer Verbindung zu einem Remote-Modell ist hier dokumentiert Confluent docs. So haben wir es gemacht:

Als erstes kopieren Sie den Namen des Endpunktes und die URL. Dann melden Sie sich bei Azure CLI an und erhaltet den API-Schlüssel Ihres Endpunktes:

Screenshot of an Azure CLI command to retrieve credentials for an Azure ML online endpoint. The command uses parameters for endpoint name, resource group, and workspace name.

Jetzt melden Sie sich bei Confluent CLI an und erstellen die Verbindung:

Screenshot of a CLI command to create a Flink connection using Confluent CLI. The command includes parameters for cloud ("azure"), region ("westeurope"), type ("azureml"), endpoint URL, and API key.

Als nächstes erstellen Sie ein Modellobjekt mithilfe von Flink. Sie müssen die Namen und Typen der Eingabefunktion angeben, den Ausgabenamen und -typen und einige wenige Optionen, die mit dem Modellanbieter verbunden sind:

Screenshot of a SQL statement to create a machine learning model named frauddetection in the Flink environment. The model takes inputs like TransactionID, TransactionDT, TransactionAmt (FLOAT), DeviceType, and DeviceInfo (STRING), and outputs a fraud indicator (isFraud as INT). The model is configured to use AzureML for classification tasks.

Jetzt kann das Modell verwendet werden, um Vorhersagen über Ihre Eingabedaten-Stream zu machen:

Screenshot of a SQL query using ML_PREDICT in Apache Flink. The query selects all data from the transformed_records table in the flink-streamer schema and applies a machine learning prediction using the frauddetection model on TransactionID and other features (indicated by a comment to list all features).

 

Der Azure ML-Endpunkt erfordert ein Scoring-Skript, das das Laden des Modells, das Parsen der Eingabe, die Datenübergabe an das Modell und die Rückgabe der Vorhersage umfasst. Da Confluent Flink die Modelleingabe als JSON-Objekt sendet, wobei jeder Schlüssel einen Funktionsnamen und einen entsprechender Wert repräsentiert, muss das Scoring-Skript zunächst JSON parsen, bevor die Daten an das Modell übergeben werden. Das von uns verwendete AzureML-Scoring-Skript ist auf unserem GitHub verfügbar. Darüber hinaus enthält das Repository die Docker-Datei für die Azure ML-Umgebung und die ONNX-Modell-Datei des trainierten Modells.

Während dieser Artikel sich auf die Kernintegration konzentriert, ist es erwähnenswert, dass Flink AI auch eine Funktion für die Modellversionierung anbietet. Mehr Details findet ihr hier In der Confluent-Dokumentation.

Fazit

Confluent Cloud bietet eine umfassende Lösung für datenzentrierte Anwendungsfälle, wobei die neue Flink AI-Funktion ML-Modelle reibungslos in eine moderne Datenpipeline integriert sind. In der Vergangenheit haben wir ein ähnliches Projekt mit Terraform und Apache Flink auf einem in AWS gehosteten Kubernetes-Cluster bereitgestellt. Die Einrichtung eines produktionsbereiten Flink und Kafka-Clusters erforderte erheblichen Aufwand. Im Gegensatz dazu ist das Einrichten von Confluent Flink angenehm einfach und erfordert wesentlich weniger Arbeitsaufwand.

Wir denken, dass Confluent Flink das Potenzial hat, die Zukunft von Datenstreaming und Echtzeit-Analytics erheblich zu vereinfachen. Insgesamt ist Confluent Cloud eine leistungsstarke All-in-One-Datenplattform mit großem Potenzial für datenzentrierte Unternehmen. Wir sind sehr gespannt auf weitere Entwicklungen.

 

Haben Sie Fragen? Sprechen Sie uns an!

Schamberger, Tom edited

Tom Schamberger

Head of Cloud Data Platforms