Streaming-Estructurado de Spark | Strukturiertes Streaming mit Kafka unter Windows

Inhalt

[*]

Dieser Artikel wurde im Rahmen der Data Science Blogathon

Einführung

Eines der Hauptprobleme, mit denen jeder konfrontiert ist, wenn er zum ersten Mal strukturiertes Streaming ausprobiert, ist das Einrichten der Umgebung, die zum Streamen seiner Daten erforderlich ist.. Wir haben einige Online-Tutorials, wie wir das einrichten können. Die meisten von ihnen konzentrieren sich darauf, Sie aufzufordern, eine virtuelle Maschine und ein Ubuntu-Betriebssystem zu installieren und dann alle erforderlichen Dateien zu konfigurieren, indem Sie die Bash-Datei ändern. Das funktioniert gut, aber nicht für alle. Sobald wir eine virtuelle Maschine verwenden, Manchmal müssen wir möglicherweise lange warten, wenn wir Maschinen mit weniger Speicher haben. Der Prozess kann aufgrund von Problemen mit der Speicherverzögerung stecken bleiben. Dann, für eine bessere Möglichkeit und eine einfache Bedienung, Ich zeige Ihnen, wie wir strukturiertes Streaming in unserem Windows-Betriebssystem konfigurieren können.

Gebrauchte Werkzeuge

Für die Konfiguration verwenden wir folgende Tools:

1. Kafka (zur Datenübertragung, fungiert als Produzent)

2. Tierpfleger

3. Pyspark (um die übermittelten Daten zu generieren, handelt als Verbraucher)

4. Jupyter-Notizbuch (Code-Editor)

Umgebungsvariablen

Es ist wichtig zu beachten, dass hier, Ich habe alle Dateien in Laufwerk C hinzugefügt. Was ist mehr, der Name muss mit dem der Dateien übereinstimmen, die Sie online installieren.

Wir müssen die Umgebungsvariablen setzen, wenn wir diese Dateien installieren. Bitte beachten Sie diese Bilder während der Installation für eine problemlose Erfahrung.

Das letzte Bild ist der Pfad der Systemvariablen.

82302microsoftteams-image202-3658430
70460microsoftteams-image201-1082611
93834microsoftteams-image203-5310208

Erforderliche Dateien

https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp=teilen

Kafka-Installation

Der erste Schritt ist die Installation von Kafka auf unserem System. Dazu müssen wir auf diesen Link gehen:

https://dzone.com/articles/running-apache-kafka-on-windows-os

Wir müssen Java installieren 8 zunächst und setzen Sie Umgebungsvariablen. Sie können alle Anweisungen über den Link erhalten.

Sobald wir mit Java fertig sind, wir müssen einen Tierpfleger einsetzen. He agregado archivos de guardián del zoológico en Google Drive. Siéntase libre de usarlo o simplemente siga todas las instrucciones dadas en el enlace. Si instaló zookeeper correctamente y configuró la variable de entorno, puede ver este resultado cuando ejecute zkserver como administrador en el símbolo del sistema.

52671untitled-4210307

Dann, instale Kafka según las instrucciones del enlace y ejecútelo con el comando especificado.

.binwindowskafka-server-start.bat .configserver.properties

Una vez que todo esté configurado, Versuchen Sie, ein Thema zu erstellen und zu überprüfen, ob es richtig funktioniert. Wenn ja, Sie haben die Kafka-Installation abgeschlossen.

19053ohne Titel-4282813

Funkeninstallation

In diesem Schritt, instalamos Spark. Grundsätzlich, Sie können diesem Link folgen, um Spark auf Ihrem Windows-Rechner zu konfigurieren.

https://phoenixnap.com/kb/install-spark-on-windows-10

Während eines der Schritte, Es wird nach der Konfiguration der Winutils-Datei gefragt. Für Ihren Komfort, Ich habe die Datei dem Drive-Link hinzugefügt, den ich geteilt habe. In einem Ordner namens Hadoop. Legen Sie diesen Ordner einfach auf Ihrem Laufwerk C ab und legen Sie die Umgebungsvariable wie in den Bildern gezeigt fest. Ich empfehle Ihnen dringend, die Spark-Datei zu verwenden, die ich zu Google Drive hinzugefügt habe. Einer der Hauptgründe ist die Datenübertragung, die wir benötigen, um eine strukturierte Übertragungsumgebung manuell einzurichten. In unserem Fall, Ich habe alle notwendigen Dinge konfiguriert und die Dateien nach vielen Versuchen geändert. Falls Sie eine neue Konfiguration vornehmen möchten, mach es gerne. Wenn die Konfiguration nicht richtig funktioniert, Bei der Übertragung von Daten in pyspark erhalten wir einen solchen Fehler:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

Sobald wir eins sind mit Spark, Jetzt können wir die erforderlichen Daten aus einer CSV-Datei in einen Producer streamen und mit dem Kafka-Theme in einen Consumer holen. Ich arbeite hauptsächlich mit dem Jupiter-Notizbuch und, Daher, Ich habe ein Notizbuch für dieses Tutorial verwendet.

Zuerst in deinem Notizbuch, Sie müssen einige Bibliotheken installieren:

1. pip installiert pyspark

2. Pip-Installer Kafka

3. pip installieren py4j

Wie funktioniert strukturiertes Streaming mit Pyspark?

50842ohne Titel-4916248

Wir haben eine CSV-Datei mit Daten, die wir übertragen möchten. Fahren wir mit dem klassischen Iris-Dataset fort. Jetzt, wenn wir die Irisdaten übertragen wollen, wir müssen Kafka als Produzent verwenden. Kafka, wir erstellen ein Thema, an das wir die Iris-Daten übermitteln und der Verbraucher kann den Datenrahmen dieses Themas abrufen.

Das Folgende ist der Herstellercode zum Übertragen von Irisdaten:

import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np

# pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "Thema"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

if __name__ == "__hauptsächlich__":
    drucken("Kafka Producer Application Started ... ")

    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))
    
    filepath = "IRIS.csv"
    
    flower_df = pd.read_csv(Dateipfad)
  
    flower_df['order_id'] = np.arange(len(flower_df))

    
    flower_list = flower_df.to_dict(orient="records")
       

    message_list = []
    message = None
    for message in flower_list:
        
        message_fields_value_list = []
               
        message_fields_value_list.append(Botschaft["order_id"])
        message_fields_value_list.append(Botschaft["sepal_length"])
        message_fields_value_list.append(Botschaft["sepal_width"])
        message_fields_value_list.append(Botschaft["Blütenblattlänge"])
        message_fields_value_list.append(Botschaft["petal_width"])
        message_fields_value_list.append(Botschaft["Spezies"])

        Nachricht=",".beitreten(str(v) for v in message_fields_value_list)
        drucken("Message Type: ", Typ(Botschaft))
        drucken("Nachricht: ", Botschaft)
        kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, Botschaft)
        zeit.schlaf(1)


    drucken("Kafka Producer Application Completed. ")

Para iniciar el productor, tenemos que ejecutar zkserver como administrador en el símbolo del sistema de Windows y luego iniciar Kafka usando: .binwindowskafka-server-start.bat .configserver.properties desde el símbolo del sistema en el directorio de Kafka. Si obtiene un error deno broker”, significa que Kafka no se está ejecutando correctamente.

El resultado después de ejecutar este código en el cuaderno jupyter se ve así:

17639untitled-2051986

Jetzt, revisemos al consumidor. Ejecute el siguiente código para ver si funciona bien en un nuevo portátil.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

import time

kafka_topic_name = "Thema"
kafka_bootstrap_servers="localhost:9092"

spark = SparkSession 
        .builder 
        .appName("Structured Streaming ") 
        .Meister("lokal[*]") 
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Construct a streaming DataFrame that reads from topic
flower_df = spark 
        .readStream 
        .format("kafka") 
        .Möglichkeit("kafka.bootstrap.servers", kafka_bootstrap_servers) 
        .Möglichkeit("subscribe", kafka_topic_name) 
        .Möglichkeit("startingOffsets", "neueste") 
        .Belastung()

flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "Zeitstempel")


flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING"



flower_df2 = flower_df1 
        .select(from_csv(col("Wert"), flower_schema_string) 
                .alias("flower"), "Zeitstempel")


flower_df3 = flower_df2.select("flower.*", "Zeitstempel")

    
flower_df3.createOrReplaceTempView("flower_find");
song_find_text = spark.sql("AUSWÄHLEN * FROM flower_find")
flower_agg_write_stream = song_find_text 
        .writeStream 
        .trigger(processingTime="5 Sekunden") 
        .outputMode("anhängen") 
        .Möglichkeit("truncate", "false") 
        .Format("Erinnerung") 
        .queryName("testedTable") 
        .Anfang()

flower_agg_write_stream.awaitTermination(1)

Una vez que ejecute esto, debería obtener un resultado como este:

98478untitled-5029166

Wie du siehst, habe ein paar Anfragen gestellt und geprüft, ob die Daten übertragen wurden. Das erste Konto war 5 Ja, nach ein paar sekunden, das Konto wurde erhöht auf 14, Bestätigung, dass Daten übertragen werden.

Hier, Grundsätzlich, Die Idee ist, einen Funkenkontext zu schaffen. Wir erhalten die Daten, indem wir Kafka zu unserem Thema auf dem angegebenen Port übermitteln. Spark-Sitzung kann mit getOrCreate erstellt werden () wie im Code gezeigt. Im nächsten Schritt wird der Kafka-Stream gelesen und die Daten können mit load . geladen werden (). Da die Daten übertragen werden, es wäre nützlich, einen Zeitstempel zu haben, in dem jeder der Datensätze angekommen ist. Wir spezifizieren das Schema wie in unserem SQL und erstellen abschließend einen Datenrahmen mit den Werten der übertragenen Daten mit seinem Zeitstempel. Schließlich, mit einer Bearbeitungszeit von 5 Sekunden, Wir können Daten in Stapeln empfangen. Wir verwenden SQL View, um Daten im angehängten Modus vorübergehend im Speicher zu speichern, und wir können alle Operationen mit unserem Spark-Datenrahmen ausführen.

Den vollständigen Code finden Sie hier:

https://github.com/Siddharth1698/Structured-Streaming-Tutorial

Dies ist eines meiner Spark-Streaming-Projekte, Sie können sich für detailliertere Abfragen und die Verwendung von maschinellem Lernen in Spark darauf beziehen:

https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka

Verweise

1. https://github.com/Siddharth1698/Structured-Streaming-Tutorial

2. https://dzone.com/articles/running-apache-kafka-on-windows-os

3. https://phoenixnap.com/kb/install-spark-on-windows-10

4. https://drive.google.com/drive/u/0/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw

5. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

6. Vorschaubild -> https://unsplash.com/photos/ImcUkZ72oUs

Fazit

Wenn Sie diese Schritte befolgen, Sie können alle Umgebungen einfach konfigurieren und Ihr erstes strukturiertes Streaming-Programm mit Spark und Kafka . ausführen. Bei Schwierigkeiten bei der Einrichtung, Zögere nicht mich zu kontaktieren:

[E-Mail geschützt]

https://www.linkedin.com/in/siddharth-m-426a9614a/

Die in diesem Artikel gezeigten Medien sind nicht Eigentum von DataPeaker und werden nach Ermessen des Autors verwendet.

Abonniere unseren Newsletter

Wir senden Ihnen keine SPAM-Mail. Wir hassen es genauso wie du.