Prueba-css

Walter Talaverano

Microsoft Certified | Certified

Contenido en español

LakeHouse Streaming en AWS con Apache Flink y Hudi (Parte 2)

Introducción

Este artículo es el segundo en una serie de publicaciones que se centran en la creación de un LakeHouse con Hudi a partir de una ingesta en streaming procesada por una aplicación Flink. El primer artículo se centra en sentar una buena base para esta plataforma, donde se desplegaron unas aplicaciones Flink con KDA (Kinesis Data Analytics) para cada tipo de formato (MoR, CoW para Hudi y JSON) que escriben el resultado de este procesamiento en unos buckets.

El envío de datos que se utiliza como input se mandaba en el anterior artículo desde una máquina en local ejecutando una aplicación de Locust, lo que puede presentar problemas a la hora de escalar y querer procesar un volumen alto de eventos. Además, las aplicaciones de Kinesis Data Analytics con Flink presentan problemas de agilidad en su modo de autoescalado. Todos estos nuevos retos serán resueltos en este artículo.

También se catalogarán estas tablas en Glue, servicio que disponibiliza un catálogo de datos en AWS, para poder acceder a estos y así realizar queries de todo tipo. Como motor de queries que consumirá estos metadatos se utilizará Athena, que proporciona una experiencia escalable, ágil y serverless para poder ejecutar queries con SQL o Spark para nuestras tablas alojadas en S3.

Por otro lado, en este artículo también se han desplegado los componentes necesarios para poder monitorizar nuestras aplicaciones y extraer así conclusiones sobre la velocidad a la que se ingestan los datos y los posibles problemas a resolver para que el procesamiento tenga la latencia requerida según los requisitos que se impongan.

Finalmente se realizará una comparativa en cuanto a rendimiento y latencia de las diferentes aplicaciones de Flink que escriben datos en los formatos de Hudi y JSON para así poder ver las diferentes ventajas e inconvenientes de estos formatos.

Tabla de contenidos

Introducción

Arquitectura

Escalado

Hudi

Configuración de Hudi

Stress Tests & Insights

Desafíos en el desarrollo

Conclusiones

Referencias

Autores

Arquitectura

A continuación se puede ver la arquitectura a alto nivel que se desplegará:

Para un mayor entendimiento vamos a explicarla de izquierda a derecha. Como se puede observar, el cambio más reseñable con respecto al primer artículo es la inclusión de un cluster de Kubernetes para poder escalar los eventos que serán mandados como input de nuestra aplicación de streaming. De esta manera se podrá testear de manera exhaustiva el rendimiento de las aplicaciones de Flink dependiendo de su aprovisionamiento y sobre todo del tipo de formato y tabla en el que escriben al LakeHouse. Además, se ha disponibilizado un ALB (Application Load Balancer) que permite acceder a la interfaz de Locust para poder definir el número de usuarios a simular y cómo deben escalar estos con el tiempo. La URL para acceder a esta aparecerá como output al desplegar la infraestructura con Terraform.

Por otro lado se han realizado cambios reseñables en las aplicaciones KDA de Flink y el stream del que leen estas. Cada aplicación lee ahora como consumidores EFO (Enhanced Fan Out), de tal manera que cada una de ellas tiene un ancho de banda dedicado. La razón de este cambio y sus detalles serán explicados más en detalle en el apartado dedicado para Kinesis.

En cuanto a la monitorización y la extracción de métricas en NRT (Near Real Time) se han desplegado unas funciones lambdas que acceden a las tablas apoyándose en Athena gracias a haber registrado los metadatos de estas en el catálogo de Glue. Es importante resaltar que los metadatos de las tablas de Hudi son registrados en Glue por Flink pero en el caso de JSON se despliega un crawler que registra estas tablas en el catálogo. Este crawler se debe ejecutar manualmente para que esta tabla quede registrada en Glue.

Escalado

Kinesis Stream

Dado que el objetivo es someter la aplicación a una carga considerable de eventos por segundo, es necesario explicar cómo cada una de las piezas de la arquitectura pueden escalar de acuerdo al volumen de datos.

Como hemos comentado previamente, se ha optado por un Kinesis Stream On-Demand para automatizar el escalado de los shards durante las pruebas de carga. Es necesario tener en cuenta que estos streams pueden acomodar una tasa de escritura de hasta el 200% de lo especificado por el número de shards en un momento dado.

Una vez que el stream se encuentra por encima del 100%, aumentará automáticamente el número de shards en un plazo de 15 minutos. La única limitación por tanto es no superar el doble del volumen de escritura admitido en menos de dicho periodo.

Por otro lado, dado que se tendrán tres aplicaciones de Flink leyendo del mismo stream, las limitaciones a nivel de lectura serán el mayor problema. Un Kinesis Stream solo admite 5 llamadas GetRecord por shard por segundo. Dado que cada aplicación tiene que leer todo el stream (y por lo tanto, todos los shards), aumentar el número de shards no ayuda a solventar este problema.

La solución pasa por registrar cada una de las aplicaciones como un consumidor Enhanced Fan-Out. Esta funcionalidad de los Kinesis Stream provee a cada uno de estos consumidores con un límite individual de 5 llamadas GetRecord y 2MB por shard por segundo de lectura.

Esta configuración se realiza en el lado del consumidor, en nuestro caso mediante el conector de Kinesis para Flink.

'scan.stream.recordpublisher' = 'EFO',

'scan.stream.efo.registration' = 'EAGER/LAZY',

'scan.stream.efo.consumername' = '{consumer_name}'

Conviene mencionar que alternativamente, es posible aumentar la latencia de lectura de nuestras aplicaciones de Flink. Por defecto Flink realiza una lectura cada 200 ms por shard, de modo que una aplicación consume completamente la cuota de lectura de un stream. Incrementando este valor a 600ms podríamos acomodar las tres aplicaciones, a costa de una mayor latencia.

scan.shard.getrecords.intervalmillis = '600'

También se hará uso de la opción Adaptive Reads, que modifica dinámicamente el número de eventos recogidos por llamada en función del tamaño de cada record. Esto permite aprovechar los 2 MB/s por shard disponibles para cada consumidor.  

'scan.shard.adaptivereads' = 'true'

En lo que respecta al escalado en KPUs (Kinesis Processing Unit) de Flink, se ha optado por no hacer uso del autoescalado, ya que cada proceso de escalado incurren en downtime para la aplicación. Debido a los diferentes requerimientos de cada una de las aplicaciones, las acciones de escalado en momentos inesperados podrían interrumpir las pruebas de carga. Además es interesante medir el rendimiento de escritura de cada una de las aplicaciones en igualdad de capacidad de computación.

Hudi

Timeline

Uno de los sistemas base sobre la que se sustenta el funcionamiento y características de Hudi es la timeline. Hudi guarda un registro temporal de todas las acciones que se han realizado sobre la tabla, así como el estado de esta acción.

Las principales acciones que componen la timeline son

  • Commits - escritura atómica de un conjunto de registros en la tabla en formato columnar
  • Delta Commit - similar al commit, representa una escritura de registros en forma de logs en una tabla Merge on Read
  • Compaction - compactación de las escrituras en logs (delta commits) de una tabla MoR a formato columnar
  • Cleans - borrado de versiones antiguas de archivos
  • Rollback - eliminado de los registros escritos por un commit o delta commit fallido
  • Savepoint - marca un conjunto de archivos como “guardados” para que no sean eliminados por el proceso de limpieza. Permite restaurar la tabla a un punto anterior en la timeline

Cualquiera de estas acciones pueden encontrarse en uno de estos tres estados

  1. Requested - una acción ha sido planeada sin iniciar
  2. Inflight - la acción está en proceso
  3. Completed - denota que la acción ha sido completada

Tipos de tabla

Como se ha dejado entrever en el funcionamiento de la timeline de Hudi, existen dos tipos de escritura soportados: columnar y logs. El formato columnar (parquet) constituye la forma final de una tabla de Hudi, junto con los metadatos de la timeline. Sin embargo, es posible hacer uso de las escrituras en logs (avro) para disminuir la latencia de escritura y eventualmente compactarse a formato columnar sin entorpecer la escritura.

El uso de estos métodos de escritura dan lugar a los dos tipos de tabla que Hudi pone a nuestra disposición

  • Copy on Write - las escrituras se realizan exclusivamente en formato columnar, creando un nuevo fichero con los nuevos registros de la tabla. Los datos están disponibles inmediatamente pero incurre en mayor latencia de escritura

  • Merge on Read - hace uso de la escritura en logs. Los nuevos registros son inicialmente escritos como logs, y posteriormente serán transformados a formato columnar por el proceso de compactación. Obtenemos menor latencia de escritura a costa de latencia de lectura; los nuevos registros no estarán disponibles hasta que se realice la compactación

Tipos de Query

Para poder aprovechar las características de cada tipo de tabla, existen tres tipos de queries que se pueden realizar sobre una tabla de Hudi

  • Snapshot - obtiene la última versión de la tabla. Para las tablas MoR esto implica incurrir en un proceso de compactación para obtener los últimos registros en formato log.

  • Read Optimized - para tablas MoR, lee sólamente los registros ya expuestos en formato columnar sin incurrir en latencia de lectura adicional.

  • Incremental - recoge únicamente los nuevos registros desde un cierto commit o compactación, facilitando la creación de pipelines incrementales. No está soportada por Athena

Integración con Glue Catalog

El conector de Hudi permite una integración nativa con el catálogo de Glue en AWS. Basta con añadir las dependencias de Hive en nuestra aplicación de Flink

com.amazonaws.aws-java-sdk-glue

org.apache.hive.hive-common

org.apache.hive.hive-exec

Y especificar la configuración del catálogo en el conector de Hudi

'hive_sync.enable' = 'true',

'hive_sync.db' = '{glue_database}',

'hive_sync.table' = '{table_name}',

'hive_sync.partition_fields' = '{partition_fields}',

'hive_sync.mode' = 'glue',

'hive_sync.use_jdbc' = 'false'

Con esta integración, la aplicación creará automáticamente las tablas en el catálogo. Como hemos mencionado anteriormente, existen distintos tipos de query para consultar una tabla de Hudi. Se crearán por tanto en el catálogo distintas tablas para soportar las diferentes consultas.

Para una tabla CoW, la tabla se consultará mediante una query Snapshot. Para MoR en cambio se pondrán a disposición dos tablas, para soportar consultas Read Optimized o Snapshot.

La principal aplicación de Glue es de soporte a las lambdas para que al ejecutar las queries mediante Athena su ejecución pueda realizarse de una forma más eficiente, rápida y segura:

  • Glue Catalog: almacenamiento centralizado de la información acerca de la organización, diseño y formato de los datos, utilizado por Athena para realizar directamente las consultas a S3 sin necesidad de tener que apoyarse en terceros para conseguir esta información

  • Automatización del Esquema: Glue rastrea y cataloga automáticamente los datos en S3, detectando y adaptando los cambios en el esquema. Esto evita posibles errores y permite la lectura de los nuevos campos en caso de que se produzcan alteraciones en los esquemas de los eventos

 

Configuración de Hudi

Es importante entender las configuraciones que nos ofrece Hudi para optimizar nuestra aplicación, en particular para una aplicación en Near Real Time conviene estar al tanto de las opciones disponibles. Aunque la capacidad de configuración es inmensa [1], se intentará sintetizar las que pueden ser más relevantes para una primera toma de contacto con esta tecnología.

Particionado

Apache Hudi ofrece los tipos de particionado que pueden encontrarse en otras soluciones, se detallarán las principales y se justificara la implementada:

  • Simple: particionado basado en un único campo, en este caso el campo escogido es ‘ticker’ ya que se ha identificado que es el que tiene una cardinalidad menor.

  • Particionado Compuesto: particionamiento basado en múltiples campos, podría resultar interesante escoger un campo de baja cardinalidad (ticker) y otro de cardinalidad media (fecha)

  • Particionado Dinámico: elección de la variable en base de los valores, puede resultar interesante cuando la cardinalidad de las variables puede sufrir variaciones y se quiera una actualización del particionamiento de una forma automática y flexible.

Índices

Apache Hudi cuenta con una múltiples  tipos de indexación[2], comentaremos brevemente los más comunes:

  • Bloom Index - Hace uso de un bloom filter sobre la key de los eventos, adicionalmente se puede complementar con un filtrado por rango de de key. Funciona bien cuando tratamos con una tabla donde la mayoría de cambios ocurren en las particiones más recientes o para deduplicado de eventos.

  • Simple: indexación realizada mediante la combinación de FileID y RecordKey. Recomendado cuando las operaciones Upsert no son tan frecuentes debido a la simplicidad que este ofrece.

Ambos tipos de índices pueden ser usados en su forma global

  • Índice global - Imponen la unicidad de las keys en todas las particiones de la tabla, es decir, garantizan que existirá sólamente un registro con una cierta key.

  • Índice no global - La unicidad de la key sólo es exigida a nivel de partición. Si los datos son consistentes y una key sólo va a existir en una partición, este tipo de índices ofrecen un rendimiento mucho mayor y mejor escalado.

En este caso, se ha optado por un Bloom Index, el cual es el que se toma por defecto en caso de que no se declare expresamente:

"hoodie.index.type" = "BLOOM"

La elección de este tipo de indexación se debe a que los casos de uso que se han planteado requieren de un procesamiento de datos considerablemente alto y eficiente.

Tipos de operación

Apache Hudi ofrece varios tipos de operaciones[3] que permiten a los usuarios administrar y modificar conjuntos de datos de gran tamaño. A continuación se detallan tanto las principales operaciones realizadas en los Stress Tests como en otros escenarios:

  • Upsert - Es la operación por defecto, y ejecutará un insert o un update dependiendo de si el registro ya existe tras una búsqueda en el índice. Con esta operación la tabla no tendrá duplicados para su clave primaria.

  • Insert - Esta operación ignora la búsqueda en el índice a la hora de insertar eventos. Es la más rápida pero la tabla puede contener duplicados. Aún así es útil si se utilizan métodos auxiliares  de deduplicado, o simplemente la existencia de estos es tolerable en el caso de uso.

  • Delete: Hudi ofrece dos métodos de borrado. Soft Delete convierte a nulos los valores del evento a excepción de la key. Hard Delete ejecuta un borrado físico del evento en la tabla.

  • Bulk Insert Operación similar al Insert pero optimizada para la inserción de un gran volumen de datos, a costa de sacrificar ciertas garantías en el control del tamaño de ficheros. Escala bien para cientos de TBs en caso de bootstrap inicial de una tabla de gran tamaño.

Compactación

En el caso de usar una tabla MoR es posible configurar el ritmo de compactación de logs en parquet para buscar el equilibrio entre latencia de escritura y lectura que más convenga al caso de uso. Se pueden especificar una estrategia de tiempo o número de delta commits (o ambos) que ejecutan un proceso de compactación

compaction.delta_commits

compaction.delta_seconds

compaction.trigger.strategy

Acciones asíncronas

Ciertas acciones de la timeline como la compactación, limpieza, archivado y clustering pueden ser realizadas asíncronamente por la aplicación, o incluso ser relegadas a procesos auxiliares a la aplicación de escritura. Para el caso de Flink, puede ayudar a mejorar la latencia de escritura y evitar problemas de BackPressure en la aplicación.

compaction.async.enabled

hoodie.clean.async

hoodie.archive.async

hoodie.clustering.async.enabled

Stress Tests & Insights

Al desplegar las aplicaciones, se ha procedido a realizar distintos tests variando tanto la carga máxima de eventos como la concurrencia y el grado exponencial de crecimiento de los mismos. Esto ha sido posible  gracias a la flexibilidad ofrecida por Locust al estar levantado sobre un cluster de Kubernetes, pudiendo establecer un límite máximo de concurrencia de eventos y un incremental de los mismos. En los tests se ha establecido un límite máximo de 5 a 15K usuarios simultáneos (Peak Concurrency) escalando la frecuencia de los mismos de forma lineal, desde 5 a 20 usuarios más por segundo (Spawn Rate):



Se ha procedido a monitorizar los distintos test para así sacar conclusiones del rendimiento teniendo en cuenta las características específicas de cada uno de los formatos. Las métricas en las que se han apoyado los análisis son tanto las nativas de CloudWatch Metrics (
CPU & Memory Utilization, KPUs, LastCheckpoint SIze & Duration,..), como las métricas obtenidas a partir de las Lambdas que periódicamente consultan el número de eventos disponibles en los buckets y realizan cálculos del promedio de la latencia de los mismos.

Número de Eventos

A la hora de analizar el número total de eventos procesados, los cuales son enviados de forma gradual, es decir, a medida que pasa el tiempo cada vez son más los eventos que se envían por segundo, se identifica una tendencia bastante similar aunque destacan JSON y Hudi MoR sobre Hudi CoW en cuanto a la rendimiento. Cabe destacar que JSON muestra un crecimiento más estable y constante en comparación con Hudi MoR y CoW y esto se debe a que estos últimos son capaces de manejar actualizaciones incrementales en los datos.

La similitud entre JSON y Hudi MoR hace que la elección se base completamente en las características del proyecto. En caso de que los datos no sean actualizados JSON puede resultar una solución más interesante debido principalmente a su simplicidad, mientras que si hay una alta frecuencia de actualización de datos históricos, Hudi MoR puede ser una mejor solución. Esto se debe tanto a la mayor eficiencia en las tareas de lectura como por la posibilidad de registrar las distintas versiones de los datos.

Latencia

Debido a la dificultad de estandarizar la lógica del cálculo de la latencia entre 3 tipos de almacenamiento distintos, se ha optado por simplificarla calculandolo como la diferencia entre la hora de creación del evento y la del procesamiento en la respectiva aplicación.

Se observa un comportamiento similar entre JSON y Hudi MoR, aunque este primero de una forma más crítica, al tener una latencia inicial muy baja pero a medida que tanto el tiempo de procesamiento como el volumen de carga aumenta, esta latencia se ve negativamente afectada.

La elección entre JSON y Hudi MoR dependerá tanto de la tolerancia de fallo que tenga la aplicación como las propias características de cada uno de los formatos, en caso de que la estructura de los datos sea estable y no cambie con frecuencia,o bien, no dependa de actualizaciones incrementales y pueda lidiar con reescrituras completas, en ese caso JSON puede que sea una mejor opción.

La elección de Hudi CoW sobre MoR puede darse cuando se necesite una alta tolerancia a errores y una alta capacidad de recuperación de eventos de escritura fallidos o corrompidos.`

Uso de CPU

Al analizar el uso de CPU, se ha identificado cierta homogeneidad entre los distintos tests aun trabajando con distintas cargas de trabajo. JSON Y Hudi MoR destacan por tener los niveles de uso de CPU más bajos, ambos por distintos motivos. JSON destaca por la simplicidad al incluir directamente los nuevos datos sin necesidad de tener que lidiar con versionado de datos, mientras que MoR no consume tanta CPU ya que por sus características, el consumo mayor de CPU se hace al realizar consultas de lectura, en las tareas de escritura únicamente identifica los cambios que serán aplicados al consultarlos.

Recordar que las métricas nativas de CloudWatch únicamente nos permiten monitorizar las aplicaciones, que corresponden a las tareas de escritura. La monitorización de las tareas de lectura corresponde a las Lambdas mencionadas anteriormente.

En este caso MoR es más beneficioso respecto a CoW, dado que el mayor consumo de CPU en MoR se produce al consultar los datos almacenados mientras que en CoW tiene lugar al actualizar los datos.

La elección entre los formatos más eficientes se deben a las necesidades del proyecto, en caso de que se requiera una mayor tolerancia al fallo, versionado de los datos y una mayor eficiencia de lectura, se optara por MoR frente a JSON, entre los dos formatos de Hudi, de nuevo, la elección dependerá de las características del proyecto, en caso de que las consultas requieran transformaciones pesadas y/o complejas se optaría por MoR, si en cambio, el proyecto requiera de una mayor integridad de datos y/o la ingesta de datos sea en batch,  resultaría más interesante CoW debido a que al trabajar con esos volúmenes de datos, el contar con copias de seguridad, en caso de surgir errores, el impacto en término de costes y tiempo de recuperación es menor.

Memory Utilization

JSON de nuevo destaca por tener los valores de uso de memoria más bajos aunque para la operativa de transformaciones que se realizan son relativamente altos y más teniendo en cuenta que no tiene que lidiar con la administración de versiones o la combinación de datos. Estos valores se deben a que no tiene capacidades de compresión optimizadas ni manejo eficiente de esquemas.

Respecto a Hudi, se pueden obtener unas conclusiones similares a las del apartado de uso de CPU, MoR tiene una utilización de memoria mayor que JSON debido al procesamiento de logs delta y la administración de versiones y una menor a CoW ya que la consolidación real de los datos no ocurre durante la escritura.

Last Checkpoint Size

Destacar, nuevamente, la estabilidad de JSON frente a las aplicaciones Hudi, ya que no solo muestra en los test realizados un valor inferior a ambos, si no una estabilidad que no se consigue ni con MoR ni CoW, ya que como puede apreciarse, al monitorear el tamaño de los Checkpoints, se percibe una volatilidad considerable.

La volatilidad percibida en las aplicaciones Hudi se debe principalmente a fallos surgidos en Checkpoints lo que conlleva que el Checkpoint posterior al fallido, tenga un volumen mayor. Además de esto, la volatilidad en los tamaños de los Checkpoints puede estar relacionado con las operaciones de optimización y compactación realizadas internamente que puede conllevar la compactación del estado y que esto reduzca considerablemente el tamaño del mismo.

Desafíos en el desarrollo

Read Throughput de Kinesis y EFO

Para no sobrepasar el límite de lectura sobre el Kinesis Stream se ha optado por suscribir los consumidores como Enhanced Fan-Out. En algunas pruebas en conjunto con Autoscaling esto ha dado problemas con el conector de Kinesis de Flink siendo incapaces de cerrar conexiones a la hora de escalar el cluster.

Configuración de Hudi

La configuración de Hudi ha sido otro de los puntos de fricción durante el desarrollo. Bajo cargas elevadas los procesos de compactación y limpieza son más propensos a causar problemas de Backpressure y causar errores en la aplicación. Aunque configurar estos procesos para que ocurran de forma asíncrona puede aliviar este problema, pueden surgir conflictos y desalineación entre procesos bajo cargas elevadas. Un equilibrio entre estas configuraciones y la capacidad del cluster de la aplicación son claves para el buen funcionamiento de la aplicación.

Heterogeneidad de formato

Al hacer un análisis del rendimiento de las 3 aplicaciones, se cuenta con una dificultad adicional debido a la naturaleza de los tipos de formato, teniendo esto tanto un impacto a la hora de plantear la arquitectura como en el planteamiento de las lógicas.

El distinto comportamiento de los formatos en la ingesta, complica el desarrollo de las
lógicas a la hora de calcular la latencia. MoR escribe en logs previa compactación, por lo que los datos no están disponibles inmediatamente como ocurre con CoW o JSON.  Esto implica que la métrica común medible para todos los formatos es la de disponibilidad de lectura, la cual no es el principal objetivo de una tabla MoR.  

Sincronización con el Glue Catalog

Una de las grandes ventajas que nos hemos encontrado con Hudi es su capacidad para sincronizarse con el catálogo de Glue, creando las tablas y manteniéndose actualizadas sin necesidad de un crawler. Esto permite una aplicación y arquitectura más limpia que para el caso de JSON, para el cual debe ejecutarse manualmente al desplegar las aplicaciones.

Conclusiones

Los resultados de los tests muestran diferencias considerables entre los formatos JSON, Hudi MoR y CoW en términos de eficiencia, capacidad de respuesta y utilización de recursos. Se procede a analizar cada uno de los aspectos más en detalle:

  • Eficiencia de Procesamiento: JSON y Hudi MoR destacan en la mayoría de las métricas, mostrando un desempeño óptimo en términos de Latencia, CPU & Memory Utilization. Sin embargo, el comportamiento de JSON es más estable y predecible, aunque MoR cuente con ventajas sobre JSON, como por ejemplo, en la gestión de actualizaciones incrementales.

  • Resiliencia y Tolerancia a Fallos: la tolerancia a fallos es un factor muy importante en la decisión sobre la elección entre Hudi y JSON. En el caso de  MoR y CoW, dependerá del grado de criticidad, ya que a nivel general el rendimiento en tareas de escritura para MoR es superior.

  • Uso de Recursos: JSON se muestra como el más ligero, con baja utilización de CPU y memoria, debido a su simplicidad inherente. Mientras que Hudi MoR y CoW, por la naturaleza de su diseño y gestión de datos, requieren más recursos, especialmente en operaciones que involucran el manejo de versiones y la compactación de datos.

Para finalizar, resulta interesante identificar en qué casos de uso o proyectos puede resultar más recomendable cada uno de los formatos en función de las características de los mismos y las red flags que puedan establecerse:

  • JSON: Recomendado para aplicaciones con estructuras de datos estables que no requieren actualizaciones incrementales y donde la simplicidad y la estabilidad son clave.

  • Hudi MoR: Adecuado para proyectos que requieren una gestión eficiente de actualizaciones incrementales y donde la latencia y la eficiencia en la escritura son cruciales.

  • Hudi CoW: Ideal para contextos donde la integridad de los datos es esencial, y se necesita una robusta recuperación de errores, especialmente en escenarios de ingestas en batch. 

Referencias

[1] Configuraciones Tablas Hudi. [link]

[2] Tipos de Indexacion Hudi. [link]

[3] Tipos de Operaciones Hudi. [link]


Autores

Alberto Jaen - AWS Cloud Engineer

Empecé mi carrera laboral con el desarrollo, mantenimiento y administración de bases de datos multidimensionales y Data Lakes. A partir de ahí comencé a estar interesado en plataformas de datos y arquitecturas cloud, estando certificado 3 veces en AWS y 2 con Hashicorp.

Actualmente me encuentro trabajando como un Cloud Engineer desarrollando Data Lakes y DataWarehouses con AWS para un cliente relacionado con la organización de eventos deportivos a nivel mundial.

Alfonso Jerez - AWS Cloud Engineer

Apasionado de los datos y las nuevas tecnologías, especializado como AWS Cloud Engineer en la optimización de DataWarehouses y procesos de ingesta y transformación de Data Lakes. Motivado por la mejora continua y automatización de la integración de servicios.

Colaborando activamente con el grupo de Práctica Cloud en investigaciones y desarrollo de blogs de tecnologías punteras e innovadoras tales como esta, fomentando así el continuo aprendizaje.

Adrián Jiménez - AWS Cloud Engineer

Dedicado al aprendizaje constante de nuevas tecnologías y su aplicación, disfrutando de utilizarlas en la resolución de desafíos tecnológicos. Desarrollo mi carrera como Cloud Engineer diseñando, implementando y manteniendo infraestructura en AWS.

Colaboro activamente en la Práctica Cloud, donde investigamos y experimentamos con nuevas tecnologías, buscando soluciones para los retos que enfrentan nuestros clientes.


Content in English

LakeHouse Streaming on AWS with Apache Flink and Hudi (Part 2)

Introduction

This article is the second in a series of publications focusing on the creation of a LakeHouse with Hudi from a streaming ingest processed by a Flink application. The first article focuses on laying a good foundation for this platform, where Flink applications were deployed with KDA (Kinesis Data Analytics) for each type of format (MoR, CoW for Hudi and JSON) that write the result of this processing into buckets.

The input data was sent in the previous article from a local machine running a Locust application, which can present problems when scaling and processing a high volume of events. In addition, Kinesis Data Analytics applications with Flink present agility problems in their auto-scaling mode. All these new challenges will be solved in this article.

These tables will also be cataloged in Glue, a service that provides a data catalog in AWS, in order to access them and perform queries of all kinds. The query engine that will consume this metadata will be Athena, which provides a scalable, agile and serverless experience to be able to execute queries with SQL or Spark for our tables hosted in S3.

On the other hand, in this article we have also deployed the necessary components to be able to monitor our applications and thus draw conclusions about the speed at which data is ingested and the possible problems to be solved so that the processing has the required latency according to the requirements imposed.

Finally, a performance and latency comparison of the different Flink applications that write data in Hudi and JSON formats will be made in order to see the different advantages and disadvantages of these formats.

Table of Contents

Introduction

Architecture

Scaling

Hudi

Hudi configuration

Development challenges

Stress Tests & Insights

Conclusions

References

Authors

Architecture

Below you can see the high-level architecture that will be deployed:

For a better understanding we are going to explain it from left to right. As you can see, the most notable change with respect to the first article is the inclusion of a Kubernetes cluster to be able to scale the events that will be sent as input to our streaming application. In this way, it will be possible to thoroughly test the performance of Flink applications depending on their provisioning and especially on the type of format and table in which they write to the LakeHouse. In addition, an ALB (Application Load Balancer) has been made available to access the Locust interface to define the number of users to simulate and how they should scale over time. The URL to access this will appear as output when deploying the infrastructure with Terraform.

On the other hand, significant changes have been made to the Flink KDA applications and the stream they read from. Each application now reads as EFO (Enhanced Fan Out) consumers, so that each of them has a dedicated bandwidth. The reason for this change and its details will be explained in more detail in the dedicated section for Kinesis.

Regarding the monitoring and extraction of metrics in NRT (Near Real Time), lambdas functions have been deployed that query the tables based on Athena thanks to having registered the metadata of these tables in the Glue catalog. It is important to note that the metadata of Hudi tables are registered in Glue by Flink but in the case of JSON a crawler is deployed that registers these tables in the catalog. This crawler must be executed manually for this table to be registered in Glue.

Scaling

Kinesis Stream

Since the goal is to subject the application to a considerable load of events per second, it is necessary to explain how each of the pieces of the architecture can scale according to the volume of data.

As previously mentioned, a Kinesis Stream On-Demand has been chosen to automate the scaling of the shards during load testing. It should be noted that these streams can accommodate a write rate of up to 200% of that specified by the number of shards at any given time.

Once the stream is above 100%, it will automatically increase the number of shards within 15 minutes. The only limitation is therefore not to exceed twice the supported write volume in less than that period.

On the other hand, since you will have three Flink applications reading from the same stream, read limitations will be the biggest problem. A Kinesis Stream only supports 5 GetRecord calls per shard per second. Since each application has to read the entire stream (and therefore all shards), increasing the number of shards does not help to solve this problem.

The solution is to register each application as an Enhanced Fan-Out consumer. This functionality of the Kinesis Stream provides each of these consumers with an individual limit of 5 GetRecord calls and 2MB per shard per second of reading.

This configuration is done on the consumer side, in our case via the Kinesis connector for Flink.

'scan.stream.recordpublisher' = 'EFO',

'scan.stream.efo.registration' = 'EAGER/LAZY',

'scan.stream.efo.consumpername' = '{consumer_name}'

It is worth mentioning that alternatively, it is possible to increase the read latency of our Flink applications. By default Flink performs a read every 200ms per shard, so one application completely consumes the read quota of a stream. By increasing this value to 600ms we could accommodate all three applications, at the cost of increased latency.

scan.shard.getrecords.intervalmillis = '600'

Use will also be made of the Adaptive Reads option, which dynamically modifies the number of events collected per call depending on the size of each record. This makes it possible to take advantage of the 2 MB/s per shard available for each consumer.  

'scan.shard.adaptivereads' = 'true'

Regarding scaling in Flink KPUs (Kinesis Processing Unit), we have chosen not to make use of autoscaling, since each scaling process incurs in downtime for the application. Due to the different requirements of each of the applications, scaling actions at unexpected times could interrupt load testing. In addition, it is interesting to measure the write performance of each of the applications at equal computing capacity.

Hudi

Timeline

One of the basic systems on which Hudi's operation and features are based is the timeline. Hudi keeps a temporary record of all the actions that have been performed on the table, as well as the status of this action.

The main actions that make up the timeline are as follows

  • Commits - atomic writing of a set of records to the table in columnar format
  • Delta Commit - similar to commit, represents a write of records in the form of logs to a Merge on Read table.
  • Compaction - compaction of log writes (delta commits) from a MoR table to columnar format
  • Cleans - deletion of old versions of files
  • Rollback - deleted from records written by a failed commit or delta commit
  • Savepoint - marks a set of files as "saved" so that they will not be deleted by the cleanup process. Allows to restore the table to a previous point in the timeline.

Any of these actions can be found in one of three states

  1. Requested - an action has been planned but not yet started
  2. Inflight - the action is in progress
  3. Completed - denotes that the action has been completed.

Table types

As hinted in the operation of the Hudi timeline, there are two types of writing supported: columnar and logs. The columnar (parquet) format constitutes the final form of a Hudi table, together with the timeline metadata. However, it is possible to make use of log writes (avro) to decrease the write latency and eventually compact to columnar format without hindering the write.

The use of these writing methods gives rise to the two types of table that Hudi makes available to us

  • Copy on Write - writes are performed exclusively in columnar format, creating a new file with the new table records. The data is available immediately but incurs higher write latency.

  • Merge on Read - makes use of writing to logs. The new records are initially written as logs, and will later be transformed to columnar format by the compaction process. We obtain lower write latency at the cost of read latency; the new logs will not be available until compaction is performed.

Query Types

In order to take advantage of the characteristics of each type of table, there are three types of queries that can be performed on a Hudi table

  • Snapshot - obtains the latest version of the table. For MoR tables this involves incurring a compaction process to get the latest records in log format.

  • Read Optimized - for MoR tables, reads only the records already exposed in columnar format without incurring additional read latency.

  • Incremental - collects only new records since a certain commit or compact, facilitating the creation of incremental pipelines. Not supported by Athena

Integration with Glue Catalog

The Hudi connector allows a native integration with the Glue catalog in AWS. Simply add the Hive dependencies in our Flink application.

com.amazonaws.aws-java-sdk-glue

org.apache.hive.hive-common

org.apache.hive.hive-exec

And specify the catalog configuration in the Hudi connector

'hive_sync.enable' = 'true',

'hive_sync.db' = '{glue_database}',

'hive_sync.table' = '{table_name}',

'hive_sync.partition_fields' = '{partition_fields}',

'hive_sync.mode' = 'glue',

'hive_sync.use_jdbc' = 'false' = 'false'.

With this integration, the application will automatically create the tables in the catalog. As mentioned before, there are different types of queries to query a Hudi table. Therefore, different tables will be created in the catalog to support the different queries.

For a CoW table, the table will be queried using a Snapshot query. For MoR on the other hand, two tables will be made available to support Read Optimized or Snapshot queries.

The main application of Glue is to support lambdas so that when executing queries through Athena their execution can be done in a more efficient, fast and secure way:

  • Glue Catalog: centralized storage of information about the organization, design and format of the data, used by Athena to directly perform queries to S3 without having to rely on third parties to obtain this information.

  • Schema Automation: Glue automatically tracks and catalogs data in S3, detecting and adapting schema changes. This avoids possible errors and allows the reading of new fields in case of alterations in the event schemas.

 

Hudi configuration

It is important to understand the configurations offered by Hudi to optimize our application, in particular for a Near Real Time application it is convenient to be aware of the available options. Although the configuration capacity is immense [1], we will try to summarize the most relevant ones for a first contact with this technology.

Partitioning

Apache Hudi offers the types of partitioning that can be found in other solutions, the main ones will be detailed and the implemented one will be justified:

  • Simple: partitioning based on a single field, in this case the field chosen is 'ticker' as it has been identified as the one with the lowest cardinality.

  • Compound Partitioning: partitioning based on multiple fields, it could be interesting to choose a low cardinality field (ticker) and a medium cardinality field (date).

  • Dynamic Partitioning: choice of the variable based on the values, it can be interesting when the cardinality of the variables can undergo variations and an update of the partitioning is required in an automatic and flexible way.

Indexes

Apache Hudi has multiple types of indexing[2], we will briefly discuss the most common ones:

  • Bloom Index - Makes use of a bloom filter on the key of the events, additionally it can be complemented with a filtering by key range. It works well when dealing with a table where most changes occur in the most recent partitions or for event deduplication.

  • Simple: indexing performed by the combination of FileID and RecordKey. Recommended when Upsert operations are not so frequent due to the simplicity it offers.

Both types of indexes can be used in their global form

  • Global index - They impose the uniqueness of the keys in all the partitions of the table, that is to say, they guarantee that there will be only one record with a certain key.

  • Non-global index - Key uniqueness is only required at the partition level. If the data is consistent and a key is only going to exist in one partition, this type of index offers much better performance and better scaling.

In this case, a Bloom Index has been chosen, which is the default in case it is not expressly stated:

"hoodie.index.type" = "BLOOM"

The choice of this type of indexing is due to the fact that the use cases that have been raised require a considerably high and efficient data processing.

Types of operations

Apache Hudi offers several types of operations[3] that allow users to manage and modify large data sets.
The main operations performed in Stress Tests as well as in other scenarios are detailed below:

  • Upsert - This is the default operation, and will execute an insert or an update depending on whether the record already exists after an index lookup. With this operation the table will have no duplicates for its primary key.

  • Insert - This operation ignores the index lookup when inserting events. It is the fastest but the table may contain duplicates. It is still useful if auxiliary deduplication methods are used, or simply the existence of these is tolerable in the use case.

  • Delete: Hudi offers two deletion methods. Soft Delete converts to null the values of the event except for the key. Hard Delete executes a physical deletion of the event in the table.

  • Bulk Insert Operation similar to Insert but optimized for insertion of a large volume of data, at the cost of sacrificing some guarantees in file size control. Scales well for hundreds of TBs in case of initial bootstrap of a large table.

Compaction

In the case of using a MoR table, it is possible to configure the log compaction rate to find the balance between write and read latency that best suits the use case. It is possible to specify a strategy of time or number of delta commits (or both) that execute a compaction process.

compaction.delta_commits

compaction.delta_seconds

compaction.trigger.strategy

Asynchronous actions

Certain timeline actions such as compacting, cleaning, archiving and clustering can be performed asynchronously by the application, or even relegated to auxiliary processes to the writing application. In the case of Flink, it can help improve write latency and avoid BackPressure problems in the application.

compaction.async.enabled

hoodie.clean.async

hoodie.archive.async

hoodie.clustering.async.enabled

Stress Tests & Insights

When deploying the applications, different tests have been performed, varying both the maximum load of events and the concurrency and exponential degree of growth of the same. This has been possible thanks to the flexibility offered by Locust being built on a Kubernetes cluster, being able to set a maximum limit of concurrency of events and an incremental of them. In the tests, a maximum limit of 5 to 15K simultaneous users (Peak Concurrency) has been established, scaling the frequency of the same in a linear way, from 5 to 20 more users per second (Spawn Rate):



The different tests have been monitored in order to draw conclusions about the performance, taking into account the specific characteristics of each of the formats. The metrics on which the analyses have been based are both the native CloudWatch Metrics (CPU & Memory Utilization, KPUs, LastCheckpoint SIze & Duration,...), as well as the metrics obtained from the Lambdas that periodically consult the number of events available in the buckets and calculate the average latency of the same.

Number of Events

When analyzing the total number of events processed, which are sent gradually, i.e., as time passes more and more events are sent per second, a fairly similar trend is identified although JSON and Hudi MoR stand out over Hudi CoW in terms of performance. It is worth noting that JSON shows a more stable and steady growth compared to Hudi MoR and CoW and this is because the latter are able to handle incremental updates in the data.

The similarity between JSON and Hudi MoR makes the choice entirely based on the characteristics of the project. In case the data is not updated JSON may be a more interesting solution mainly due to its simplicity, while if there is a high frequency of historical data update, Hudi MoR may be a better solution. This is due both to the higher efficiency in reading tasks and because of the possibility to record different versions of the data.

Latency

Due to the difficulty of standardizing the latency calculation logic between 3 different types of storage, we have chosen to simplify it by calculating it as the difference between the time of event creation and the time of processing in the respective application.

Similar behavior is observed between JSON and Hudi MoR, although the former in a more critical way, having a very low initial latency but as both processing time and load volume increases, this latency is negatively affected.

The choice between JSON and Hudi MoR will depend both on the fault tolerance of the application and the characteristics of each of the formats, in case the data structure is stable and does not change frequently, or does not depend on incremental updates and can deal with complete rewrites, then JSON may be a better choice.

The choice of Hudi CoW over MoR can be made when high error tolerance and high recoverability from failed or corrupted write events are required.

CPU utilization

When analyzing CPU usage, a certain homogeneity has been identified among the different tests, even when working with different workloads. JSON and Hudi MoR stand out for having the lowest CPU usage levels, both for different reasons. JSON stands out for its simplicity by directly including the new data without having to deal with data versioning, while MoR does not consume as much CPU since, due to its characteristics, the highest CPU consumption is made when performing read queries, in the write tasks it only identifies the changes that will be applied when querying them.

Remember that CloudWatch native metrics only allow us to monitor the applications, which correspond to the writing tasks. The monitoring of read tasks corresponds to the Lambdas mentioned above.

In this case MoR is more beneficial with respect to CoW, since the higher CPU consumption in MoR occurs when querying the stored data while in CoW it occurs when updating the data.

The choice between the most efficient formats depends on the needs of the project, in case a higher fault tolerance, data versioning and higher reading efficiency are required, MoR will be chosen over JSON, between the two Hudi formats, again, the choice will depend on the characteristics of the project, if the queries require heavy and/or complex transformations, MoR would be chosen; if, on the other hand, the project requires greater data integrity and/or the data ingestion is in batch, CoW would be more interesting because when working with these volumes of data, having backup copies, in case of errors, the impact in terms of costs and recovery time is lower.

Memory Utilization

JSON again stands out for having the lowest memory usage values, although for the number of transformations that are performed, they are relatively high, especially considering that it does not have to deal with version management or data merging. These values are due to the fact that it does not have optimized compression capabilities or efficient schema management.

Regarding Hudi, similar conclusions can be drawn as in the CPU usage section, MoR has a higher memory utilization than JSON due to delta log processing and version management and a lower one to CoW since the actual data consolidation does not occur during writing.

Last Checkpoint Size

It is important to highlight, once again, the stability of JSON compared to Hudi applications, since it not only shows a lower value than both in the tests performed, but also a stability that is not achieved with either MoR or CoW, since, as can be seen, when monitoring the size of the Checkpoints, considerable volatility is perceived.

Perceived volatility in Hudi applications is mainly due to Checkpoint failures, which leads to a larger Checkpoint volume after the failure. In addition to this, the volatility in Checkpoint sizes may be related to the optimization and compaction operations performed internally that may lead to state compaction, which considerably reduces the size of the Checkpoint.

Development challenges

Read Throughput of Kinesis and EFO

In order not to exceed the read limit on the Kinesis Stream we have chosen to subscribe the consumers as Enhanced Fan-Out. In some tests in conjunction with Autoscaling this has given problems with the Flink Kinesis connector being unable to close connections when scaling the cluster.

Hudi configuration

Hudi's configuration has been another sticking point during development. Under high loads the compaction and cleanup processes are more likely to cause backpressure problems and cause application errors. Although configuring these processes to occur asynchronously can alleviate this problem, conflicts and misalignment between processes can arise under high loads. A balance between these configurations and the application's cluster capacity are key to the smooth operation of the application.

Format heterogeneity

When analyzing the performance of the 3 applications, there is an additional difficulty due to the nature of the format types, which has an impact both on the architecture and on the development of the logics.

The different behavior of the formats in the ingest complicates the development oflogics when calculating latency. MoR writes to logs after compaction, so the data is not immediately available as is the case with CoW or JSON.  This implies that the common measurable metric for all formats is read availability, which is not the main purpose of a MoR table.  

Synchronization with the Glue Catalog

One of the great advantages we have found with Hudi is its ability to synchronize with the Glue catalog, creating the tables and keeping them updated without the need for a crawler. This allows for a cleaner application and architecture than in the case of JSON, for which it must be run manually when deploying applications.

Conclusions

The test results show considerable differences between the JSON, Hudi MoR and CoW formats in terms of efficiency, responsiveness and resource utilization. We proceed to analyze each of the aspects in more detail:

  • Processing Efficiency: JSON and Hudi MoR stand out in most metrics, showing optimal performance in terms of Latency, CPU & Memory Utilization. However, JSON behavior is more stable and predictable, although MoR has advantages over JSON, for example, in incremental update management.

  • Resilience and Fault Tolerance: fault tolerance is a very important factor in the decision on the choice between Hudi and JSON. In the case of MoR and CoW, it will depend on the degree of criticality, since at a general level the performance in writing tasks for MoR is superior.

  • Resource Usage: JSON is shown to be the most lightweight, with low CPU and memory utilization, due to its inherent simplicity. Whereas Hudi MoR and CoW, due to the nature of their design and data management, require more resources, especially in operations involving version management and data compaction.

Finally, it is interesting to identify in which use cases or projects each of the formats may be more recommendable depending on their characteristics and the network flags that may be established:

  • JSON: Recommended for applications with stable data structures that do not require incremental updates and where simplicity and stability are key.

  • Hudi MoR: Suitable for projects that require efficient management of incremental updates and where latency and writing efficiency are crucial.

  • Hudi CoW: Ideal for contexts where data integrity is essential, and robust error recovery is needed, especially in batch ingest scenarios.

References

[1] Hudi Tables Configuration. [link]

[2] Index Types in Hudi. [link]

[3] Hudi Operation Types. [link]


Authors

Alberto Jaen - AWS Cloud Engineer

I started my career with the development, maintenance and administration of multidimensional databases and Data Lakes. From there I started to be interested in data platforms and cloud architectures, being certified 3 times in AWS and 2 with Hashicorp.

I am currently working as a Cloud Engineer developing Data Lakes and DataWarehouses with AWS for a client related to the organization of sporting events worldwide.

Alfonso Jerez - AWS Cloud Engineer

Passionate about data and new technologies, specialized as AWS Cloud Engineer in DataWarehouses optimization and Data Lakes ingestion and transformation processes. Motivated by continuous improvement and automation of service integration.

Actively collaborating with the Cloud Practice group in research and blog development of cutting-edge and innovative technologies such as this one, thus fostering continuous learning.

Adrián Jiménez - AWS Cloud Engineer

Dedicated to constantly learning new technologies and their application, enjoying using them to solve technological challenges. I develop my career as a Cloud Engineer designing, implementing and maintaining infrastructure in AWS.

I actively collaborate in the Cloud Practice, where we research and experiment with new technologies, seeking solutions to the challenges faced by our clients.

Walter Talaverano

Microsoft Certified | Certified

¿Quieres saber más de lo que ofrecemos y ver otros casos de éxito?