[*]
Dieser Artikel wurde im Rahmen der Data Science Blogathon
Einführung
Eines der Hauptprobleme, mit denen jeder konfrontiert ist, wenn er zum ersten Mal strukturiertes Streaming ausprobiert, ist das Einrichten der Umgebung, die zum Streamen seiner Daten erforderlich ist.. Wir haben einige Online-Tutorials, wie wir das einrichten können. Die meisten von ihnen konzentrieren sich darauf, Sie aufzufordern, eine virtuelle Maschine und ein Ubuntu-Betriebssystem zu installieren und dann alle erforderlichen Dateien zu konfigurieren, indem Sie die Bash-Datei ändern. Das funktioniert gut, aber nicht für alle. Sobald wir eine virtuelle Maschine verwenden, Manchmal müssen wir möglicherweise lange warten, wenn wir Maschinen mit weniger Speicher haben. Der Prozess kann aufgrund von Problemen mit der Speicherverzögerung stecken bleiben. Dann, für eine bessere Möglichkeit und eine einfache Bedienung, Ich zeige Ihnen, wie wir strukturiertes Streaming in unserem Windows-Betriebssystem konfigurieren können.
Gebrauchte Werkzeuge
Für die Konfiguration verwenden wir folgende Tools:
1. Kafka (zur Datenübertragung, fungiert als Produzent)
2. Tierpfleger
3. Pyspark (um die übermittelten Daten zu generieren, handelt als Verbraucher)
4. Jupyter-Notizbuch (Code-Editor)
Umgebungsvariablen
Es ist wichtig zu beachten, dass hier, Ich habe alle Dateien in Laufwerk C hinzugefügt. Was ist mehr, der Name muss mit dem der Dateien übereinstimmen, die Sie online installieren.
Tenemos que configurar las variables de entorno a messenDas "messen" Es ist ein grundlegendes Konzept in verschiedenen Disziplinen, , die sich auf den Prozess der Quantifizierung von Eigenschaften oder Größen von Objekten bezieht, Phänomene oder Situationen. In Mathematik, Wird verwendet, um Längen zu bestimmen, Flächen und Volumina, In den Sozialwissenschaften kann es sich auf die Bewertung qualitativer und quantitativer Variablen beziehen. Die Messgenauigkeit ist entscheidend, um zuverlässige und valide Ergebnisse in der Forschung oder praktischen Anwendung zu erhalten.... que instalamos estos archivos. Bitte beachten Sie diese Bilder während der Installation für eine problemlose Erfahrung.
Das letzte Bild ist der Pfad der Systemvariablen.
Erforderliche Dateien
https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp=teilen
Kafka-Installation
Der erste Schritt ist die Installation von Kafka auf unserem System. Dazu müssen wir auf diesen Link gehen:
https://dzone.com/articles/running-apache-kafka-on-windows-os
Wir müssen Java installieren 8 zunächst und setzen Sie Umgebungsvariablen. Sie können alle Anweisungen über den Link erhalten.
Sobald wir mit Java fertig sind, wir müssen einen Tierpfleger einsetzen. He agregado archivos de guardián del zoológico en Google Drive. Siéntase libre de usarlo o simplemente siga todas las instrucciones dadas en el enlace. Si instaló zookeeper"Tierpfleger" ist ein Simulationsvideospiel, das im 2001, in der die Spieler in die Rolle eines Zoowärters schlüpfen. Die Hauptaufgabe besteht darin, verschiedene Tierarten zu verwalten und zu pflegen, Sicherstellung Ihres Wohlbefindens und der Zufriedenheit der Besucher. Während des gesamten Spiels, Benutzer können ihren Zoo entwerfen und anpassen, Herausforderungen wie, den Lebensraum und die Gesundheit von Tieren.... correctamente y configuró la VariableIn Statistik und Mathematik, ein "Variable" ist ein Symbol, das einen Wert darstellt, der sich ändern oder variieren kann. Es gibt verschiedene Arten von Variablen, und qualitativ, die nicht-numerische Eigenschaften beschreiben, und quantitative, numerische Größen darstellen. Variablen sind grundlegend in Experimenten und Studien, da sie die Analyse von Beziehungen und Mustern zwischen verschiedenen Elementen ermöglichen, das Verständnis komplexer Phänomene zu erleichtern.... der Umwelt, puede ver este resultado cuando ejecute zkserver como administrador en el símbolo del sistema.
Dann, instale Kafka según las instrucciones del enlace y ejecútelo con el comando especificado.
.binwindowskafka-server-start.bat .configserver.properties
Una vez que todo esté configurado, Versuchen Sie, ein Thema zu erstellen und zu überprüfen, ob es richtig funktioniert. Wenn ja, Sie haben die Kafka-Installation abgeschlossen.
Funkeninstallation
In diesem Schritt, instalamos Spark. Grundsätzlich, Sie können diesem Link folgen, um Spark auf Ihrem Windows-Rechner zu konfigurieren.
https://phoenixnap.com/kb/install-spark-on-windows-10
Während eines der Schritte, Es wird nach der Konfiguration der Winutils-Datei gefragt. Für Ihren Komfort, Ich habe die Datei dem Drive-Link hinzugefügt, den ich geteilt habe. In einem Ordner namens Hadoop. Legen Sie diesen Ordner einfach auf Ihrem Laufwerk C ab und legen Sie die Umgebungsvariable wie in den Bildern gezeigt fest. Ich empfehle Ihnen dringend, die Spark-Datei zu verwenden, die ich zu Google Drive hinzugefügt habe. Einer der Hauptgründe ist die Datenübertragung, die wir benötigen, um eine strukturierte Übertragungsumgebung manuell einzurichten. In unserem Fall, Ich habe alle notwendigen Dinge konfiguriert und die Dateien nach vielen Versuchen geändert. Falls Sie eine neue Konfiguration vornehmen möchten, mach es gerne. Wenn die Konfiguration nicht richtig funktioniert, Bei der Übertragung von Daten in pyspark erhalten wir einen solchen Fehler:
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
Sobald wir eins sind mit Spark, Jetzt können wir die erforderlichen Daten aus einer CSV-Datei in einen Producer streamen und mit dem Kafka-Theme in einen Consumer holen. Ich arbeite hauptsächlich mit dem Jupiter-Notizbuch und, Daher, Ich habe ein Notizbuch für dieses Tutorial verwendet.
Zuerst in deinem Notizbuch, Sie müssen einige Bibliotheken installieren:
1. pip installiert pyspark
2. Pip-Installer Kafka
3. pip installieren py4j
Wie funktioniert strukturiertes Streaming mit Pyspark?
Wir haben eine CSV-Datei mit Daten, die wir übertragen möchten. Fahren wir mit dem klassischen Iris-Dataset fort. Jetzt, wenn wir die Irisdaten übertragen wollen, wir müssen Kafka als Produzent verwenden. Kafka, wir erstellen ein Thema, an das wir die Iris-Daten übermitteln und der Verbraucher kann den Datenrahmen dieses Themas abrufen.
Das Folgende ist der Herstellercode zum Übertragen von Irisdaten:
import pandas as pd from kafka import KafkaProducer from datetime import datetime import time import random import numpy as np # pip install kafka-python KAFKA_TOPIC_NAME_CONS = "Thema" KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092' if __name__ == "__hauptsächlich__": drucken("Kafka Producer Application Started ... ") kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS, value_serializer=lambda x: x.encode('utf-8')) filepath = "IRIS.csv" flower_df = pd.read_csv(Dateipfad) flower_df['order_id'] = np.arange(len(flower_df)) flower_list = flower_df.to_dict(orient="records") message_list = [] message = None for message in flower_list: message_fields_value_list = [] message_fields_value_list.append(Botschaft["order_id"]) message_fields_value_list.append(Botschaft["sepal_length"]) message_fields_value_list.append(Botschaft["sepal_width"]) message_fields_value_list.append(Botschaft["Blütenblattlänge"]) message_fields_value_list.append(Botschaft["petal_width"]) message_fields_value_list.append(Botschaft["Spezies"]) Nachricht=",".beitreten(str(v) for v in message_fields_value_list) drucken("Message Type: ", Typ(Botschaft)) drucken("Nachricht: ", Botschaft) kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, Botschaft) zeit.schlaf(1) drucken("Kafka Producer Application Completed. ")
Para iniciar el productor, tenemos que ejecutar zkserver como administrador en el símbolo del sistema de Windows y luego iniciar Kafka usando: .binwindowskafka-server-start.bat .configserver.properties desde el símbolo del sistema en el directorio de Kafka. Si obtiene un error de „no broker“, significa que Kafka no se está ejecutando correctamente.
El resultado después de ejecutar este código en el cuaderno jupyter se ve así:
Jetzt, revisemos al consumidor. Ejecute el siguiente código para ver si funciona bien en un nuevo portátil.
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.ml.feature import Normalizer, StandardScaler import random import time kafka_topic_name = "Thema" kafka_bootstrap_servers="localhost:9092" spark = SparkSession .builder .appName("Structured Streaming ") .Meister("lokal[*]") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") # Construct a streaming DataFrame that reads from topic flower_df = spark .readStream .format("kafka") .Möglichkeit("kafka.bootstrap.servers", kafka_bootstrap_servers) .Möglichkeit("subscribe", kafka_topic_name) .Möglichkeit("startingOffsets", "neueste") .Belastung() flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "Zeitstempel") flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING" flower_df2 = flower_df1 .select(from_csv(col("Wert"), flower_schema_string) .alias("flower"), "Zeitstempel") flower_df3 = flower_df2.select("flower.*", "Zeitstempel") flower_df3.createOrReplaceTempView("flower_find"); song_find_text = spark.sql("AUSWÄHLEN * FROM flower_find") flower_agg_write_stream = song_find_text .writeStream .trigger(processingTime="5 Sekunden") .outputMode("anhängen") .Möglichkeit("truncate", "false") .Format("Erinnerung") .queryName("testedTable") .Anfang() flower_agg_write_stream.awaitTermination(1)
Una vez que ejecute esto, debería obtener un resultado como este:
Wie du siehst, habe ein paar Anfragen gestellt und geprüft, ob die Daten übertragen wurden. Das erste Konto war 5 Ja, nach ein paar sekunden, das Konto wurde erhöht auf 14, Bestätigung, dass Daten übertragen werden.
Hier, Grundsätzlich, Die Idee ist, einen Funkenkontext zu schaffen. Wir erhalten die Daten, indem wir Kafka zu unserem Thema auf dem angegebenen Port übermitteln. Se puede crear una SitzungDas "Sitzung" Es ist ein Schlüsselbegriff im Bereich der Psychologie und Therapie. Bezieht sich auf ein geplantes Treffen zwischen einem Therapeuten und einem Klienten, wo Gedanken erforscht werden, Emotionen und Verhaltensweisen. Diese Sitzungen können in Länge und Häufigkeit variieren, und ihr Hauptzweck ist es, persönliches Wachstum und Problemlösung zu erleichtern. Die Wirksamkeit der Sitzungen hängt von der Beziehung zwischen dem Therapeuten und dem Therapeuten ab.. de chispa usando getOrCreate () wie im Code gezeigt. Im nächsten Schritt wird der Kafka-Stream gelesen und die Daten können mit load . geladen werden (). Da die Daten übertragen werden, es wäre nützlich, einen Zeitstempel zu haben, in dem jeder der Datensätze angekommen ist. Wir spezifizieren das Schema wie in unserem SQL und erstellen abschließend einen Datenrahmen mit den Werten der übertragenen Daten mit seinem Zeitstempel. Schließlich, mit einer Bearbeitungszeit von 5 Sekunden, Wir können Daten in Stapeln empfangen. Wir verwenden SQL View, um Daten im angehängten Modus vorübergehend im Speicher zu speichern, und wir können alle Operationen mit unserem Spark-Datenrahmen ausführen.
Den vollständigen Code finden Sie hier:
https://github.com/Siddharth1698/Structured-Streaming-Tutorial
Dies ist eines meiner Spark-Streaming-Projekte, Sie können sich für detailliertere Abfragen und die Verwendung von maschinellem Lernen in Spark darauf beziehen:
https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka
Verweise
1. https://github.com/Siddharth1698/Structured-Streaming-Tutorial
2. https://dzone.com/articles/running-apache-kafka-on-windows-os
3. https://phoenixnap.com/kb/install-spark-on-windows-10
4. https://drive.google.com/drive/u/0/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw
5. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
6. Vorschaubild -> https://unsplash.com/photos/ImcUkZ72oUs
Fazit
Wenn Sie diese Schritte befolgen, Sie können alle Umgebungen einfach konfigurieren und Ihr erstes strukturiertes Streaming-Programm mit Spark und Kafka . ausführen. Bei Schwierigkeiten bei der Einrichtung, Zögere nicht mich zu kontaktieren:
[E-Mail geschützt]
https://www.linkedin.com/in/siddharth-m-426a9614a/
Die in diesem Artikel gezeigten Medien sind nicht Eigentum von DataPeaker und werden nach Ermessen des Autors verwendet.