Incorporando el linaje de datos a Airflow 

Sigo con artículos sobre Linaje de Datos, si bien en anteriores artículos os contaba qué es el linaje de datos y por qué debemos incorporarlo a los proyectos de Data, y también os hablaba de openLineage y cómo desplegar Marquez. Hoy voy a explicaros cómo integrar Marquez en Apache Airflow para poder incorporar este control del linaje de datos a los procesos de ETL/ELT orquestados desde Apache Airflow. 

Podéis repasar los artículos de los que os hablo aquí

Los pasos que os explico están realizados sobre una versión 2.10 de Airflow desplegada sobre Docker Compose.  

¿ Cómo se hace la magia ? 

Para poder integrar Marquez en Airflow, necesitamos que en Airflow se instale una librería que permita extender su funcionalidad para trabajar con el framework de OpenLineage. Se trata de la  ibrería apache-airflow-providers-openlineage. Esta librería extiende las funcionalidades de Airflow, implementando la captura de información de linaje para los operadores; se puede consultar en esta página los operadores que se integran con las funcionalidades de OpenLineage. 

La información sobre linaje capturada por las funcinalidades de la librería que expanden a Airflow, se enviarán a Marquez. 

¡ Ahora toca configurar cosas ! 

Toca preparar una imagen custom de Apache Airflow para incorporar la librería de la que estamos hablando. Os he dejado en el repositorio asociado a esta entrada el archivo Dockerfile. También teneis los pasos a dar en este README.md

A partir de ese archivo, se generará la imagen custom lanzando: 

Tras haber construido la imagen custom, pararemos Airflow y modificaremos la referencia a la imagen que debe utilizar, pasando a utilizar la imagen custom creada. Lo que tendremos que hacer es cambiar esta línea: 

Por esta otra: 

Antes de guardar los cambios y cerrar el archivo, introducimos los valores de configuración de la comunicación con Marquez. Para ello, dentro de la definición de variables de entorno, añadimos lo siguiente: 

En dónde: 

  • AIRFLOW__OPENLINEAGE__TRANSPORT: se marca la configuración de la conexión en sí. 
  • AIRFLOW__OPENLINEAGE__NAMESPACE: se indica el namespace dentro de Marquez sobre el que escribirá los eventos. 
  • AIRFLOW__OPENLINEAGE__DISABLED: marcamos como habilitada la integración, esto lo incluyo por si en algún momento se quiere lanzar Airflow sin el registro de eventos de linaje, deshabilitarlo rápido. 
  • AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE: en algunos operadores como bashOperator y PythonOperator, dentro de la información del evento se incluye el código que ejecuta. Esta característica que viene aplciada por defecto, se puede inahbilitar con este parámetro, de esta forma evitamos que el código se lleve a los eventos de Marquez. 

Ahora arrancamos de nuevo Apache Airflow y ya tendríamos la configuración realizada. Dentro de la página de OpenLienage, podemos descargarnos un archivo en donde se define un DAG que permitirá evaluar si la integración está configurada de manera correcta. Podéis obtener el código en esta página, o bien, utilizar el archivo que he dejado en el repositorio.  

Tendremos este resultado si todo ha ido bien y esta configurada la librería de forma correcta. 

¡ Toca probar todo esto ! 

Pues nada, ahora toca la parte divertida, empezar a probar todo esto y ver qué se pinta en Marquez. Para ello he hecho un DAG que lanza una consulta contra una tabla, precisamente contra la base de datos de Marquez, para obtener la información de los jobs lanzados, y el resultado se guardará en un archivo CSV en un filesystem local. Se simulará que una tercera tarea procesa ese archivo, solo para poder generar un ejempolo de dependencias de entrada y salida, para ver cómo se mostrarán estos flujos dentro de Marquez. 

Dentro del DAG se definen una serie de variables que se utilizarán a lo largo de los procesos: 

Jugaremos con entities de Airflow para crear un objeto de tipo File, es importante que este archivo tenga una referencia de protocolo, en este caso file://, ya que será lo que capture OpenLienage para representarlo en el flujo de Linaje de Datos. Ahí tenemos pues, el path con el nombre del archivo a generar, la URI del archivo y el objeto de tipo File que es el que usaremos de referencia en las tareas. Para otros objetos como son conexiones definidas dentro del apartado de connections de Airflow no hará falta, se capturarán de manera directa, como pasa con la conexión a la base de datos que se consultará en la primera tarea. 

Como véis el código para esta parte en donde se obtiene información de la tabla de base de datos, no tiene nada fuera de lo común. 

Para la parte del archivo, sí que tendremos que jugar con los inlets outlets de Airflow para marcar esa dependencia con el fichero, por defecto no es capaz de interceptar esta dependencia, con lo que tendremos que poner de nuestra parte en este tipo de procesos para poder remarcar esta dependencia dentro del control del linaje de datos: 

Una vez lanzado el DAG, en Marquez tendremos lo siguiente: 

Dentro del namespace que hemos definido en la configuración, se verán cuatro entradas: una para el DAG en sí, y otra para cada una de las tareas que lo conforman: 

Pulsando en cada uno de ellos, podemos ver toda la información de cada uno. Por ejemplo, para la información del DAG, podemos ver: 

Si ahora vamos al job de la tarea encargada de lanzar la consulta, podemos ver que tiene referido un Dataset asociado a la conexión a base de datos, con la información de la tabla consultada. La consulta realizada se puede ver dentro de la información del job, en lugar del Dataset, solo hace falta pulsar en el job y ver la información asociada al mismo: 

Fijaos en un detalle, para el Dataset no toma el namespace configurado, sino que crea uno específico para la conexión a esa base de datos, y forma el nombre del dataset con <nombre_base_datos>.<nombre_esquema>.<nombre_tabla>:  

En cuanto a las tareas que trabajan con el fichero, entrando en cualqueira de sus jobs, se puede ver la dependencia con el archivo, que no llega a generar un dataset en Marquez como tal, pero sí permite ver el nombre del archivo y URI:  

Como veis, con muy poco hemos conseguido registrar en Marquez todos los procesos de linaje de datos que generan los procesos ETL/ELT de Airflow. En las referencias os dejo información acerca de más parámetros de configuración de esta integración, junto con una guía para crear extractores custom de Openlineage para ampliar capacidades de operadores custom y que también registren estos eventos de linaje.  

Referencias