Padrões de engenharia de dados¶
Este documento define as convenções para desenvolvimento no projeto, cobrindo DAGs do Airflow e modelos SQL/dbt. O objetivo é garantir consistência, rastreabilidade e facilidade de manutenção em um repositório com múltiplas origens de dados e times contribuindo simultaneamente.
Padrões para DAGs do Airflow¶
Convenção de nomenclatura¶
O nome da DAG segue o formato:
| Segmento | Descrição | Exemplos |
|---|---|---|
origem |
Sistema de origem dos dados | siafi, siape, compras_gov, transfere_gov, pncp |
dominio |
Entidade ou tema principal | execucao_orcamentaria, contratos, servidores |
acao |
O que a DAG faz | ingestao, transformacao, snapshot |
Exemplos válidos:
siafi_execucao_orcamentaria_ingestao
compras_gov_contratos_transformacao
siape_servidores_snapshot
transfere_gov_programas_ingestao
pncp_licitacoes_ingestao
O dag_id (nome da função decorada com @dag) deve ser idêntico ao nome do arquivo sem a extensão .py.
Estrutura de diretórios¶
dags/
data_ingest/
<origem>/ # uma pasta por sistema de origem
<dag>.py
dbt/
<projeto>/ # ipea, mir
cosmos_dag.py
dashboards/
<dag>.py
Novas origens ganham uma pasta própria dentro de data_ingest/. Não criar DAGs soltas na raiz de dags/.
Estilo de código — TaskFlow API¶
O projeto usa a TaskFlow API do Airflow (@dag e @task). Não usar o estilo clássico com with DAG(...).
from airflow.decorators import dag, task
from datetime import datetime, timedelta
from schedule_loader import get_dynamic_schedule
@dag(
schedule_interval=get_dynamic_schedule("origem_dominio_acao"),
start_date=datetime(2023, 1, 1),
catchup=False,
default_args={
"owner": "nome-do-responsavel",
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
tags=["origem", "dominio"],
)
def origem_dominio_acao() -> None:
@task
def executar(**context) -> None:
...
executar()
dag_instance = origem_dominio_acao()
Schedule dinâmico¶
O schedule de todas as DAGs de ingestão é definido via get_dynamic_schedule(), importado de schedule_loader.py. Não usar strings de cron ou aliases do Airflow diretamente no código da DAG.
from schedule_loader import get_dynamic_schedule
@dag(
schedule_interval=get_dynamic_schedule("nome_da_dag"),
...
)
O get_dynamic_schedule() lê a Airflow Variable dynamic_schedules (JSON) e retorna o schedule configurado para aquela DAG. Se não houver configuração, retorna @daily por padrão.
Formato da Variable dynamic_schedules:
{
"siafi_execucao_orcamentaria_ingestao": {
"type": "cron",
"value": "0 6 * * *"
},
"compras_gov_contratos_ingestao": {
"type": "preset",
"value": "@daily"
},
"siape_servidores_snapshot": {
"type": "timedelta",
"value": { "hours": 12 }
}
}
Tipos suportados: preset (aliases do Airflow como @daily), cron (expressão cron), timedelta (objeto com campos aceitos pelo timedelta do Python).
Configurações padrão¶
default_args = {
"owner": "nome-do-responsavel", # nome da pessoa ou time responsável
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
Nota sobre owner: use o nome de quem é responsável pela manutenção daquela DAG. Não deixar em branco.
Tags¶
Toda DAG deve declarar ao menos as tags de origem e domínio. Tags de camada (bronze, silver, gold) são recomendadas quando aplicável.
tags=["siafi", "nota_empenho"] # mínimo
tags=["siafi", "nota_empenho", "bronze"] # recomendado para DAGs de ingestão
Params para backfill¶
DAGs que suportam reprocessamento histórico devem declarar params com Param, documentando tipo e descrição:
from airflow.models.param import Param
params={
"ano_inicio": Param(
default=None,
type=["integer", "null"],
title="Ano de Início",
description="Backfill: ano inicial para reprocessamento. (type=int)",
),
"ano_fim": Param(
default=None,
type=["integer", "null"],
title="Ano de Fim",
description="Backfill: ano final para reprocessamento. (type=int)",
),
},
Dentro da task, acessar via context["params"] e sempre definir fallback para o ano corrente:
params = context["params"]
ano_inicio = params.get("ano_inicio") or datetime.now().year
ano_fim = params.get("ano_fim") or datetime.now().year
Airflow Variables¶
O projeto usa três Airflow Variables principais:
| Variable | Tipo | Descrição |
|---|---|---|
dynamic_schedules |
JSON | Schedules por dag_id. Lida por get_dynamic_schedule(). |
airflow_orgao |
string | Código do órgão alvo para ingestão. |
airflow_variables |
YAML | Configurações por órgão (ex: codigos_ug). |
Sempre validar se as variáveis críticas foram encontradas antes de prosseguir:
from airflow.models import Variable
import yaml
orgao_alvo = Variable.get("airflow_orgao", default_var=None)
if not orgao_alvo:
logging.error("Variável airflow_orgao não definida!")
raise ValueError("airflow_orgao não definida")
orgaos_config = yaml.safe_load(Variable.get("airflow_variables", default_var="{}"))
ugs_emitentes = orgaos_config.get(orgao_alvo, {}).get("codigos_ug", [])
Conexão com PostgreSQL¶
Usar sempre get_postgres_conn() de helpers/postgres_helpers.py, que resolve a conexão via PostgresHook do Airflow. Não construir strings de conexão manualmente.
from postgres_helpers import get_postgres_conn
from cliente_postgres import ClientPostgresDB
postgres_conn_str = get_postgres_conn() # usa "postgres_default"
postgres_conn_str = get_postgres_conn("outro_conn_id") # conn_id customizado
db = ClientPostgresDB(postgres_conn_str)
As credenciais são gerenciadas pelas Airflow Connections, não por variáveis de ambiente diretamente no código.
Uso de plugins e helpers¶
Os clientes em plugins/ e os helpers em helpers/ devem ser sempre reaproveitados. Não reimplementar lógica de conexão ou requisição dentro da DAG.
| Necessidade | O que usar |
|---|---|
| Conexão com PostgreSQL | get_postgres_conn() + ClientPostgresDB |
| Requisições com retry | retry_helpers.py |
| APIs externas | cliente_<origem>.py correspondente |
Ordem recomendada no arquivo¶
# 1. imports da stdlib
# 2. imports do Airflow
# 3. imports do projeto (schedule_loader, clientes, helpers)
# 4. definição da DAG com @dag
# 4a. default_args
# 4b. params (se houver)
# 4c. tasks com @task
# 4d. orquestração das tasks
# 5. instanciação: dag_instance = nome_da_dag()
Checklist de PR — DAGs de ingestão¶
- Nome segue o formato
<origem>_<dominio>_<acao> -
dag_id(nome da função) é idêntico ao nome do arquivo - Usa TaskFlow API (
@dag,@task) - Schedule via
get_dynamic_schedule() -
default_argsincluiowner,retrieseretry_delay - Tags declaram ao menos origem e domínio
-
catchup=Falsedeclarado -
start_dateé uma data fixa - Airflow Variables validadas antes do uso
- Conexão PostgreSQL via
get_postgres_conn() - Plugins e helpers existentes reaproveitados
- DAG está no diretório correto dentro de
data_ingest/<origem>/
DAGs dbt com Cosmos¶
Os projetos dbt (ipea, mir) são orquestrados pelo Airflow via Astronomer Cosmos, que converte os modelos dbt em tasks do Airflow automaticamente. O padrão é idêntico entre os dois projetos.
Estrutura padrão¶
import os
from datetime import datetime
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.constants import DBT_LOG_PATH_ENVVAR
# Configurar diretório de logs do dbt
dbt_log_path = "/tmp/dbt_logs" # NOSONAR
os.makedirs(dbt_log_path, exist_ok=True)
os.environ[DBT_LOG_PATH_ENVVAR] = dbt_log_path
profile_config = ProfileConfig(
profiles_yml_filepath=f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/<projeto>/profiles.yml",
profile_name="<projeto>",
target_name="prod",
)
my_cosmos_dag = DbtDag(
project_config=ProjectConfig(f"{os.environ['AIRFLOW_REPO_BASE']}/dags/dbt/<projeto>"),
profile_config=profile_config,
execution_config=ExecutionConfig(
dbt_executable_path=f"{os.environ['AIRFLOW_REPO_BASE']}/.local/bin/dbt",
),
schedule_interval="0 1 * * *", # diariamente à 01:00
start_date=datetime(2025, 1, 1),
catchup=False,
dag_id="<projeto>_cosmos_dag",
default_args={"retries": 2},
)
Pontos de atenção¶
- Caminhos resolvidos via
AIRFLOW_REPO_BASE— não usar caminhos absolutos hardcodados. target_name="prod"aponta para o perfil de produção noprofiles.yml. Para rodar localmente, alterar para o target local antes de testar.- DAGs Cosmos não usam
get_dynamic_schedule()— o schedule é cron direto, pois o Cosmos gerencia o ciclo de vida da DAG de forma diferente. retries=2é o padrão para DAGs dbt (diferente doretries=1das DAGs de ingestão).- O comentário
# NOSONARnodbt_log_pathé intencional — suprime alerta de análise estática para aquela linha. Manter ao copiar o template.
Padrões SQL e dbt¶
Estrutura de camadas¶
O projeto adota a arquitetura medallion. Cada domínio tem sua própria pasta com as camadas que forem necessárias:
models/
<dominio>_dbt/
bronze/ # dados brutos, sem transformação de negócio
silver/ # dados tratados, tipados e enriquecidos
gold/ # agregações e modelos analíticos finais
views/ # views auxiliares, quando necessário
Não criar modelos fora dessa estrutura.
Convenção de nomenclatura¶
Prefixos por camada:
| Camada | Prefixo | Exemplo |
|---|---|---|
| Bronze | brz_ |
brz_contratos |
| Silver | slv_ |
slv_contratos_empenhos |
| Gold | gld_ |
gld_contratos_resumo |
O nome do arquivo .sql deve ser idêntico ao nome do modelo (sem extensão).
Regras gerais:
- Sempre usar
snake_casepara nomes de modelos, colunas e aliases. - Nomes de modelos devem ser descritivos e únicos dentro do projeto.
- Evitar abreviações que não sejam amplamente conhecidas no domínio (
ncpara nota de crédito é aceitável; abreviações inventadas não são).
Padrões SQL¶
Keywords em lowercase:
O projeto usa keywords em lowercase. Manter consistência com os modelos existentes.
-- correto (padrão do projeto)
select
id_contrato,
valor_total
from {{ ref('brz_contratos') }}
where situacao = 'ATIVO'
Sem SELECT * em modelos finais:
select * é aceitável dentro de CTEs intermediárias, mas o select final deve listar as colunas explicitamente.
CTEs nomeadas semanticamente:
Usar CTEs para organizar a lógica em etapas legíveis. Nomear cada CTE de acordo com o que ela representa.
with
servidores_ativos as (
select *
from {{ ref('brz_lista_servidores') }}
where situacao_vinculo = 'ATIVO_PERMANENTE'
),
dados_enriquecidos as (
select
s.matricula,
s.nome,
f.cargo_efetivo
from servidores_ativos s
left join {{ ref('brz_dados_funcionais') }} f
on s.matricula = f.matricula
)
select
matricula,
nome,
cargo_efetivo
from dados_enriquecidos
Joins explícitos:
Sempre declarar o tipo do join. Nunca usar joins implícitos via vírgula no from.
-- correto
from tabela_a a
left join tabela_b b on a.id = b.id_ref
-- incorreto
from tabela_a a, tabela_b b where a.id = b.id_ref
Indentação:
- 4 espaços por nível (não usar tabs).
- Cada coluna em sua própria linha.
- Vírgulas no início da linha nos
selectfinais.
Campo dt_ingest¶
Todo modelo deve propagar o campo dt_ingest, representando a data/hora mais recente de ingestão das tabelas fonte utilizadas. Em modelos que combinam múltiplas fontes, usar greatest():
A descrição padrão do campo no schema.yml:
- name: dt_ingest
description: >
Data e hora (UTC-3 Brasília) mais recente de ingestão dos dados
das tabelas fonte utilizadas neste modelo.
Uso de macros¶
O projeto possui macros reutilizáveis em macros/. Consultar antes de implementar lógica customizada:
| Macro | Uso |
|---|---|
safe_casts.sql |
Casts seguros com tratamento de nulos |
parse_financial_value.sql |
Normalização de valores financeiros |
data_quality/row_count_match.sql |
Validação de contagem entre camadas |
data_quality/verificacao_tipagem.sql |
Verificação de tipos de colunas |
metadata/generate_metadata.sql |
Geração de metadados de modelos |
udfs/f_parse_dates.sql |
Parse de datas em diferentes formatos |
udfs/f_format_nc.sql |
Formatação de notas de crédito |
Schema.yml — documentação e testes¶
Todo modelo deve ter entrada no schema.yml da sua camada com description preenchida. Para silver e gold, documentar também as colunas principais.
Estrutura padrão:
version: 2
models:
- name: slv_servidores_completos
description: >
Descrição clara do que o modelo representa, de onde vêm os dados
e qual o propósito analítico.
meta:
tags:
- silver
columns:
- name: cpf
description: CPF do servidor, utilizado como chave de junção entre as bases.
- name: dt_ingest
description: >
Data e hora (UTC-3 Brasília) mais recente de ingestão dos dados
das tabelas fonte utilizadas neste modelo.
Nota sobre tags: as tags de camada (
bronze,silver,gold) são declaradas noschema.ymldentro do blocometa.tags, não na DAG. Manter consistência com os modelos existentes.
Testes mínimos por camada:
| Teste | Bronze | Silver | Gold |
|---|---|---|---|
not_null em PKs |
obrigatório | obrigatório | obrigatório |
unique em PKs |
recomendado | obrigatório | obrigatório |
relationships em FKs |
— | obrigatório | recomendado |
accepted_values em enums |
— | recomendado | obrigatório |
Seeds e snapshots¶
- Seeds (
seeds/) são arquivos CSV de referência estática (ex:estados_brasil.csv,partidos_map.csv). Não usar seeds para dados que mudam com frequência. - Snapshots (
snapshots/) capturam o histórico de mudanças de tabelas mutáveis. Documentar a estratégia de snapshot (timestamp ou check) no próprio arquivoyml.
Checklist de PR — modelos dbt¶
- Nome segue o prefixo da camada (
brz_,slv_,gld_) - Modelo está no diretório correto dentro de
<dominio>_dbt/<camada>/ -
schema.ymlexiste comdescriptionemeta.tagspreenchidos - Colunas principais documentadas no
schema.yml -
dt_ingestpropagado e com descrição padrão -
selectfinal lista colunas explicitamente - Keywords em lowercase e indentação com 4 espaços
- CTEs nomeadas semanticamente
- Joins explícitos
- Macros existentes aproveitadas onde aplicável
-
dbt runedbt testpassam localmente antes do PR