Guia de Integração de Nova Base – GovHub¶
1) Visão geral (como o projeto é organizado)¶
O GovHub é estruturado em módulos bem definidos dentro do repositório, cada um com uma responsabilidade clara. Isso facilita para qualquer pessoa nova entender onde olhar e como começar ao integrar uma nova base de dados.
1.1. Orquestração – DAGs no Airflow¶
- Local:
airflow_lappis/dags/**
- Aqui ficam os arquivos das DAGs, que são os fluxos de execução no Airflow.
-
Cada DAG define:
-
Quando rodar (agendamento com
schedule_interval
). - Quais tarefas executar (normalmente funções decoradas com
@task
). - Dependência entre tarefas (ordem de execução).
- Exemplo: uma DAG pode ter uma tarefa para buscar dados de uma API, outra para salvar no banco, e outra para disparar o processamento analítico.
1.2. Clientes de APIs – Plugins¶
- Local:
airflow_lappis/plugins/**
- Cada sistema externo (PNCP, ComprasNet, SIAPE, etc.) tem seu próprio cliente, nomeado como
cliente_<sistema>.py
. - A ideia é encapsular toda a lógica de comunicação com o provedor em uma classe separada, deixando a DAG mais limpa.
-
Estrutura comum:
-
Herança de uma classe base (
cliente_base.py
) que já implementa comportamentos padrão: timeout, retries, logging básico. - Métodos específicos para endpoints da API (ex.:
get_contratacoes_publicacao
no cliente do PNCP).
1.3. Helpers reutilizáveis¶
- Local:
airflow_lappis/helpers/**
- São utilitários genéricos que podem ser usados por qualquer cliente ou DAG.
-
Exemplos:
-
safe_request.py
: função para chamadas HTTP mais seguras (tratando erros, retornos vazios ou conteúdo não-JSON). - Funções de parsing, formatação de datas, manipulação de parâmetros.
- O objetivo é evitar duplicação de código: se você precisa de retry customizado ou de algo que já existe, basta importar o helper.
1.4. Persistência – Banco de dados¶
-
Arquivos principais:
-
cliente_postgres.py
: classe cliente para inserir dados no Postgres. postgres_helpers.py
: utilitários como a string de conexão (get_postgres_conn
) e funções auxiliares para criar schema/tabelas.-
Normalmente, quando se integra uma nova base, você:
-
Usa o cliente da API para buscar os dados.
- Usa o cliente do Postgres para criar um novo schema (um namespace no banco).
- Insere os dados em uma tabela correspondente (com suporte a upsert, chaves primárias, etc.).
1.5. Modelagem analítica (opcional)¶
- Local:
airflow_lappis/dags/dbt/**
- Após a ingestão bruta no Postgres, pode-se criar modelos analíticos usando dbt.
- Isso permite aplicar transformações em camadas (bronze, silver, gold) e preparar os dados para dashboards, relatórios e análises mais avançadas.
- É opcional: se a integração só precisa carregar dados brutos, não há necessidade de mexer no dbt.
1.6. Logs – Monitoramento¶
- O projeto usa
logging
em todas as classes e DAGs. - Cada etapa da execução gera logs detalhados (tentativas de requisição, parâmetros usados, quantidade de registros coletados/inseridos).
- Isso garante rastreabilidade: você consegue saber exatamente onde falhou (na API, no banco, ou no parser) sem precisar adivinhar.
Estrutura essencial (resumo do repositório)¶
airflow_lappis/
├── dags/
│ ├── data_ingest/... # Suas DAGs de ingestão
│ └── dbt/ # Modelagem analítica (dbt models)
├── helpers/
│ ├── postgres_helpers.py # Conexão e utilitários para Postgres
│ └── safe_request.py # Funções seguras para chamadas HTTP
└── plugins/
├── cliente_base.py # Classe base para APIs REST (httpx)
├── cliente_postgres.py # Cliente para persistir dados no Postgres
└── cliente_<nova_base>.py # Cliente que você cria ao integrar nova base
2) Checklist de integração (ordem recomendada)¶
(a) Estudo inicial da API do provedor
¶
Antes de escrever qualquer código, é fundamental entender bem a API que será integrada. Isso envolve:
- Base URL: qual é a raiz dos endpoints (ex.:
https://contratos.comprasnet.gov.br/api
). - Autenticação: é aberta ou precisa de token/certificado/usuário e senha?
- Formato de resposta: retorna sempre JSON? Pode devolver XML, CSV ou até HTML em caso de erro?
- Paginação: a API retorna todos os dados de uma vez ou exige navegar página por página (parâmetros como
pagina=1,2,...
)? - Headers obrigatórios: geralmente
accept: application/json
, mas pode haver exigências de chave de API, user-agent, etc. - Limites de uso (rate limits): existe limite de chamadas por minuto/hora? Se sim, precisamos considerar
sleep
ou retentativas.
Essa etapa é essencial porque define a forma como vamos estruturar o cliente e o fluxo da DAG.
A dica prática é: testar manualmente a API com curl
, Postman ou até requests
em Python antes de começar a codar.
(b) Criação do cliente da base (plugins/cliente_<nova_base>.py
)
¶
O cliente é a camada que encapsula todas as chamadas à API. Ele herda de ClienteBase
(classe comum que já configura httpx
, timeout e retries) e define métodos específicos para cada endpoint.
Estrutura típica¶
-
Definições principais:
BASE_URL
: raiz da API.BASE_HEADER
: headers padrão.
-
Construtor (
__init__
):- Chama o
super().__init__
passando aBASE_URL
. - Registra logs para indicar a inicialização do cliente.
- Chama o
-
Métodos por endpoint:
- Nomeados com clareza, ex.:
get_contratos_by_ug
,get_faturas_by_contrato_id
. - Fazem a chamada com
self.request
(já herdado doClienteBase
). - Validam o status da resposta.
- Retornam a lista ou
None
.
- Nomeados com clareza, ex.:
Exemplo (trecho simplificado do cliente_contratos
)¶
class ClienteContratos(ClienteBase):
BASE_URL = "https://contratos.comprasnet.gov.br/api"
BASE_HEADER = {"accept": "application/json"}
def __init__(self) -> None:
super().__init__(base_url=ClienteContratos.BASE_URL)
logging.info(f"[cliente_contratos.py] Initialized with base_url {self.BASE_URL}")
def get_contratos_by_ug(self, ug_code: str) -> list | None:
endpoint = f"/contrato/ug/{ug_code}"
status, data = self.request(http.HTTPMethod.GET, endpoint, headers=self.BASE_HEADER)
if status == http.HTTPStatus.OK and isinstance(data, list):
return data
return None
A ideia é que qualquer pessoa saiba olhar o cliente e entender:
- como a API é chamada,
- como o retorno é tratado,
- e onde estão os pontos de log.
© Criação da DAG no Airflow (dags/<nova_base>_dag.py
)
¶
A DAG orquestra a ingestão dos dados. Ela é responsável por:
- Agendamento: definido no decorator
@dag
(ex.:@daily
,@weekly
). - Variáveis de configuração: lidas do Airflow (
Variable.get
) para parametrizar a execução. -
Tasks: funções anotadas com
@task
que:- Instanciam o cliente da API.
- Instanciam o cliente Postgres (
ClientPostgresDB
). - Buscam os dados via cliente da API.
- Inserem os dados no banco, adicionando campos como
dt_ingest
.
Exemplo simplificado¶
@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["contratos_api"],
)
def api_contratos_dag():
@task
def fetch_and_store():
api = ClienteContratos()
db = ClientPostgresDB(get_postgres_conn())
contratos = api.get_contratos_by_ug("113601")
if contratos:
for c in contratos:
c["dt_ingest"] = datetime.now().isoformat()
db.insert_data(contratos, "contratos", schema="compras_gov", conflict_fields=["id"], primary_key=["id"])
fetch_and_store()
(d) Modelagem analítica com dbt
¶
Depois de coletar e armazenar os dados no Postgres, usamos o dbt para organizar as camadas:
- Bronze: tabelas mais próximas da fonte, com padronização mínima (tipagem, normalização de campos nulos, etc.).
- Silver: tabelas transformadas e enriquecidas, com junções entre diferentes fontes (ex.: contratos + empenhos).
- Gold: dados prontos para consumo em dashboards e relatórios. Aqui já está no formato esperado pelo BI (Superset, Power BI, etc.).
A organização do dbt no projeto segue a lógica de pastas por schema (models/<schema>
), e dentro delas as subpastas bronze, silver e gold.
(e) Logging e variáveis do Airflow
¶
Logging¶
O logging é essencial para a rastreabilidade. Quando rodamos uma DAG no Airflow, todos os logs das tasks ficam registrados na interface. Isso permite identificar:
- Parâmetros usados na execução.
- Quantidade de registros processados.
- Erros e exceções.
Por isso, é boa prática logar:
- Início e fim de cada chamada.
- Totais de registros.
- Avisos quando o retorno estiver vazio.
Variáveis do Airflow¶
É recomendável usar variáveis do Airflow para guardar:
- Códigos de órgãos.
- CNPJs.
- Tokens de acesso (quando não forem credenciais sensíveis).
Assim conseguimos alterar o comportamento da DAG sem precisar mexer no código, apenas ajustando variáveis no painel do Airflow.
3) Reaproveitando ao máximo¶
Ao integrar uma nova base no GovHub, não é necessário reinventar a roda. Já existem clientes e helpers que podem (e devem) ser reutilizados. Isso garante padronização, menos bugs e menos código duplicado.
Clientes e helpers¶
-
cliente_base.py
Esse arquivo é a fundação para qualquer cliente de API. Ele já implementa:- Conexão com
httpx
. - Timeout padrão (10 segundos).
- Retentativas automáticas (até 3, com backoff).
- Conexão com
Quando usar?
Se a sua API sempre retorna JSON válido e segue um padrão consistente, você pode simplesmente herdar de ClienteBase
e chamar o método self.request
. Isso já resolve 90% dos casos.
-
helpers/safe_request.py
Esse helper foi criado para APIs menos previsíveis. Ele trata situações como:- A API responde com 204 (sem conteúdo).
- O Content-Type não é
application/json
(às vezes vem HTML ou CSV). - O corpo da resposta está vazio ou contém JSON inválido.
Quando usar?
Se a API não é confiável e pode quebrar o fluxo ao tentar json()
.
Nesse caso, o request_safe
não lança exceções: em vez disso, retorna um str
ou None
, permitindo que sua DAG continue rodando sem travar.
Postgres¶
Outra parte que já está pronta para você reaproveitar é a persistência dos dados.
-
cliente_postgres.py
+postgres_helpers.py
Juntos, eles facilitam toda a interação com o banco:- Conexão: use
get_postgres_conn()
para obter a string de conexão. -
Inserção: use
insert_data()
para inserir registros em tabelas, com suporte a:- Upsert (evita duplicação de dados).
- Definição de chave primária (
primary_key
). - Conflitos (
conflict_fields
).
- Conexão: use
Criação de schemas e tabelas (dinâmica com o ClientPostgresDB
)¶
Quando usamos o método insert_data()
do ClientPostgresDB
, não precisamos nos preocupar em criar tabelas manualmente antes.
Isso acontece porque a lógica da classe já inclui duas etapas internas:
→ Garantir o schema
O método create_table_if_not_exists
roda automaticamente:
Ou seja, se você passar schema="compras_gov"
, ele vai criar esse schema caso ele ainda não exista.
-
Criar a tabela conforme os dados
- Ele pega o primeiro item do dataset (
data[0]
) como amostra. - Usa o método
_flatten_data()
para achatar estruturas aninhadas (listas, dicionários dentro do JSON). - Para cada chave encontrada, cria uma coluna do tipo TEXT (ou outro mapeado, se especificado).
- Se você indicar
primary_key=["id"]
, ele adiciona a constraintPRIMARY KEY (id)
. - Por fim, dispara um
CREATE TABLE IF NOT EXISTS
garantindo que a tabela exista.
- Ele pega o primeiro item do dataset (
-
Inserção com upsert
- Ele gera um
INSERT INTO … VALUES …
. - Se você passar
conflict_fields=["id"]
, ele constrói automaticamente um:
Isso evita duplicações: se já existir uma linha com o mesmo
id
, os dados serão atualizados. - Ele gera um
➡️ Resumindo: só de chamar insert_data()
, você já garante que o schema existe, que a tabela está criada e que os dados serão inseridos/atualizados corretamente.
Isso elimina a necessidade de criar tabelas manualmente na maioria dos casos.
4) Variáveis do Airflow (parametrização)¶
Variable.get("airflow_orgao")
: seleciona um “contexto” (ex.:ipea
).airflow_variables
: YAML com configurações por órgão:
yaml.safe_load
e depois faça get
:
orgaos_config = yaml.safe_load(Variable.get("airflow_variables", "{}"))
cfg = orgaos_config.get(orgao_alvo, {})
cnpj = cfg.get("orgao_pncp", {}).get("cnpj")
modalidades = cfg.get("orgao_pncp", {}).get("modalidades", [])
Por que
yaml.safe_load
e nãodict(...)
? Porque a variável vem como string YAML. yaml.safe_load
converte para objetos Python (dict/list/str/…).
5) Entregáveis mínimos no PR¶
-
Cliente em
plugins/cliente_<nova_base>.py
- Métodos bem nomeados, docstring curta, logs.
- Uso de
request_safe
(se necessário).
-
DAG em
dags/<nova_base>_dag.py
- Leitura de variáveis do Airflow.
- Inserção no Postgres com
cliente_postgres
. - Boa instrumentação de logs.
-
Schema/Tabelas
- SQL/DBT para criar tabelas (ou anotação clara de dependência).
-
Documentação curta (README do provedor)
- Base URL, autenticação, endpoints usados, exemplos cURL.
- Campos-chave usados como
conflict_fields
.
6) Exemplo de “ciclo de vida” resumido (como no PNCP)¶
-
Cliente PNCP criado em
plugins/cliente_pncp.py
get_contratacoes_publicacao
(uma página)get_contratacoes_publicacao_paginado
(agregando)get_*_semestral
(quebra temporal + paginação)
-
Helper
helpers/safe_request.py
para lidar com204
/conteúdo não-JSON. -
DAG
dags/pncp_dag.py
usando:Variable.get
+yaml.safe_load
para lercnpj
emodalidades
.- Cliente PNCP para coletar dados.
ClientPostgresDB.insert_data()
para persistir (comdt_ingest
).
7) Dicas finais¶
- Não duplique lógica que já existe em helpers (retry, request seguro, inserção).
- Mantenha o cliente pequeno, legível e com logs úteis (params, status, contagens).
- Ao lidar com credenciais, use variáveis do Airflow, nunca hardcode.
-
Se precisar encadear DAGs (ex.: “ingestão” → “dbt”), use:
TriggerDagRunOperator
, ouExternalTaskSensor
, ou- Datasets (recomendado para baixo acoplamento).