[*]
Dieser Artikel wurde im Rahmen der Data Science Blogathon
Einführung
Eines der Hauptprobleme, mit denen jeder konfrontiert ist, wenn er zum ersten Mal strukturiertes Streaming ausprobiert, ist das Einrichten der Umgebung, die zum Streamen seiner Daten erforderlich ist.. Wir haben einige Online-Tutorials, wie wir das einrichten können. Die meisten von ihnen konzentrieren sich darauf, Sie aufzufordern, eine virtuelle Maschine und ein Ubuntu-Betriebssystem zu installieren und dann alle erforderlichen Dateien zu konfigurieren, indem Sie die Bash-Datei ändern. Das funktioniert gut, aber nicht für alle. Sobald wir eine virtuelle Maschine verwenden, Manchmal müssen wir möglicherweise lange warten, wenn wir Maschinen mit weniger Speicher haben. Der Prozess kann aufgrund von Problemen mit der Speicherverzögerung stecken bleiben. Dann, für eine bessere Möglichkeit und eine einfache Bedienung, Ich zeige Ihnen, wie wir strukturiertes Streaming in unserem Windows-Betriebssystem konfigurieren können.
Gebrauchte Werkzeuge
Für die Konfiguration verwenden wir folgende Tools:
1. Kafka (zur Datenübertragung, fungiert als Produzent)
2. Tierpfleger
3. Pyspark (um die übermittelten Daten zu generieren, handelt als Verbraucher)
4. Jupyter-Notizbuch (Code-Editor)
Umgebungsvariablen
Es ist wichtig zu beachten, dass hier, Ich habe alle Dateien in Laufwerk C hinzugefügt. Was ist mehr, der Name muss mit dem der Dateien übereinstimmen, die Sie online installieren.
Wir müssen die Umgebungsvariablen setzen, wenn wir diese Dateien installieren. Bitte beachten Sie diese Bilder während der Installation für eine problemlose Erfahrung.
Das letzte Bild ist der Pfad der Systemvariablen.
Erforderliche Dateien
https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp=teilen
Kafka-Installation
Der erste Schritt ist die Installation von Kafka auf unserem System. Dazu müssen wir auf diesen Link gehen:
https://dzone.com/articles/running-apache-kafka-on-windows-os
Wir müssen Java installieren 8 zunächst und setzen Sie Umgebungsvariablen. Sie können alle Anweisungen über den Link erhalten.
Sobald wir mit Java fertig sind, wir müssen einen Tierpfleger einsetzen. He agregado archivos de guardián del zoológico en Google Drive. Siéntase libre de usarlo o simplemente siga todas las instrucciones dadas en el enlace. Si instaló zookeeper correctamente y configuró la variable de entorno, puede ver este resultado cuando ejecute zkserver como administrador en el símbolo del sistema.
Dann, instale Kafka según las instrucciones del enlace y ejecútelo con el comando especificado.
.binwindowskafka-server-start.bat .configserver.properties
Una vez que todo esté configurado, Versuchen Sie, ein Thema zu erstellen und zu überprüfen, ob es richtig funktioniert. Wenn ja, Sie haben die Kafka-Installation abgeschlossen.
Funkeninstallation
In diesem Schritt, instalamos Spark. Grundsätzlich, Sie können diesem Link folgen, um Spark auf Ihrem Windows-Rechner zu konfigurieren.
https://phoenixnap.com/kb/install-spark-on-windows-10
Während eines der Schritte, Es wird nach der Konfiguration der Winutils-Datei gefragt. Für Ihren Komfort, Ich habe die Datei dem Drive-Link hinzugefügt, den ich geteilt habe. In einem Ordner namens Hadoop. Legen Sie diesen Ordner einfach auf Ihrem Laufwerk C ab und legen Sie die Umgebungsvariable wie in den Bildern gezeigt fest. Ich empfehle Ihnen dringend, die Spark-Datei zu verwenden, die ich zu Google Drive hinzugefügt habe. Einer der Hauptgründe ist die Datenübertragung, die wir benötigen, um eine strukturierte Übertragungsumgebung manuell einzurichten. In unserem Fall, Ich habe alle notwendigen Dinge konfiguriert und die Dateien nach vielen Versuchen geändert. Falls Sie eine neue Konfiguration vornehmen möchten, mach es gerne. Wenn die Konfiguration nicht richtig funktioniert, Bei der Übertragung von Daten in pyspark erhalten wir einen solchen Fehler:
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
Sobald wir eins sind mit Spark, Jetzt können wir die erforderlichen Daten aus einer CSV-Datei in einen Producer streamen und mit dem Kafka-Theme in einen Consumer holen. Ich arbeite hauptsächlich mit dem Jupiter-Notizbuch und, Daher, Ich habe ein Notizbuch für dieses Tutorial verwendet.
Zuerst in deinem Notizbuch, Sie müssen einige Bibliotheken installieren:
1. pip installiert pyspark
2. Pip-Installer Kafka
3. pip installieren py4j
Wie funktioniert strukturiertes Streaming mit Pyspark?
Wir haben eine CSV-Datei mit Daten, die wir übertragen möchten. Fahren wir mit dem klassischen Iris-Dataset fort. Jetzt, wenn wir die Irisdaten übertragen wollen, wir müssen Kafka als Produzent verwenden. Kafka, wir erstellen ein Thema, an das wir die Iris-Daten übermitteln und der Verbraucher kann den Datenrahmen dieses Themas abrufen.
Das Folgende ist der Herstellercode zum Übertragen von Irisdaten:
import pandas as pd from kafka import KafkaProducer from datetime import datetime import time import random import numpy as np # pip install kafka-python KAFKA_TOPIC_NAME_CONS = "Thema" KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092' if __name__ == "__hauptsächlich__": drucken("Kafka Producer Application Started ... ") kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS, value_serializer=lambda x: x.encode('utf-8')) filepath = "IRIS.csv" flower_df = pd.read_csv(Dateipfad) flower_df['order_id'] = np.arange(len(flower_df)) flower_list = flower_df.to_dict(orient="records") message_list = [] message = None for message in flower_list: message_fields_value_list = [] message_fields_value_list.append(Botschaft["order_id"]) message_fields_value_list.append(Botschaft["sepal_length"]) message_fields_value_list.append(Botschaft["sepal_width"]) message_fields_value_list.append(Botschaft["Blütenblattlänge"]) message_fields_value_list.append(Botschaft["petal_width"]) message_fields_value_list.append(Botschaft["Spezies"]) Nachricht=",".beitreten(str(v) for v in message_fields_value_list) drucken("Message Type: ", Typ(Botschaft)) drucken("Nachricht: ", Botschaft) kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, Botschaft) zeit.schlaf(1) drucken("Kafka Producer Application Completed. ")
Para iniciar el productor, tenemos que ejecutar zkserver como administrador en el símbolo del sistema de Windows y luego iniciar Kafka usando: .binwindowskafka-server-start.bat .configserver.properties desde el símbolo del sistema en el directorio de Kafka. Si obtiene un error de “no broker”, significa que Kafka no se está ejecutando correctamente.
El resultado después de ejecutar este código en el cuaderno jupyter se ve así:
Jetzt, revisemos al consumidor. Ejecute el siguiente código para ver si funciona bien en un nuevo portátil.
from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.ml.feature import Normalizer, StandardScaler import random import time kafka_topic_name = "Thema" kafka_bootstrap_servers="localhost:9092" spark = SparkSession .builder .appName("Structured Streaming ") .Meister("lokal[*]") .getOrCreate() spark.sparkContext.setLogLevel("ERROR") # Construct a streaming DataFrame that reads from topic flower_df = spark .readStream .format("kafka") .Möglichkeit("kafka.bootstrap.servers", kafka_bootstrap_servers) .Möglichkeit("subscribe", kafka_topic_name) .Möglichkeit("startingOffsets", "neueste") .Belastung() flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "Zeitstempel") flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING" flower_df2 = flower_df1 .select(from_csv(col("Wert"), flower_schema_string) .alias("flower"), "Zeitstempel") flower_df3 = flower_df2.select("flower.*", "Zeitstempel") flower_df3.createOrReplaceTempView("flower_find"); song_find_text = spark.sql("AUSWÄHLEN * FROM flower_find") flower_agg_write_stream = song_find_text .writeStream .trigger(processingTime="5 Sekunden") .outputMode("anhängen") .Möglichkeit("truncate", "false") .Format("Erinnerung") .queryName("testedTable") .Anfang() flower_agg_write_stream.awaitTermination(1)
Una vez que ejecute esto, debería obtener un resultado como este:
Wie du siehst, habe ein paar Anfragen gestellt und geprüft, ob die Daten übertragen wurden. Das erste Konto war 5 Ja, nach ein paar sekunden, das Konto wurde erhöht auf 14, Bestätigung, dass Daten übertragen werden.
Hier, Grundsätzlich, Die Idee ist, einen Funkenkontext zu schaffen. Wir erhalten die Daten, indem wir Kafka zu unserem Thema auf dem angegebenen Port übermitteln. Spark-Sitzung kann mit getOrCreate erstellt werden () wie im Code gezeigt. Im nächsten Schritt wird der Kafka-Stream gelesen und die Daten können mit load . geladen werden (). Da die Daten übertragen werden, es wäre nützlich, einen Zeitstempel zu haben, in dem jeder der Datensätze angekommen ist. Wir spezifizieren das Schema wie in unserem SQL und erstellen abschließend einen Datenrahmen mit den Werten der übertragenen Daten mit seinem Zeitstempel. Schließlich, mit einer Bearbeitungszeit von 5 Sekunden, Wir können Daten in Stapeln empfangen. Wir verwenden SQL View, um Daten im angehängten Modus vorübergehend im Speicher zu speichern, und wir können alle Operationen mit unserem Spark-Datenrahmen ausführen.
Den vollständigen Code finden Sie hier:
https://github.com/Siddharth1698/Structured-Streaming-Tutorial
Dies ist eines meiner Spark-Streaming-Projekte, Sie können sich für detailliertere Abfragen und die Verwendung von maschinellem Lernen in Spark darauf beziehen:
https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka
Verweise
1. https://github.com/Siddharth1698/Structured-Streaming-Tutorial
2. https://dzone.com/articles/running-apache-kafka-on-windows-os
3. https://phoenixnap.com/kb/install-spark-on-windows-10
4. https://drive.google.com/drive/u/0/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw
5. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
6. Vorschaubild -> https://unsplash.com/photos/ImcUkZ72oUs
Fazit
Wenn Sie diese Schritte befolgen, Sie können alle Umgebungen einfach konfigurieren und Ihr erstes strukturiertes Streaming-Programm mit Spark und Kafka . ausführen. Bei Schwierigkeiten bei der Einrichtung, Zögere nicht mich zu kontaktieren:
[E-Mail geschützt]
https://www.linkedin.com/in/siddharth-m-426a9614a/
Die in diesem Artikel gezeigten Medien sind nicht Eigentum von DataPeaker und werden nach Ermessen des Autors verwendet.
Verwandt
zusammenhängende Posts:
- NoSQL-Datenbanken, die jeder Data Scientist kennen sollte! 2020!
- Der magische Quadrant 2020 Gartner ist jetzt verfügbar! Sehen Sie sich die besten Analysetools an
- E-Books zum maschinellen Lernen für Datenwissenschaftler und Ingenieure für künstliche Intelligenz
- Was ist Kanalzuordnung?? Modellierung der Kanalzuordnung