Streaming estructurado de Spark | Streaming estruturado com Kafka no Windows

Conteúdo

[*]

Este artigo foi publicado como parte do Data Science Blogathon

Introdução

Um dos principais problemas que todos enfrentam ao tentar o streaming estruturado pela primeira vez é configurar o ambiente necessário para transmitir seus dados.. Temos alguns tutoriais online sobre como podemos configurar isso. A maioria deles se concentra em pedir que você instale uma máquina virtual e um sistema operacional ubuntu e, em seguida, configure todos os arquivos necessários alterando o arquivo bash. Isso funciona bem, mas não para todos. Uma vez que usamos uma máquina virtual, às vezes podemos ter que esperar muito tempo se tivermos máquinas com menos memória. O processo pode travar devido a problemas de atraso de memória. Então, para uma maneira melhor de fazer isso e fácil operação, Vou mostrar como podemos configurar o streaming estruturado em nosso sistema operacional Windows.

Ferramentas usadas

Para a configuração, usamos as seguintes ferramentas:

1. Kafka (para transmissão de dados, atua como produtor)

2. Funcionário do zoológico

3. Pyspark (para gerar os dados transmitidos, atua como um consumidor)

4. Notebook Jupyter (editor de código)

variáveis ​​ambientais

É importante notar que aqui, Eu adicionei todos os arquivos na unidade C. O que mais, o nome deve ser o mesmo dos arquivos que você instala online.

Temos que definir as variáveis ​​de ambiente à medida que instalamos esses arquivos. Consulte essas imagens durante a instalação para uma experiência descomplicada.

A última imagem é o caminho das variáveis ​​do sistema.

82302microsoftteams-image202-3658430
70460microsoftteams-image201-1082611
93834microsoftteams-image203-5310208

Arquivos necessários

https://drive.google.com/drive/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw?usp = compartilhamento

Instalação Kafka

O primeiro passo é instalar o Kafka em nosso sistema. Para isso temos que ir a este link:

https://dzone.com/articles/running-apache-kafka-on-windows-os

Precisamos instalar o Java 8 inicialmente e definir variáveis ​​de ambiente. Você pode obter todas as instruções no link.

Assim que terminarmos com o Java, devemos instalar um zelador. Eu adicionei arquivos de guardiões do zoológico no Google Drive. Sinta-se livre para usá-lo ou apenas siga todas as instruções dadas no link. Se você instalou o zookeeper corretamente e configurou a variável ambiente, você pode ver este resultado quando você executar zkserver como um administrador no prompt de comando.

52671sem título-4210307

A seguir, instalar Kafka de acordo com as instruções de link e executá-lo com o comando especificado.

.binwindowskafka-server-start.bat .configserver.properties

Uma vez que tudo esteja configurado, tente criar um tema e verifique se funciona corretamente. Sim é assim, você terá completado a instalação do Kafka.

19053sem título-4282813

Instalação do Spark

Nesta etapa, instalamos Spark. Basicamente, você pode seguir este link para configurar o Spark em sua máquina Windows.

https://phoenixnap.com/kb/install-spark-on-windows-10

Durante uma das etapas, irá pedir para o arquivo winutils ser configurado. Para seu conforto, Eu adicionei o arquivo no link do drive que compartilhei. Em uma pasta chamada Hadoop. Basta colocar essa pasta em seu drive C e definir a variável de ambiente como mostrado nas imagens. Recomendo enfaticamente que você use o arquivo Spark que adicionei ao Google Drive. Um dos principais motivos é a transmissão de dados de que precisamos para configurar manualmente um ambiente de transmissão estruturado. No nosso caso, Eu configurei todas as coisas necessárias e modifiquei os arquivos depois de tentar muito. Caso queira fazer uma nova configuração, sinta-se livre para fazer isso. Se a configuração não funcionar corretamente, acabamos com um erro como este ao transmitir dados no pyspark:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

Uma vez que somos um com o Spark, agora podemos transmitir os dados necessários de um arquivo CSV em um produtor e obtê-los em um consumidor usando o tema Kafka. Eu trabalho principalmente com o notebook Júpiter e, portanto, Eu usei um notebook para este tutorial.

Em seu caderno primeiro, você deve instalar algumas bibliotecas:

1. pip instala o pyspark

2. instalador pip Kafka

3. pip install py4j

Como funciona o streaming estruturado com Pyspark?

50842sem título-4916248

Temos um arquivo CSV que contém dados que queremos transmitir. Vamos prosseguir com o conjunto de dados Iris clássico. Agora, se quisermos transmitir os dados da íris, devemos usar Kafka como produtor. Kafka, criamos um tópico para o qual transmitimos os dados da íris e o consumidor pode recuperar o quadro de dados deste tópico.

A seguir está o código do produtor para transmitir os dados da íris:

import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np

# pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "Tema"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

if __name__ == "__a Principal__":
    imprimir("Aplicação do produtor Kafka iniciada ... ")

    kafka_producer_obj = Produtor kafka(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))
    
    filepath = "IRIS.csv"
    
    flower_df = pd.read_csv(caminho de arquivo)
  
    flower_df['order_id'] = np.arange(len(flower_df))

    
    flower_list = flower_df.to_dict(oriente="Arquivo")
       

    message_list = []
    message = None
    for message in flower_list:
        
        message_fields_value_list = []
               
        message_fields_value_list.append(mensagem["order_id"])
        message_fields_value_list.append(mensagem["sepal_length"])
        message_fields_value_list.append(mensagem["sepal_width"])
        message_fields_value_list.append(mensagem["petal_length"])
        message_fields_value_list.append(mensagem["petal_width"])
        message_fields_value_list.append(mensagem["espécie"])

        mensagem =",".Junte(str(v) para v em message_fields_value_list)
        imprimir("Tipo de mensagem: ", modelo(mensagem))
        imprimir("Mensagem: ", mensagem)
        kafka_producer_obj.enviar(KAFKA_TOPIC_NAME_CONS, mensagem)
        hora de dormir(1)


    imprimir("Aplicação do produtor kafka concluída. ")

Para iniciar o produtor, temos que executar zkserver como administrador no prompt de comando do Windows e, em seguida, iniciar Kafka usando: .binwindowskafka-server-start.bat .configserver.properties from the command prompt in the Kafka directory. Se você obter um erro de “nenhum corretor”, significa que Kafka não está funcionando corretamente.

o resultado depois de executar este código no fichário jupyter se parece com este:

17639sem título-2051986

Agora, vamos rever o consumidor. Execute o código a seguir para ver se ele funciona bem em um novo laptop.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
de pyspark.ml.feature importação Normalizador, StandardScaler
import random

import time

kafka_topic_name = "Tema"
kafka_bootstrap_servers="localhost:9092"

spark = SparkSession 
        .builder 
        .appName("Streaming estruturado ") 
        .mestre("local[*]") 
        .getOrCreate()

spark.sparkContext.setLogLevel("ERRO")

# Construct a streaming DataFrame that reads from topic
flower_df = spark 
        .readStream 
        .format("Kafka") 
        .opção("kafka.bootstrap.servers", kafka_bootstrap_servers) 
        .opção("subscrever", kafka_topic_name) 
        .opção("startingOffsets", "Mais recentes") 
        .carga()

flower_df1 = flower_df.selectExpr("LANÇAR(valor AS STRING)", "timestamp")


flower_schema_string = "int order_id,sepal_length DUPLO,sepal_length DUPLO,sepal_length DUPLO,sepal_length DUPLO,string espécies"



flower_df2 = flower_df1 
        .select(from_csv(col("valor"), flower_schema_string) 
                .apelido("flor"), "timestamp")


flower_df3 = flower_df2.select("flor .*", "timestamp")

    
flower_df3.createOrReplaceTempView("flower_find");
song_find_text = faísca.sql("SELECIONE * DE flower_find")
flower_agg_write_stream = song_find_text 
        .writeStream 
        .trigger(processamentoTime="5 Segundos") 
        .outputMode("acrescentar") 
        .opção("truncar", "falso") 
        .formato("memória") 
        .consultaNo("testadoTable") 
        .começar()

flower_agg_write_stream.aguardaTerminação(1)

Una vez que ejecute esto, debería obtener un resultado como este:

98478sem título-5029166

Como você pode ver, fiz algumas investigações e verificou se os dados estavam sendo transmitidos. A primeira conta foi 5 e, depois de alguns segundos, a conta aumentou para 14, confirmando que os dados estão sendo transmitidos.

Aqui, basicamente, a ideia é criar um contexto de faísca. Obtemos os dados transmitindo Kafka sobre nosso assunto na porta especificada. A sessão do Spark pode ser criada usando getOrCreate () como mostrado no código. A próxima etapa inclui a leitura do fluxo Kafka e os dados podem ser carregados usando load (). Uma vez que os dados estão sendo transmitidos, seria útil ter um carimbo de data / hora em que cada um dos registros chegou. Especificamos o esquema como fazemos em nosso SQL e, finalmente, criamos um quadro de dados com os valores dos dados transmitidos com seu carimbo de data / hora. Finalmente, com um tempo de processamento de 5 segundos, podemos receber dados em lotes. Usamos o SQL View para armazenar dados temporariamente na memória no modo anexado e podemos realizar todas as operações nele usando nosso quadro de dados Spark.

Veja o código completo aqui:

https://github.com/Siddharth1698/Structured-Streaming-Tutorial

Este é um dos meus projetos de streaming do Spark, você pode consultá-lo para consultas mais detalhadas e uso de aprendizado de máquina no Spark:

https://github.com/Siddharth1698/Spotify-Recommendation-System-using-Pyspark-and-Kafka

Referências

1. https://github.com/Siddharth1698/Structured-Streaming-Tutorial

2. https://dzone.com/articles/running-apache-kafka-on-windows-os

3. https://phoenixnap.com/kb/install-spark-on-windows-10

4. https://drive.google.com/drive/u/0/folders/1kOQAKgo98cPPYcvqpygyqNFIGjrK_bjw

5. https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

6. Imagem em miniatura -> https://unsplash.com/photos/ImcUkZ72oUs

conclusão

Se você seguir essas etapas, você pode configurar facilmente todos os ambientes e executar seu primeiro programa de streaming estruturado com Spark e Kafka. Em caso de dificuldade em configurá-lo, Não hesite em contactar-me:

[e-mail protegido]

https://www.linkedin.com/in/siddharth-m-426a9614a/

A mídia mostrada neste artigo não é propriedade da DataPeaker e é usada a critério do autor.

Assine a nossa newsletter

Nós não enviaremos SPAM para você. Nós odiamos isso tanto quanto você.