Qu'est-ce qu'Apache Airflow? Opérateur Python dans Apache Airflow

Contenu

Vue d'ensemble

  • Nous comprenons Python Operator dans Apache Airflow avec un exemple
  • Nous aborderons également le concept de Variables dans Apache Airflow

introduction

Apache Airflow est un outil indispensable pour les ingénieurs de données. Facilite la création et le suivi de tous vos flux de travail. Lorsque vous avez plusieurs workflows, il y a plus de chances que vous utilisiez les mêmes bases de données et les mêmes chemins de fichiers pour plusieurs workflows. L'utilisation de variables est l'un des moyens les plus efficaces pour définir de telles informations partagées entre différents workflows..

Nous couvrirons le concept de variables dans cet article et un exemple d'opérateur Python dans Apache Airflow.

copie-de-étincelle-9233890

Cet article est la suite de Data Engineering 101: Introduction à Apache Airflow, où nous couvrons les fonctionnalités et les composants des bases de données de flux d'air, les étapes d'installation et créer un DAG de base. Ensuite, si vous êtes un débutant complet dans Apache Airflow, Je vous recommande de lire cet article en premier.

Table des matières

  1. Qu'est-ce qu'Apache Airflow?
  2. Démarrer le flux d'air
  3. Opérateur Python dans Apache Airflow
  4. Quelles sont les variables dans Apache Airflow?

Qu'est-ce qu'Apache Airflow?

Flux d'air Apache est un moteur de workflow qui planifiera et exécutera facilement vos pipelines de données complexes. Cela garantira que chaque tâche de votre pipeline de données s'exécute dans le bon ordre et que chaque tâche reçoive les ressources nécessaires.

Il vous fournira une interface utilisateur incroyable pour surveiller et résoudre tous les problèmes pouvant survenir.

capture d

Démarrer le flux d'air

Nous avons déjà discuté des étapes d'installation dans l'article précédent de cette série..

Pour démarrer le serveur de flux d'air, ouvrez le terminal et exécutez la commande suivante. Le port par défaut est 8080 et si vous utilisez ce port pour autre chose, peut le changer.

serveur web airflow -p 8080

À présent, démarrez le planificateur de flux d'air à l'aide de la commande suivante dans un autre terminal. Il surveillera tous vos flux de travail et les activera comme assigné.

programmateur de flux d'air

À présent, assurez-vous d'avoir un nom de dossier dags dans le répertoire airflow où vous allez définir votre DAGS et ouvrir le navigateur web et aller ouvrir: http: // hôte local: 8080 / administrateur / et vous verrez quelque chose comme ça:

capture d

Opérateur Python dans Apache Airflow

Un opérateur décrit une tâche unique dans le workflow et les opérateurs nous fournissent, différents opérateurs, pour de nombreuses tâches différentes, par exemple BashOperator, Opérateur Python, Opérateur de messagerie, MySqlOperator, etc. Dans le dernier article, nous avons appris à utiliser le BashOperator pour obtenir des scores de cricket en direct et à ce sujet, nous verrons comment utiliser le Opérateur Python.

Regardons l'exemple suivant:

  1. Importer les bibliothèques

    Commençons par importer les bibliothèques dont nous avons besoin. Nous utiliserons le Opérateur Python cette fois.

  2. Définition des arguments du DAG

    Pour chacun des DAG, nous devons passer un dictionnaire d'arguments. Voici la description de certains des arguments que vous pouvez passer:

    • propriétaire: Le nom du propriétaire du workflow doit être alphanumérique et peut comporter des traits de soulignement, mais ne doit pas contenir d'espaces.
    • depend_on_past: Si chaque fois que vous exécutez votre workflow, les données dépendent de l'exécution précédente, marquer comme vrai; au contraire, marquez-le comme faux.
    • date de début: Date de début de votre workflow
    • E-mail: Votre identifiant de messagerie, afin que vous puissiez recevoir un e-mail chaque fois qu'une tâche échoue pour une raison quelconque.
    • retry_delay: Si une tâche échoue, alors combien de temps faut-il attendre pour réessayer.

  3. Définition de la fonction Python

    À présent, nous allons définir la fonction Python qui imprimera une chaîne à l'aide d'un argument et cette fonction sera ensuite utilisée par PythonOperator.

  4. Définition de DAG

    À présent, nous allons créer un objet DAG et passer le dag_id qui est le nom du DAG et assurez-vous que vous n'avez pas créé de DAG avec ce nom avant. Passez les arguments que nous avons définis ci-dessus et ajoutez une description et intervalle de temps qui exécutera le DAG après l'intervalle de temps spécifié

  5. Définition de la tâche

    Nous n'avons qu'une seule tâche pour notre flux de travail:

    1. impression: Aux devoirs, nous imprimerons "Apache Airflow est un must pour les ingénieurs de données" dans le terminal en utilisant la fonction python.

    Nous passerons le id_tâche Al Opérateur Python objet. Vous verrez ce nom dans les nœuds de la vue graphique de votre DAG. Passez le nom de la fonction Python à l'argument “Python_callable” vous voulez exécuter et les arguments que vous utilisez pour le paramètre “op_kwargs” comme dictionnaire et, finalement, l'objet DAG auquel vous souhaitez lier cette tâche.

  6. Exécuter le DAG

    À présent, cuando actualice su panneau de Airflow, vous verrez votre nouveau DAG répertorié.

    Cliquez sur le DAG et ouvrez la vue graphique et vous verrez quelque chose comme ceci. Chacune des étapes du flux de travail sera dans une boîte distincte. Dans ce flux de travail, Nous n'avons qu'une étape qui est d'imprimer. Exécutez le workflow et attendez que sa bordure devienne vert foncé, indiquant qu'il s'est terminé avec succès.

    capture d

    Clique sur le nœud “imprimer” pour plus de détails sur cette étape puis cliquez sur Logs et vous verrez le résultat comme ceci.

    capture d

Quelles sont les variables dans Apache Airflow?

Nous savons qu'Airflow peut être utilisé pour créer et gérer des workflows complexes. Nous pouvons exécuter plusieurs workflows en même temps. Existe la posibilidad de que la mayoría de sus flujos de trabajo estén usando la misma base de données o la misma ruta de archivo. À présent, si vous apportez des modifications telles que la modification du chemin du répertoire où utiliser les fichiers de sauvegarde ou la modification de la configuration des bases de données. Dans ce cas, vous ne voulez pas mettre à jour chacun des DAGS séparément.

Airflow apporte une solution à ce problème, vous pouvez créer des variables où vous pouvez stocker et récupérer des données au moment de l'exécution dans les multiples DAGS. Ensuite, si des changements majeurs surviennent, puede editar su variable y sus flujos de trabajo están listos para comenzar.

Comment créer des variables?

Ouvrez le panneau Airflow et cliquez sur le Gestion dans le menu du haut, puis cliquez sur Variables.

capture d

À présent, cliquez sur Créer pour créer une nouvelle variable et une fenêtre comme celle-ci s'ouvrira. Ajouter une clé et une valeur et soumettre. Ici, Je crée une variable avec le nom de clé comme Chemin de données et la valeur comme chemin de n'importe quel fichier texte aléatoire.

capture d

À présent, nous allons créer un DAG où nous trouverons le nombre de mots des données textuelles présentes dans ce fichier. Quand vous voulez utiliser des variables, vous devez les importer. Voyons comment faire cela:

Alors, nous allons définir la fonction que le chemin variable utilisera, lisez-le et calculez le nombre de mots.

Le reste des étapes sont les mêmes que précédemment, vous devez définir le DAG et les tâches et votre workflow est prêt à s'exécuter.

Vous pouvez voir les résultats dans le journal et maintenant si vous pouvez utiliser cette variable dans n'importe quel autre DAG et vous pouvez également la modifier quand vous le souhaitez et tous vos DAGS sont mis à jour.

capture d

Remarques finales

Dans cet article, nous avons compris comment utiliser l'opérateur Python dans Apache Airflow, des concepts comme le branchement et les variables, et comment les créer. Dans le prochain article, nous allons créer un projet d'apprentissage automatique et automatiser votre flux de travail à l'aide d'Apache Airflow.

Je vous recommande de consulter les ressources d'ingénierie de données suivantes pour améliorer vos connaissances:

Si vous avez des questions concernant cet article, Laissez-moi savoir dans la section commentaire ci-dessous.

Abonnez-vous à notre newsletter

Nous ne vous enverrons pas de courrier SPAM. Nous le détestons autant que vous.