En el artículo previo llamado Tratamiento de datos con AWS Glue Databrew, ya hemos hablado de cómo implementar desde cero un proceso ETL simple con el objetivo de transformar los datos con Recipes predefinidas de AWS Glue Databrew sin tener que dedicar mucho esfuerzo para ello, pero a pesar de que se disponga de una gran variedad de opciones de análisis y transformación listas para usar, ¿Qué pasaría si lo que necesito es realizar acciones que no están en esta lista, ni siquiera combinando algunas de ellas?, para estos casos, AWS Glue, que también es una herramienta serverless, nos ofrece la opción de incorporar al proceso de transformación, pasos adicionales que incluyan implementaciones personalizadas.
Aquí veremos cómo reusar las transformaciones realizadas previamente con Databrew en AWS Glue, e incluir adicionalmente acciones propias usando scripts de Python, valiéndose de las utilidades potentes que tienen las librerías de gestión de matrices como Numpy y Pandas, de visualización de los datos como Matplotlib, y de transformación avanzada de datos como scikit-learn o PyTorch.
AWS Glue ETL Jobs
Esta herramienta nos permite de modo visual, con Visual ETL, configurar un flujo de procesamiento de datos desde un origen interno o externo de la nube de AWS, hasta almacenar el resultado también de forma interna o externa.
Para crear un flujo que implemente un proceso ETL, hay que crear en Job desde Visual ETL que orquestará los distintos elementos del flujo, gestionará las operaciones de los nodos y sus transiciones, la comunicación con elementos externos y los datos tratados durante el proceso. Los Jobs corren sobre una plataforma de procesamiento paralelo de datos que está basado por defecto en Spark y que pueden ser manejados a través la librería PySpark, pero solo al momento de crear el Job se puede cambiar a la opción Ray.
A continuación, vamos a trabajar sobre un flujo sencillo de transformación con recetas de Databrew y con transformación personalizadas, pero el flujo podría hacerse más complejo como se quiera a partir de los múltiples tipos de nodos que ofrece Glue ETL que están agrupados en Sources, Transmorfs y Targets.
- Sources: Son nodos que extraen los datos que serán procesados desde las fuentes de origen. Entre las funciones se incluyen:
- La lectura de ficheros de datos estructurados como CSV, Apache Hudi, JSON, Parquet, entre otros.
- La lectura desde base de datos a través de distintas interfaces de acceso como Data connections, Data Catalog tables, DynamoDB, Google BigQuery, Snowflake, etc.
- Obtener los datos desde un flujo de streaming con Kinesis o Apache Kafka.
- Diferentes tipos de lecturas pueden hacerse paralelamente.
- Transforms: Son nodos que aportan la capacidad de cambiar los datos leídos por los nodos de tipo Sources. La variedad de transformaciones soportadas es tan amplia y flexible que podría llevar un artículo completo explicarlas todas, pero para nuestro caso queremos destacar los nodos Recipes y Custom Transforms:
- Recipes: Este tipo de transformación permite seleccionar la lista de Recipes creados previamente en Databrew para aprovechar todo el poder de tratamiento de datos que ofrece esa herramienta dentro de este flujo.
- Custom Tranforms: Cuando de todas las alternativas de transformación que nos ofrece tanto Databrew y Glue ETL no podemos encontrar una acción particular, este nodo nos facilita implementar scripts de código para realizar tratamientos de datos personalizados. Entre los lenguajes soportados para habilitar esta operación están Python y Scala.
- Target: Son nodos que facilitan el almacenamiento o envío de los datos que han sido transformados por los nodos previos. Entre las alternativas ofrecidas tenemos:
- Almacenar los resultados en ficheros en S3 con una buena variedad de formatos.
- Almacenar los resultados en base de datos tanto gestionadas por AWS como externas, relacionales y no relacionales, usando las interfaces de conexión que ofrece la herramienta.
Con esta tipología de nodos, ya podemos crear nuestro primer flujo ETL que responda a la necesidad inicial que hemos mencionado, reaprovechar la configuración de Recipes hecha con Databrew y aplicar cambios personalizados a los datos.
A continuación, mostramos el esquema del flujo una vez creado cada tipo de nodo y conectándolos entre ellos.
Extracción de los datos
Para este caso usaremos el nodo Amazon S3 que conecta con los buckets donde se encuentran los datos. Este nodo puede leer de forma recursiva un grupo de ficheros que se encuentran en una ruta especifica de un bucket, generando por cada uno de ellos un DynamicFrame que después será usados en los siguientes nodos. Además, se puede usar un Data Catalog table para leer un dataset que ya ha sido rastreado por AWS Glue Crawler.
También se especifica el formato de lectura de los datos en caso de lectura directa a una ubicación de S3, el separador de columna, si los ficheros tienen cabecera, y otras propiedades para indicarle al nodo como se deben leer los ficheros. Este tipo de nodos no tienen otro nodo que lo preceda.
Aunque no se nuestro caso, esta fase del proceso puede requerir más de un nodo de tipo Source para leer datos de distintos orígenes soportados.
Transformación de los datos
Cuando ya tenemos un nodo o grupos de nodos que extraen la información de las fuentes de origen, ya podemos empezar a tratarlos, y como ya hemos comentado antes, podemos abrir un abanico de posibilidades para analizar, unir, separar, modificar, añadir, eliminar o cualquier otro tipo acción sobre los datasets originales.
Existen técnicas de transformación que intentan optimizar este proceso de transformación, y entre las primeras que se recomiendan aplicar es Cleansing data, que permite reducir el volumen de datos para eliminar los datos no efectivos y preparar el restante para las siguientes fases.
Otra técnica importante que se aplican sobre todo a los datos sensibles ya sea de índole personal o comercial es Data Masking, que enmascara la información para evitar que se pueda identificar a qué ente pertenece dicho dato, pero sin perder las propiedades que aportan los datos originales respecto a la relación con el resto de los datos.
AWS Glue Databrew Recipes
Este elemento ofrece muchas acciones que implementan las 2 técnicas mencionada previamente, además de otras técnicas más, por tanto, es superútil crear Recipes y usarlas en los flujos ETL de Glue. En el artículo Tratamiento de datos con AWS Glue Databrew, que es predecesor a éste, ya explicamos como configurar y crear training-malaga-historic-weather-recipe que contiene las transformaciones sobre el mismo dataset de origen que estamos tratando, por tanto, es suficiente en este nodo solo seleccionar la versión deseada de este recipe.
Además, se debe seleccionar el nodo de tipo Source al cual aplicar las transformaciones del recipe.
Custom Transforms
Este tipo de nodo incrementa en gran medida la flexibilidad que ofrece Glue para los procesos de ETL, ya que habilita al beneficiario del tratamiento de datos, entre ellos el ingeniero de datos o científico de datos, a realizar transformaciones muy específicas y concretas sobre los datos difíciles de encontrar en las acciones predeterminadas tanto de Databrew o de Glue ETL Job.
La posibilidad de usar scripts y librerías con el objetivo de extender el tratamiento de los datos es muy poderosa, y como veremos a continuación, se usará la librería suntime para obtener la hora del amanecer y anochecer del día concreto, y a partir de ello, calcular si la hora pertenece al día. Además, se calculará también la estación que corresponde a la fecha.
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import numpy as np
import pandas as pd
import datetime
from suntime import Sun
df = dfc.select(list(dfc.keys())[0]).toDF()
df = df.drop("sea_level", "grnd_level")
dataset = df.toPandas()
# Malaga latitude and longitude
lat = 36.706111
long = -4.472500
sun = Sun(lat, long)
# Calculate the sunrise and sunset time.
dataset['sunrise'] = dataset.query('dt_iso == dt_iso')['dt_iso'].dt.date.apply(sun.get_sunrise_time)
dataset['sunset'] = dataset.query('dt_iso == dt_iso')['dt_iso'].dt.date.apply(sun.get_sunset_time)
#Remove the tzutc property.
dataset['sunrise'] = pd.to_datetime(dataset['sunrise']).dt.tz_localize(None)
dataset['sunset'] = pd.to_datetime(dataset['sunset']).dt.tz_localize(None)
# Calculate if is daytime.
dataset['isDaytime'] = np.logical_and(dataset['dt_iso'] > dataset['sunrise'], dataset['dt_iso'] < dataset['sunset'])
# Calculate the season of the year.
dataset['season'] = pd.cut((dataset.dt_iso.dt.month * 100 + dataset.dt_iso.dt.day - 320) % 1300, [-1, 300, 602, 900, 1300], labels=['spring', 'summer', 'autumn', 'winter'])
df = spark.createDataFrame(dataset)
dyf_filtered = DynamicFrame.fromDF(df, glueContext, "changed")
return DynamicFrameCollection({"CustomTransform0": dyf_filtered}, glueContext)
El script debe estar enmarcado en el método MyTransform que recibe el contexto del Job de Glue, y una lista de DynamicFrame que corresponde con todos los dataset que se leyeron en los nodos de tipo Source, que en nuestro caso la lista tendrá el único dataset que hemos leído. Una vez realizado todas las operaciones necesarias sobre el dataset, se debe devolver la lista de DynamicFrame que será recibido en los siguientes nodos del flujo.
Glue ETL Jobs carga por defecto un grupo de librerías esenciales para el tratamiento de datos, como numpy, pandas, scikit-learn, etc, pero entre ellas no incluye la librería suntime, por lo que se requiere especificar al Job las librerías que debe instalar cuando se corra el flujo.
Seleccionar el dataset destino
Previo al paso de almacenamiento de los datos resultados, y considerando que el almacenamiento escogido es un fichero con formato Parquet en S3, en necesario reducir la dimensión de la lista de DynamicFrame a uno solo dataset, ya que en este formato no es posible almacenar en un mismo fichero más de un dataset. Con el nodo SelectFromCollection se puede escoger de la lista de DynamicFrame el deseamos guardar.
Este tipo de nodo también permite bifurcar el flujo con el fin de tratar de diferentes maneras en ramas paralelas cada uno de los dataset que exista en la lista de DynamicFrame.
Almacenado de los resultados
Finalmente, el proceso ETL que hemos diseñado e implementado llega hasta este nodo, donde obtiene el datase seleccionado en el nodo anterior. Este nodo puede escribir cada dataset en varios formatos conocidos como JSON, CSV, Avro, ORC, Parquet, Apache Hudi, Delta Lake y Apache Iceberg. También permite aplicar algún tipo de compresión de los datos para reducir su volumen en el bucket. Además de escribir en S3, se puede especificar que se escriba también en una tabla de Data Catalog.
Para nuestro caso se crean 36 ficheros destino en formato Parquet que incluyen todo un mismo identificador en el nombre del fichero más la partición al que corresponde, por ejemplo, run-1708385726207-part-block-0-r-00035-snappy.parquet, y se debe a que el Job divide el fichero original en particiones que procesa de forma paralela con Spark con el objetivo de reducir el tiempo total de ejecución del Job.
Conclusiones
Conocer una pequeña parte de las bondades que dispone Glue ya nos permite explotar nuestros datos con mayor rapidez y facilidad, y solo requiriendo conocimientos básicos de los conceptos de ETL. Glue ETL Job complementa o extiende las capacidades de Glue Databrew, y en ningún caso se pueden verse como sustituto el uno del otro, ya que Databrew son ofrece explorar y disponer de más información analítica cuando queremos realizar un EDA introductorio para comprender en forma general cómo se comportan los datos, y Glue ETL Job son ofrece una plataforma potente para el tratamiento global de los datos de una organización.
Desde mi punto de vista, ambas herramientas han sido diseñadas con propósitos distintos, ya que Databrew está más orientado a perfiles de científicos de datos, y Glue ETL Job está orientado a Ingenieros de datos, pero la ventaja más relevante es la posibilidad de integrar las operaciones de transformación de uno en el otro que permite ser un punto de encuentro entre ambos perfiles.