Visión general
- Entendemos Python Operator en Apache Airflow con un ejemplo
- También discutiremos el concepto de Variables en Apache Airflow
Introducción
Apache Airflow es una herramienta imprescindible para los ingenieros de datos. Facilita la creación y el seguimiento de todos sus flujos de trabajo. Cuando tiene varios flujos de trabajo, hay más posibilidades de que esté utilizando las mismas bases de datos y las mismas rutas de archivo para varios flujos de trabajo. El uso de variables es una de las formas más eficientes de definir dicha información compartida entre diferentes flujos de trabajo.
Cubriremos el concepto de variables en este artículo y un ejemplo de un operador de Python en Apache Airflow.
Este artículo es una continuación de Ingeniería de datos 101: Introducción a Apache Airflow, donde cubrimos las características y componentes de las bases de datos de flujo de aire, los pasos de instalación y creamos un DAG básico. Entonces, si es un principiante completo en Apache Airflow, le recomendaría que lea ese artículo primero.
Tabla de contenido
- ¿Qué es Apache Airflow?
- Iniciar el flujo de aire
- Operador de Python en Apache Airflow
- ¿Cuáles son las variables en Apache Airflow?
¿Qué es Apache Airflow?
Flujo de aire Apache es un motor de flujo de trabajo que programará y ejecutará fácilmente sus complejas canalizaciones de datos. Se asegurará de que cada tarea de su canalización de datos se ejecute en el orden correcto y que cada tarea obtenga los recursos necesarios.
Le proporcionará una interfaz de usuario increíble para monitorear y solucionar cualquier problema que pueda surgir.
Iniciar el flujo de aire
Ya hemos discutido los pasos de instalación en el artículo anterior de esta serie.
Para iniciar el servidor de flujo de aire, abra la terminal y ejecute el siguiente comando. El puerto predeterminado es 8080 y si está usando ese puerto para otra cosa, puede cambiarlo.
airflow webserver -p 8080
Ahora, inicie el programador de flujo de aire usando el siguiente comando en una terminal diferente. Supervisará todos sus flujos de trabajo y los activará según los haya asignado.
airflow scheduler
Ahora, asegúrese de tener un nombre de carpeta dags en el directorio de flujo de aire donde definirá su DAGS y abrirá el navegador web e irá a abrir: http: // localhost: 8080 / admin / y verá algo como esto:
Operador de Python en Apache Airflow
Un operador describe una sola tarea del flujo de trabajo y los operadores nos proporcionan, diferentes operadores, para muchas tareas diferentes, por ejemplo BashOperator, PythonOperator, Operador de correo electrónico, MySqlOperator, etc. En el último artículo, aprendimos cómo usar la BashOperator para obtener los puntajes de cricket en vivo y en esto, veremos cómo usar el PythonOperator.
Echemos un vistazo al siguiente ejemplo:
Importar las bibliotecas
Comencemos por importar las bibliotecas que necesitamos. Usaremos el PythonOperator esta vez.
Definición de argumentos de DAG
Para cada uno de los DAG, necesitamos pasar un diccionario de argumentos. Aquí está la descripción de algunos de los argumentos que puede pasar:
- dueño: El nombre del propietario del flujo de trabajo debe ser alfanumérico y puede tener guiones bajos, pero no debe contener espacios.
- depende_en_pasado: Si cada vez que ejecuta su flujo de trabajo, los datos dependen de la ejecución anterior, márquelo como Verdadero; de lo contrario, márquelo como Falso.
- fecha de inicio: Fecha de inicio de su flujo de trabajo
- Email: Su ID de correo electrónico, para que pueda recibir un correo electrónico siempre que alguna tarea falle por cualquier motivo.
- retry_delay: Si alguna tarea falla, entonces cuánto tiempo debe esperar para reintentarla.
Definición de la función de Python
Ahora, definiremos la función de Python que imprimirá una cadena usando un argumento y esta función luego será usada por PythonOperator.
Definición de DAG
Ahora, crearemos un objeto DAG y pasaremos el dag_id que es el nombre del DAG y asegúrese de no haber creado ningún DAG con este nombre antes. Pase los argumentos que definimos anteriormente y agregue una descripción y horario_intervalo que ejecutará el DAG después del intervalo de tiempo especificado
Definición de la tarea
Solo tenemos una tarea para nuestro flujo de trabajo:
- impresión: En la tarea, imprimiremos “Apache Airflow es una herramienta imprescindible para los ingenieros de datos” en la terminal usando la función python.
Pasaremos el task_id al PythonOperator objeto. Verá este nombre en los nodos de la Vista de gráfico de su DAG. Pase el nombre de la función de Python al argumento «Python_callable» que desea ejecutar y los argumentos que utiliza para el parámetro «Op_kwargs» como diccionario y, finalmente, el objeto DAG al que desea vincular esta tarea.
Ejecute el DAG
Ahora, cuando actualice su panelUn panel es un grupo de expertos que se reúne para discutir y analizar un tema específico. Estos foros son comunes en conferencias, seminarios y debates públicos, donde los participantes comparten sus conocimientos y perspectivas. Los paneles pueden abordar diversas áreas, desde la ciencia hasta la política, y su objetivo es fomentar el intercambio de ideas y la reflexión crítica entre los asistentes.... de Airflow, verá su nuevo DAG en la lista.
Haga clic en el DAG y abra la vista de gráfico y verá algo como esto. Cada uno de los pasos del flujo de trabajo estará en un cuadro separado. En este flujo de trabajo, solo tenemos un paso que es imprimir. Ejecute el flujo de trabajo y espere hasta que su borde se vuelva verde oscuro, lo que indica que se completó correctamente.
Haga clic en el nodoNodo es una plataforma digital que facilita la conexión entre profesionales y empresas en busca de talento. A través de un sistema intuitivo, permite a los usuarios crear perfiles, compartir experiencias y acceder a oportunidades laborales. Su enfoque en la colaboración y el networking hace de Nodo una herramienta valiosa para quienes desean expandir su red profesional y encontrar proyectos que se alineen con sus habilidades y objetivos.... «imprimir» para obtener más detalles sobre este paso y luego haga clic en Registros y verá el resultado como este.
¿Cuáles son las variables en Apache Airflow?
Sabemos que Airflow se puede utilizar para crear y gestionar flujos de trabajo complejos. Podemos ejecutar varios flujos de trabajo al mismo tiempo. Existe la posibilidad de que la mayoría de sus flujos de trabajo estén usando la misma base de datosUna base de datos es un conjunto organizado de información que permite almacenar, gestionar y recuperar datos de manera eficiente. Utilizadas en diversas aplicaciones, desde sistemas empresariales hasta plataformas en línea, las bases de datos pueden ser relacionales o no relacionales. Su diseño adecuado es fundamental para optimizar el rendimiento y garantizar la integridad de la información, facilitando así la toma de decisiones informadas en diferentes contextos.... o la misma ruta de archivo. Ahora, si realiza algún cambio como cambiar la ruta del directorio donde usar los archivos de guardado o cambiar la configuración de las bases de datos. En ese caso, no desea actualizar cada uno de los DAGS por separado.
Airflow proporciona una solución para esto, puede crear variables donde puede almacenar y recuperar datos en tiempo de ejecución en los múltiples DAGS. Entonces, si ocurre algún cambio importante, puede editar su variableEn estadística y matemáticas, una "variable" es un símbolo que representa un valor que puede cambiar o variar. Existen diferentes tipos de variables, como las cualitativas, que describen características no numéricas, y las cuantitativas, que representan cantidades numéricas. Las variables son fundamentales en experimentos y estudios, ya que permiten analizar relaciones y patrones entre diferentes elementos, facilitando la comprensión de fenómenos complejos.... y sus flujos de trabajo están listos para comenzar.
¿Cómo crear variables?
Abra el panel de Airflow y haga clic en el Administración en el menú superior y luego haga clic en Variables.
Ahora, haga clic en Crear para crear una nueva variable y se abrirá una ventana como esta. Agregue la clave y el valor y envíelo. Aquí, estoy creando una variable con el nombre de clave como Ruta de datos y valor como la ruta de cualquier archivo de texto aleatorio.
Ahora, crearemos un DAG donde encontraremos el recuento de palabras de los datos de texto presentes en este archivo. Cuando desee utilizar las variables, debe importarlas. Veamos cómo hacer esto:
Luego, definiremos la función que utilizará la ruta de la variable, la leerá y calculará el recuento de palabras.
El resto de los pasos son los mismos que hicimos anteriormente, debe definir el DAG y las tareas y su flujo de trabajo está listo para ejecutarse.
Puedes ver los resultados en el log y ahora si puedes usar esta variable en cualquier otro DAG y también puedes editarla cuando quieras y se actualizan todos tus DAGS.
Notas finales
En este artículo, entendimos cómo usar el operador de Python en Apache Airflow, conceptos como ramificación y variables, y cómo crearlos. En el próximo artículo, crearemos un proyecto de aprendizaje automático y automatizaremos su flujo de trabajo utilizando Apache Airflow.
Le recomiendo que consulte los siguientes recursos de ingeniería de datos para mejorar su conocimiento:
Si tiene alguna pregunta relacionada con este artículo, hágamelo saber en la sección de comentarios a continuación.