paint-brush
Desenvolvendo Streaming Data Lakes com Hudi e MinIOpor@minio
7,263 leituras
7,263 leituras

Desenvolvendo Streaming Data Lakes com Hudi e MinIO

por MinIO14m2023/08/29
Read on Terminal Reader

Muito longo; Para ler

Apache Hudi foi o primeiro formato de tabela aberto para data lakes e é digno de consideração em arquiteturas de streaming. O uso do MinIO para armazenamento Hudi abre caminho para data lakes e análises em várias nuvens.
featured image - Desenvolvendo Streaming Data Lakes com Hudi e MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Hudi é uma plataforma de streaming de data lake que traz funcionalidades básicas de warehouse e banco de dados diretamente para o data lake. Não contente em se autodenominar um formato de arquivo aberto como Delta ou Apache Iceberg , o Hudi fornece tabelas, transações, upserts/exclusões, índices avançados, serviços de ingestão de streaming, otimizações de cluster/compactação de dados e simultaneidade.


Introduzido em 2016, o Hudi está firmemente enraizado no ecossistema Hadoop, sendo responsável pelo significado por trás do nome: Hadoop Upserts and Incrementals. Foi desenvolvido para gerenciar o armazenamento de grandes conjuntos de dados analíticos em HDFS. O objetivo principal do Hudi é diminuir a latência durante a ingestão de dados de streaming.


Mesa Hudi


Com o tempo, o Hudi evoluiu para usar armazenamento em nuvem e armazenamento de objetos, incluindo MinIO. O afastamento da Hudi do HDFS anda de mãos dadas com a tendência mundial de deixar para trás o HDFS legado para armazenamento de objetos com desempenho, escalável e nativo da nuvem. A promessa da Hudi de fornecer otimizações que tornam as cargas de trabalho analíticas mais rápidas para Apache Spark, Flink, Presto, Trino e outros se encaixa perfeitamente com a promessa do MinIO de desempenho de aplicativos nativos da nuvem em escala.


As empresas que usam o Hudi na produção incluem Uber , Amazon , ByteDance e Robinhood . Estes são alguns dos maiores data lakes de streaming do mundo. A chave para o Hudi neste caso de uso é que ele fornece uma pilha incremental de processamento de dados que conduz processamento de baixa latência em dados colunares. Normalmente, os sistemas gravam os dados uma vez usando um formato de arquivo aberto, como Apache Parquet ou ORC, e os armazenam em cima de um armazenamento de objetos altamente escalável ou de um sistema de arquivos distribuído. Hudi serve como um plano de dados para ingerir, transformar e gerenciar esses dados. O Hudi interage com o armazenamento usando a API Hadoop FileSystem , que é compatível com (mas não necessariamente ideal para) implementações que vão desde HDFS até armazenamento de objetos e sistemas de arquivos na memória.

Formato de arquivo Hudi

Hudi usa um arquivo base e arquivos de log delta que armazenam atualizações/alterações em um determinado arquivo base. Os arquivos base podem ser Parquet (colunares) ou HFile (indexados). Os logs delta são salvos como Avro (linha) porque faz sentido registrar as alterações no arquivo base à medida que ocorrem.


Hudi codifica todas as alterações em um determinado arquivo base como uma sequência de blocos. Os blocos podem ser blocos de dados, blocos de exclusão ou blocos de reversão. Esses blocos são mesclados para derivar arquivos base mais recentes. Essa codificação também cria um log independente.



Formato de arquivo Hudi

Fonte .

Formato de tabela Hudi

Um formato de tabela consiste no layout do arquivo da tabela, no esquema da tabela e nos metadados que rastreiam as alterações na tabela. Hudi impõe esquema na gravação, consistente com a ênfase no processamento de fluxo, para garantir que os pipelines não sejam interrompidos devido a alterações não compatíveis com versões anteriores.


Hudi agrupa arquivos para uma determinada tabela/partição e mapeia entre chaves de registro e grupos de arquivos. Conforme mencionado acima, todas as atualizações são registradas nos arquivos de log delta de um grupo de arquivos específico. Esse design é mais eficiente que o Hive ACID, que deve mesclar todos os registros de dados em todos os arquivos base para processar consultas. O design do Hudi prevê upserts e exclusões rápidas baseadas em chaves, pois funciona com logs delta para um grupo de arquivos, não para um conjunto de dados inteiro.


Hudi agrupa arquivos para uma determinada tabela/partição e mapeia entre chaves de registro e grupos de arquivos. Conforme mencionado acima, todas as atualizações são registradas nos arquivos de log delta de um grupo de arquivos específico. Esse design é mais eficiente que o Hive ACID, que deve mesclar todos os registros de dados em todos os arquivos base para processar consultas. O design do Hudi prevê upserts e exclusões rápidas baseadas em chaves, pois funciona com logs delta para um grupo de arquivos, não para um conjunto de dados inteiro.


Formato de tabela Hudi

Fonte .


É fundamental entender a linha do tempo porque ela serve como uma fonte de registro de eventos verdadeiros para todos os metadados da tabela do Hudi. A linha do tempo é armazenada na pasta .hoodie ou no balde, no nosso caso. Os eventos são retidos na linha do tempo até serem removidos. A linha do tempo existe para uma tabela geral, bem como para grupos de arquivos, permitindo a reconstrução de um grupo de arquivos aplicando os logs delta ao arquivo base original. Para otimizar gravações/confirmações frequentes, o design do Hudi mantém os metadados pequenos em relação ao tamanho da tabela inteira.


Novos eventos na linha do tempo são salvos em uma tabela interna de metadados e implementados como uma série de tabelas mescladas na leitura, proporcionando assim baixa amplificação de gravação. Como resultado, o Hudi pode absorver rapidamente mudanças rápidas nos metadados. Além disso, a tabela de metadados usa o formato de arquivo base HFile, otimizando ainda mais o desempenho com um conjunto de pesquisas indexadas de chaves que evita a necessidade de ler toda a tabela de metadados. Todos os caminhos de arquivos físicos que fazem parte da tabela são incluídos nos metadados para evitar listagens de arquivos em nuvem dispendiosas e demoradas.

Escritores Hudi

Os gravadores Hudi facilitam arquiteturas onde o Hudi atua como uma camada de gravação de alto desempenho com suporte a transações ACID que permite alterações incrementais muito rápidas, como atualizações e exclusões.


Uma arquitetura Hudi típica depende de pipelines Spark ou Flink para entregar dados às tabelas Hudi. O caminho de gravação do Hudi é otimizado para ser mais eficiente do que simplesmente gravar um arquivo Parquet ou Avro no disco. Hudi analisa as operações de gravação e as classifica como incrementais ( insert , upsert , delete ) ou operações em lote ( insert_overwrite , insert_overwrite_table , delete_partition , bulk_insert ) e então aplica as otimizações necessárias.


Os redatores do Hudi também são responsáveis pela manutenção dos metadados. Para cada registro, o tempo de confirmação e um número de sequência exclusivo para esse registro (isto é semelhante a um deslocamento Kafka) são escritos, possibilitando derivar alterações no nível do registro. Os usuários também podem especificar campos de horário de evento em fluxos de dados recebidos e rastreá-los usando metadados e a linha do tempo do Hudi. Isso pode trazer melhorias drásticas no processamento de fluxo, pois o Hudi contém a hora de chegada e o horário do evento para cada registro, tornando possível construir marcas d'água fortes para pipelines complexos de processamento de fluxo.

Leitores Hudi

O isolamento de instantâneos entre gravadores e leitores permite que instantâneos de tabelas sejam consultados de forma consistente em todos os principais mecanismos de consulta de data lake, incluindo Spark, Hive, Flink, Prest, Trino e Impala. Assim como Parquet e Avro, as tabelas Hudi podem ser lidas como tabelas externas por empresas como Snowflake e SQL Server .


Os leitores Hudi são desenvolvidos para serem leves. Sempre que possível, são usados leitores vetorizados e cache específicos do mecanismo, como os do Presto e do Spark. Quando o Hudi precisa mesclar arquivos de base e de log para uma consulta, o Hudi melhora o desempenho da mesclagem usando mecanismos como mapas que podem ser derramados e leitura lenta, ao mesmo tempo que fornece consultas otimizadas para leitura.


O Hudi inclui mais do que alguns recursos de consulta incremental notavelmente poderosos. Os metadados estão no centro disso, permitindo que grandes commits sejam consumidos como pedaços menores e dissociando totalmente a gravação e a consulta incremental de dados. Através do uso eficiente de metadados, a viagem no tempo é apenas mais uma consulta incremental com um ponto de início e de parada definidos. O Hudi mapeia chaves atomicamente para grupos de arquivos únicos em qualquer momento, suportando recursos completos de CDC em tabelas Hudi. Conforme discutido acima na seção de escritores do Hudi, cada tabela é composta por grupos de arquivos e cada grupo de arquivos possui seus próprios metadados independentes.

Viva o Hudi!

A maior força do Hudi é a velocidade com que ele ingere dados de streaming e em lote. Ao fornecer a capacidade de upsert , o Hudi executa tarefas em ordens de grandeza mais rápidas do que reescrever tabelas ou partições inteiras.


Para aproveitar a velocidade de ingestão do Hudi, os data lakehouses exigem uma camada de armazenamento capaz de alto IOPS e taxa de transferência. A combinação de escalabilidade e alto desempenho do MinIO é exatamente o que o Hudi precisa. MinIO é mais do que capaz do desempenho necessário para alimentar um data lake corporativo em tempo real - um benchmark recente alcançou 325 GiB/s (349 GB/s) em GETs e 165 GiB/s (177 GB/s) em PUTs com apenas 32 nós de SSDs NVMe prontos para uso.


Um data lake Hudi empresarial ativo armazena um grande número de pequenos arquivos Parquet e Avro. MinIO inclui uma série de otimizações de pequenos arquivos que permitem data lakes mais rápidos. Objetos pequenos são salvos em linha com metadados, reduzindo o IOPS necessário para ler e gravar arquivos pequenos, como metadados e índices Hudi.


O esquema é um componente crítico de cada tabela Hudi. O Hudi pode impor o esquema ou permitir a evolução do esquema para que o pipeline de streaming de dados possa se adaptar sem quebrar. Além disso, o Hudi impõe o esquema no gravador para garantir que as alterações não interrompam os pipelines. Hudi confia no Avro para armazenar, gerenciar e desenvolver o esquema de uma tabela.


Hudi fornece garantias transacionais ACID para data lakes. Hudi garante gravações atômicas: os commits são feitos atomicamente em uma linha do tempo e recebem um carimbo de data/hora que indica o momento em que a ação é considerada como tendo ocorrido. Hudi isola instantâneos entre processos gravadores, tabelas e leitores para que cada um opere em um instantâneo consistente da tabela. Hudi completa isso com controle de simultaneidade otimista (OCC) entre gravadores e controle de simultaneidade sem bloqueio baseado em MVCC entre serviços de tabela e escritores e entre vários serviços de tabela.

Tutorial Hudi e MinIO

Este tutorial orientará você na configuração do Spark, Hudi e MinIO e apresentará alguns recursos básicos do Hudi. Este tutorial é baseado no Apache Hudi Spark Guide , adaptado para funcionar com armazenamento de objetos MinIO nativo da nuvem.


Observe que trabalhar com buckets versionados adiciona alguma sobrecarga de manutenção ao Hudi. Qualquer objeto excluído cria um marcador de exclusão . À medida que o Hudi limpa os arquivos usando o utilitário Cleaner, o número de marcadores de exclusão aumenta com o tempo. É importante configurar o Lifecycle Management corretamente para limpar esses marcadores de exclusão, pois a operação List pode engasgar se o número de marcadores de exclusão atingir 1.000. Os mantenedores do projeto Hudi recomendam limpar os marcadores de exclusão após um dia usando regras de ciclo de vida.

Pré-requisitos

Baixe e instale o Apache Spark.


Baixe e instale o MinIO. Registre o endereço IP, a porta TCP do console, a chave de acesso e a chave secreta.


Baixe e instale o cliente MinIO.


Baixe as bibliotecas AWS e AWS Hadoop e adicione-as ao seu classpath para usar S3A para trabalhar com armazenamento de objetos.

  • AWS: aws-java-sdk:1.10.34 (ou superior)

  • Hadoop: hadoop-aws:2.7.3 (ou superior)


Baixe os arquivos Jar, descompacte-os e copie-os para /opt/spark/jars .

Crie um intervalo MinIO

Use o cliente MinIO para criar um bucket para armazenar dados do Hudi:

 mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Inicie o Spark com Hudi

Inicie o shell Spark com o Hudi configurado para usar MinIO para armazenamento. Certifique-se de configurar entradas para S3A com suas configurações de MinIO.


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


Em seguida, inicialize o Hudi no Spark.

 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


Observe que isso simplificará o uso repetido do Hudi para criar umarquivo de configuração externo .

Crie uma tabela

Experimente e crie uma pequena tabela Hudi simples usando Scala. O Hudi DataGenerator é uma maneira rápida e fácil de gerar inserções e atualizações de amostra com base no esquema de viagem de amostra .


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Insira dados no Hudi e grave a tabela no MinIO

A seguir irá gerar novos dados de viagem, carregá-los em um DataFrame e gravar o DataFrame que acabamos de criar no MinIO como uma tabela Hudi. mode(Overwrite) sobrescreve e recria a tabela caso ela já exista. Os dados de viagens dependem de uma chave de registro ( uuid ), campo de partição ( region/country/city ) e lógica ( ts ) para garantir que os registros de viagem sejam exclusivos para cada partição. Usaremos a operação de gravação padrão, upsert . Quando você tem uma carga de trabalho sem atualizações, você pode usar insert ou bulk_insert , que pode ser mais rápido.


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


Abra um navegador e faça login no MinIO em http://<your-MinIO-IP>:<port> com sua chave de acesso e chave secreta. Você verá a tabela Hudi no balde.


Console MinIO


O bucket também contém um caminho .hoodie que contém metadados e caminhos americas e asia que contém dados.


Metadados


Dê uma olhada nos metadados. Esta é a aparência do meu caminho .hoodie depois de concluir todo o tutorial. Podemos ver que modifiquei a tabela na terça-feira, 13 de setembro de 2022, às 9h02, 10h37, 10h48, 10h52 e 10h56.


O caminho .hoodie após concluir o tutorial

Consultar dados

Vamos carregar os dados do Hudi em um DataFrame e executar um exemplo de consulta.

 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Viagem no tempo com Hudi

Não, não estamos falando sobre ir ver um show do Hootie and the Blowfish em 1988.


Cada gravação nas tabelas Hudi cria novos instantâneos. Pense nos instantâneos como versões da tabela que podem ser referenciadas para consultas de viagem no tempo.


Experimente algumas consultas de viagem no tempo (você terá que alterar os carimbos de data e hora para que sejam relevantes para você).


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

Atualizar dados

Este processo é semelhante a quando inserimos novos dados anteriormente. Para mostrar a capacidade do Hudi de atualizar dados, vamos gerar atualizações para registros de viagem existentes, carregá-los em um DataFrame e, em seguida, gravar o DataFrame na tabela Hudi já salva no MinIO.


Observe que estamos usando o modo de salvamento append . Uma diretriz geral é usar o modo append , a menos que você esteja criando uma nova tabela para que nenhum registro seja substituído. Uma maneira típica de trabalhar com o Hudi é ingerir dados de streaming em tempo real, anexando-os à tabela e, em seguida, escrever alguma lógica que mescle e atualize os registros existentes com base no que acabou de ser anexado. Alternativamente, escrever usando o modo overwrite exclui e recria a tabela se ela já existir.


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


A consulta dos dados mostrará os registros de viagem atualizados.

Consulta Incremental

Hudi pode fornecer um fluxo de registros que foram alterados desde um determinado carimbo de data/hora usando consulta incremental. Tudo o que precisamos fazer é fornecer um horário de início a partir do qual as alterações serão transmitidas para ver as alterações até o commit atual, e podemos usar um horário de término para limitar o fluxo.


A consulta incremental é um grande negócio para o Hudi porque permite construir pipelines de streaming em dados em lote.


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

Consulta pontual

Hudi pode consultar dados a partir de uma hora e data específicas.


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

Excluindo dados com exclusões suaves

Hudi oferece suporte a duas maneiras diferentes de excluir registros. Uma exclusão reversível retém a chave de registro e anula os valores de todos os outros campos. As exclusões suaves são persistentes no MinIO e removidas apenas do data lake usando uma exclusão definitiva.


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

Excluindo dados com exclusões definitivas

Em contraste, exclusões definitivas são o que chamamos de exclusões. A chave de registro e os campos associados são removidos da tabela.


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Inserir substituição

O data lake se torna um data lakehouse quando ganha a capacidade de atualizar os dados existentes. Vamos gerar alguns novos dados de viagem e, em seguida, substituir os dados existentes. Esta operação é mais rápida do que um upsert , onde o Hudi calcula toda a partição de destino de uma vez para você. Aqui especificamos a configuração para ignorar a indexação, pré-combinação e reparticionamento automáticos que upsert faria por você.


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

Evoluir esquema de tabela e particionamento

A evolução do esquema permite alterar o esquema de uma tabela Hudi para se adaptar às alterações que ocorrem nos dados ao longo do tempo.


Abaixo estão alguns exemplos de como consultar e evoluir esquema e particionamento. Para uma discussão mais aprofundada, consulte Schema Evolution | Apache Hudi . Observe que se você executar esses comandos, eles alterarão o esquema da tabela Hudi para ser diferente deste tutorial.


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


Atualmente, SHOW partitions funcionam apenas em um sistema de arquivos, pois são baseadas no caminho da tabela do sistema de arquivos.


Este tutorial usou o Spark para mostrar os recursos do Hudi. No entanto, o Hudi pode oferecer suporte a vários tipos de tabela/tipos de consulta e as tabelas do Hudi podem ser consultadas em mecanismos de consulta como Hive, Spark, Presto e muito mais. O projeto Hudi tem um vídeo de demonstração que mostra tudo isso em uma configuração baseada em Docker com todos os sistemas dependentes rodando localmente.

Uau! Uau! Vamos construir data lakes Hudi no MinIO!

Apache Hudi foi o primeiro formato de tabela aberta para data lakes e é digno de consideração em arquiteturas de streaming. A comunidade e o ecossistema Hudi estão vivos e ativos, com uma ênfase crescente na substituição do Hadoop/HDFS pelo armazenamento Hudi/objeto para data lakes de streaming nativos da nuvem. O uso do MinIO para armazenamento Hudi abre caminho para data lakes e análises em várias nuvens. O MinIO inclui replicação ativa-ativa para sincronizar dados entre locais — no local, na nuvem pública/privada e na borda — permitindo o que as grandes empresas precisam, como balanceamento de carga geográfica e failover rápido.


Experimente o Hudi no MinIO hoje. Se você tiver alguma dúvida ou quiser compartilhar dicas, entre em contato através do nosso canal no Slack .


Também publicado aqui .