¿Qué es Apache Airflow? Operador de Python en Apache Airflow

Contenidos

Visión general

  • Entendemos Python Operator en Apache Airflow con un ejemplo
  • También discutiremos el concepto de Variables en Apache Airflow

Introducción

Apache Airflow es una herramienta imprescindible para los ingenieros de datos. Facilita la creación y el seguimiento de todos sus flujos de trabajo. Cuando tiene varios flujos de trabajo, hay más posibilidades de que esté utilizando las mismas bases de datos y las mismas rutas de archivo para varios flujos de trabajo. El uso de variables es una de las formas más eficientes de definir dicha información compartida entre diferentes flujos de trabajo.

Cubriremos el concepto de variables en este artículo y un ejemplo de un operador de Python en Apache Airflow.

copy-of-spark-9233890

Este artículo es una continuación de Ingeniería de datos 101: Introducción a Apache Airflow, donde cubrimos las características y componentes de las bases de datos de flujo de aire, los pasos de instalación y creamos un DAG básico. Entonces, si es un principiante completo en Apache Airflow, le recomendaría que lea ese artículo primero.

Tabla de contenido

  1. ¿Qué es Apache Airflow?
  2. Iniciar el flujo de aire
  3. Operador de Python en Apache Airflow
  4. ¿Cuáles son las variables en Apache Airflow?

¿Qué es Apache Airflow?

Flujo de aire Apache es un motor de flujo de trabajo que programará y ejecutará fácilmente sus complejas canalizaciones de datos. Se asegurará de que cada tarea de su canalización de datos se ejecute en el orden correcto y que cada tarea obtenga los recursos necesarios.

Le proporcionará una interfaz de usuario increíble para monitorear y solucionar cualquier problema que pueda surgir.

screenshot-from-2020-11-13-19-54-11-1-4769331

Iniciar el flujo de aire

Ya hemos discutido los pasos de instalación en el artículo anterior de esta serie.

Para iniciar el servidor de flujo de aire, abra la terminal y ejecute el siguiente comando. El puerto predeterminado es 8080 y si está usando ese puerto para otra cosa, puede cambiarlo.

airflow webserver -p 8080

Ahora, inicie el programador de flujo de aire usando el siguiente comando en una terminal diferente. Supervisará todos sus flujos de trabajo y los activará según los haya asignado.

airflow scheduler

Ahora, asegúrese de tener un nombre de carpeta dags en el directorio de flujo de aire donde definirá su DAGS y abrirá el navegador web e irá a abrir: http: // localhost: 8080 / admin / y verá algo como esto:

screenshot-from-2020-11-17-12-41-56-1-9588960

Operador de Python en Apache Airflow

Un operador describe una sola tarea del flujo de trabajo y los operadores nos proporcionan, diferentes operadores, para muchas tareas diferentes, por ejemplo BashOperator, PythonOperator, Operador de correo electrónico, MySqlOperator, etc. En el último artículo, aprendimos cómo usar la BashOperator para obtener los puntajes de cricket en vivo y en esto, veremos cómo usar el PythonOperator.

Echemos un vistazo al siguiente ejemplo:

  1. Importar las bibliotecas

    Comencemos por importar las bibliotecas que necesitamos. Usaremos el PythonOperator esta vez.

  2. Definición de argumentos de DAG

    Para cada uno de los DAG, necesitamos pasar un diccionario de argumentos. Aquí está la descripción de algunos de los argumentos que puede pasar:

    • dueño: El nombre del propietario del flujo de trabajo debe ser alfanumérico y puede tener guiones bajos, pero no debe contener espacios.
    • depende_en_pasado: Si cada vez que ejecuta su flujo de trabajo, los datos dependen de la ejecución anterior, márquelo como Verdadero; de lo contrario, márquelo como Falso.
    • fecha de inicio: Fecha de inicio de su flujo de trabajo
    • Email: Su ID de correo electrónico, para que pueda recibir un correo electrónico siempre que alguna tarea falle por cualquier motivo.
    • retry_delay: Si alguna tarea falla, entonces cuánto tiempo debe esperar para reintentarla.

  3. Definición de la función de Python

    Ahora, definiremos la función de Python que imprimirá una cadena usando un argumento y esta función luego será usada por PythonOperator.

  4. Definición de DAG

    Ahora, crearemos un objeto DAG y pasaremos el dag_id que es el nombre del DAG y asegúrese de no haber creado ningún DAG con este nombre antes. Pase los argumentos que definimos anteriormente y agregue una descripción y horario_intervalo que ejecutará el DAG después del intervalo de tiempo especificado

  5. Definición de la tarea

    Solo tenemos una tarea para nuestro flujo de trabajo:

    1. impresión: En la tarea, imprimiremos “Apache Airflow es una herramienta imprescindible para los ingenieros de datos” en la terminal usando la función python.

    Pasaremos el task_id al PythonOperator objeto. Verá este nombre en los nodos de la Vista de gráfico de su DAG. Pase el nombre de la función de Python al argumento «Python_callable» que desea ejecutar y los argumentos que utiliza para el parámetro «Op_kwargs» como diccionario y, finalmente, el objeto DAG al que desea vincular esta tarea.

  6. Ejecute el DAG

    Ahora, cuando actualice su panel de Airflow, verá su nuevo DAG en la lista.

    Haga clic en el DAG y abra la vista de gráfico y verá algo como esto. Cada uno de los pasos del flujo de trabajo estará en un cuadro separado. En este flujo de trabajo, solo tenemos un paso que es imprimir. Ejecute el flujo de trabajo y espere hasta que su borde se vuelva verde oscuro, lo que indica que se completó correctamente.

    screenshot-from-2020-11-23-11-47-09-6630914

    Haga clic en el nodo «imprimir» para obtener más detalles sobre este paso y luego haga clic en Registros y verá el resultado como este.

    screenshot-from-2020-11-23-11-47-45-6122900

¿Cuáles son las variables en Apache Airflow?

Sabemos que Airflow se puede utilizar para crear y gestionar flujos de trabajo complejos. Podemos ejecutar varios flujos de trabajo al mismo tiempo. Existe la posibilidad de que la mayoría de sus flujos de trabajo estén usando la misma base de datos o la misma ruta de archivo. Ahora, si realiza algún cambio como cambiar la ruta del directorio donde usar los archivos de guardado o cambiar la configuración de las bases de datos. En ese caso, no desea actualizar cada uno de los DAGS por separado.

Airflow proporciona una solución para esto, puede crear variables donde puede almacenar y recuperar datos en tiempo de ejecución en los múltiples DAGS. Entonces, si ocurre algún cambio importante, puede editar su variable y sus flujos de trabajo están listos para comenzar.

¿Cómo crear variables?

Abra el panel de Airflow y haga clic en el Administración en el menú superior y luego haga clic en Variables.

screenshot-from-2020-11-23-17-04-28-9723624

Ahora, haga clic en Crear para crear una nueva variable y se abrirá una ventana como esta. Agregue la clave y el valor y envíelo. Aquí, estoy creando una variable con el nombre de clave como Ruta de datos y valor como la ruta de cualquier archivo de texto aleatorio.

screenshot-from-2020-11-23-17-42-27-8035085

Ahora, crearemos un DAG donde encontraremos el recuento de palabras de los datos de texto presentes en este archivo. Cuando desee utilizar las variables, debe importarlas. Veamos cómo hacer esto:

Luego, definiremos la función que utilizará la ruta de la variable, la leerá y calculará el recuento de palabras.

El resto de los pasos son los mismos que hicimos anteriormente, debe definir el DAG y las tareas y su flujo de trabajo está listo para ejecutarse.

Puedes ver los resultados en el log y ahora si puedes usar esta variable en cualquier otro DAG y también puedes editarla cuando quieras y se actualizan todos tus DAGS.

screenshot-from-2020-11-23-17-28-42-1151304

Notas finales

En este artículo, entendimos cómo usar el operador de Python en Apache Airflow, conceptos como ramificación y variables, y cómo crearlos. En el próximo artículo, crearemos un proyecto de aprendizaje automático y automatizaremos su flujo de trabajo utilizando Apache Airflow.

Le recomiendo que consulte los siguientes recursos de ingeniería de datos para mejorar su conocimiento:

Si tiene alguna pregunta relacionada con este artículo, hágamelo saber en la sección de comentarios a continuación.

Suscribite a nuestro Newsletter

No te enviaremos correo SPAM. Lo odiamos tanto como tú.