O que é Apache Airflow? Operador Python no Apache Airflow

Conteúdo

Visão geral

  • Nós entendemos o Operador Python no Apache Airflow com um exemplo
  • Também discutiremos o conceito de variáveis ​​no Apache Airflow

Introdução

Apache Airflow é uma ferramenta indispensável para engenheiros de dados. Facilita a criação e o rastreamento de todos os seus fluxos de trabalho. Quando você tem vários fluxos de trabalho, há mais chances de você estar usando os mesmos bancos de dados e os mesmos caminhos de arquivo para vários fluxos de trabalho. O uso de variáveis ​​é uma das formas mais eficientes de definir essas informações compartilhadas entre diferentes fluxos de trabalho..

Abordaremos o conceito de variáveis ​​neste artigo e um exemplo de um operador Python no Apache Airflow.

copy-of-spark-9233890

Este artigo é uma continuação da Engenharia de Dados 101: Introdução ao Apache Airflow, onde cobrimos os recursos e componentes de bancos de dados de fluxo de ar, as etapas de instalação e criar um DAG básico. Então, se você é um iniciante no Apache Airflow, Eu recomendo que você leia esse artigo primeiro.

Tabela de conteúdo

  1. O que é Apache Airflow?
  2. Inicie o fluxo de ar
  3. Operador Python no Apache Airflow
  4. Quais são as variáveis ​​no Apache Airflow?

O que é Apache Airflow?

Fluxo de ar Apache é um mecanismo de fluxo de trabalho que agendará e executará facilmente seus pipelines de dados complexos. Isso irá garantir que cada tarefa em seu pipeline de dados seja executada na ordem correta e que cada tarefa obtenha os recursos necessários.

Ele fornecerá a você uma interface de usuário incrível para monitorar e corrigir quaisquer problemas que possam surgir.

screenshot-from-2020-11-13-19-54-11-1-4769331

Inicie o fluxo de ar

Já discutimos as etapas de instalação no artigo anterior desta série..

Para iniciar o servidor de fluxo de ar, abra o terminal e execute o seguinte comando. A porta padrão é 8080 e se você estiver usando essa porta para outra coisa, pode mudar isso.

airflow webserver -p 8080

Agora, inicie o programador de fluxo de ar usando o seguinte comando em um terminal diferente. Ele irá monitorar todos os seus fluxos de trabalho e ativá-los conforme atribuído.

programador de fluxo de ar

Agora, certifique-se de ter um nome de pasta dags no diretório airflow, onde você definirá seu DAGS, abrirá o navegador da web e abrirá: http: // localhost: 8080 / admin / e você verá algo assim:

screenshot-from-2020-11-17-12-41-56-1-9588960

Operador Python no Apache Airflow

Um operador descreve uma única tarefa no fluxo de trabalho e os operadores nos fornecem, operadores diferentes, para muitas tarefas diferentes, por exemplo BashOperator, PythonOperator, Operador de email, MySqlOperator, etc. No último artigo, aprendemos como usar o BashOperator para obter resultados de críquete ao vivo e neste, veremos como usar o PythonOperator.

Vamos dar uma olhada no seguinte exemplo:

  1. Importar as bibliotecas

    Vamos começar importando as bibliotecas de que precisamos. Nós vamos usar o PythonOperator desta vez.

  2. Definindo Argumentos DAG

    Para cada um dos DAGs, precisamos passar um dicionário de argumentos. Aqui está a descrição de alguns dos argumentos que você pode passar:

    • proprietário: O nome do proprietário do fluxo de trabalho deve ser alfanumérico e pode conter sublinhados, mas não deve conter espaços.
    • Depende_on_past: Se toda vez que você executar seu fluxo de trabalho, os dados dependem da execução anterior, marque como verdadeiro; pelo contrário, marque como falso.
    • data de início: Data de início do seu fluxo de trabalho
    • O email: Seu e-mail ID, para que você possa receber um e-mail sempre que alguma tarefa falhar por qualquer motivo.
    • retry_delay: Se alguma tarefa falhar, então, quanto tempo você deve esperar para tentar novamente.

  3. Definição de função Python

    Agora, iremos definir a função Python que irá imprimir uma string usando um argumento e esta função será então usada pelo PythonOperator.

  4. Definição de DAG

    Agora, vamos criar um objeto DAG e passar o dag_id que é o nome do DAG e certifique-se de não ter criado nenhum DAG com esse nome antes. Passe os argumentos que definimos acima e adicione uma descrição e intervalo de tempo que irá executar o DAG após o intervalo de tempo especificado

  5. Definição de tarefa

    Temos apenas uma tarefa para o nosso fluxo de trabalho:

    1. Imprimir: Na lição de casa, imprimiremos "Apache Airflow é indispensável para engenheiros de dados" no terminal usando a função python.

    Vamos passar o task_id al PythonOperator objeto. Você verá este nome nos nós da visualização do gráfico de seu DAG. Passe o nome da função Python para o argumento “Python_callable” que você deseja executar e os argumentos usados ​​para o parâmetro “op_kwargs” como um dicionário e, Finalmente, o objeto DAG ao qual você deseja vincular esta tarefa.

  6. Execute o DAG

    Agora, cuando actualice su painel de Airflow, você verá seu novo DAG listado.

    Clique no DAG e abra a visualização do gráfico e você verá algo assim. Cada uma das etapas do fluxo de trabalho estará em uma caixa separada. Neste fluxo de trabalho, Só temos uma etapa que é imprimir. Execute o fluxo de trabalho e espere até que sua borda fique verde escuro, indicando que foi concluído com sucesso.

    screenshot-from-2020-11-23-11-47-09-6630914

    Clique no “imprimir” para obter mais detalhes sobre esta etapa e clique em Logs e você verá o resultado como este.

    screenshot-from-2020-11-23-11-47-45-6122900

Quais são as variáveis ​​no Apache Airflow?

Sabemos que o Airflow pode ser usado para criar e gerenciar fluxos de trabalho complexos. Podemos executar vários fluxos de trabalho ao mesmo tempo. Existe la posibilidad de que la mayoría de sus flujos de trabajo estén usando la misma base de dados o la misma ruta de archivo. Agora, se você fizer alguma alteração, como alterar o caminho do diretório onde usar os arquivos salvos ou alterar a configuração dos bancos de dados. Nesse caso, você não deseja atualizar cada um dos DAGS separadamente.

O Airflow oferece uma solução para isso, você pode criar variáveis ​​onde pode armazenar e recuperar dados em tempo de execução nos vários DAGS. Então, se ocorrerem grandes mudanças, puede editar su variável y sus flujos de trabajo están listos para comenzar.

Como criar variáveis?

Abra o painel Airflow e clique no Administração no menu superior e clique em Variáveis.

screenshot-from-2020-11-23-17-04-28-9723624

Agora, Clique em Crio para criar uma nova variável e uma janela como esta irá abrir. Adicione chave e valor e envie. Aqui, Estou criando uma variável com o nome da chave, como Caminho de dados e valor como o caminho de qualquer arquivo de texto aleatório.

screenshot-from-2020-11-23-17-42-27-8035085

Agora, vamos criar um DAG onde encontraremos a contagem de palavras dos dados de texto presentes neste arquivo. Quando você quiser usar variáveis, você deve importá-los. Vamos ver como fazer isso:

Mais tarde, vamos definir a função que o caminho da variável usará, leia e calcule a contagem de palavras.

O resto das etapas são as mesmas que fizemos anteriormente, você precisa definir o DAG e as tarefas e seu fluxo de trabalho está pronto para ser executado.

Você pode ver os resultados no log e agora se você pode usar esta variável em qualquer outro DAG e também pode editá-lo sempre que quiser e todos os seus DAGS são atualizados.

screenshot-from-2020-11-23-17-28-42-1151304

Notas finais

Neste artigo, entendemos como usar o operador Python no Apache Airflow, conceitos como ramificação e variáveis, e como criá-los. No próximo artigo, vamos criar um projeto de aprendizado de máquina e automatizar seu fluxo de trabalho usando Apache Airflow.

Eu recomendo que você consulte os seguintes recursos de engenharia de dados para melhorar seu conhecimento:

Se você tiver alguma dúvida relacionada a este artigo, Me avise na seção de comentários abaixo.

Assine a nossa newsletter

Nós não enviaremos SPAM para você. Nós odiamos isso tanto quanto você.