Guia de Integração de Nova Base – GovHub¶
1) Visão geral (como o projeto é organizado)¶
O GovHub é estruturado em módulos bem definidos dentro do repositório, cada um com uma responsabilidade clara. Isso facilita para qualquer pessoa nova entender onde olhar e como começar ao integrar uma nova base de dados.
1.1. Orquestração – DAGs no Airflow¶
- Local:
airflow_lappis/dags/** - Aqui ficam os arquivos das DAGs, que são os fluxos de execução no Airflow.
-
Cada DAG define:
-
Quando rodar (agendamento com
schedule_interval). - Quais tarefas executar (normalmente funções decoradas com
@task). - Dependência entre tarefas (ordem de execução).
- Exemplo: uma DAG pode ter uma tarefa para buscar dados de uma API, outra para salvar no banco, e outra para disparar o processamento analítico.
1.2. Clientes de APIs – Plugins¶
- Local:
airflow_lappis/plugins/** - Cada sistema externo (PNCP, ComprasNet, SIAPE, etc.) tem seu próprio cliente, nomeado como
cliente_<sistema>.py. - A ideia é encapsular toda a lógica de comunicação com o provedor em uma classe separada, deixando a DAG mais limpa.
-
Estrutura comum:
-
Herança de uma classe base (
cliente_base.py) que já implementa comportamentos padrão: timeout, retries, logging básico. - Métodos específicos para endpoints da API (ex.:
get_contratacoes_publicacaono cliente do PNCP).
1.3. Helpers reutilizáveis¶
- Local:
airflow_lappis/helpers/** - São utilitários genéricos que podem ser usados por qualquer cliente ou DAG.
-
Exemplos:
-
safe_request.py: função para chamadas HTTP mais seguras (tratando erros, retornos vazios ou conteúdo não-JSON). - Funções de parsing, formatação de datas, manipulação de parâmetros.
- O objetivo é evitar duplicação de código: se você precisa de retry customizado ou de algo que já existe, basta importar o helper.
1.4. Persistência – Banco de dados¶
-
Arquivos principais:
-
cliente_postgres.py: classe cliente para inserir dados no Postgres. postgres_helpers.py: utilitários como a string de conexão (get_postgres_conn) e funções auxiliares para criar schema/tabelas.-
Normalmente, quando se integra uma nova base, você:
-
Usa o cliente da API para buscar os dados.
- Usa o cliente do Postgres para criar um novo schema (um namespace no banco).
- Insere os dados em uma tabela correspondente (com suporte a upsert, chaves primárias, etc.).
1.5. Modelagem analítica (opcional)¶
- Local:
airflow_lappis/dags/dbt/** - Após a ingestão bruta no Postgres, pode-se criar modelos analíticos usando dbt.
- Isso permite aplicar transformações em camadas (bronze, silver, gold) e preparar os dados para dashboards, relatórios e análises mais avançadas.
- É opcional: se a integração só precisa carregar dados brutos, não há necessidade de mexer no dbt.
1.6. Logs – Monitoramento¶
- O projeto usa
loggingem todas as classes e DAGs. - Cada etapa da execução gera logs detalhados (tentativas de requisição, parâmetros usados, quantidade de registros coletados/inseridos).
- Isso garante rastreabilidade: você consegue saber exatamente onde falhou (na API, no banco, ou no parser) sem precisar adivinhar.
Estrutura essencial (resumo do repositório)¶
airflow_lappis/
├── dags/
│ ├── data_ingest/... # Suas DAGs de ingestão
│ └── dbt/ # Modelagem analítica (dbt models)
├── helpers/
│ ├── postgres_helpers.py # Conexão e utilitários para Postgres
│ └── safe_request.py # Funções seguras para chamadas HTTP
└── plugins/
├── cliente_base.py # Classe base para APIs REST (httpx)
├── cliente_postgres.py # Cliente para persistir dados no Postgres
└── cliente_<nova_base>.py # Cliente que você cria ao integrar nova base
2) Checklist de integração (ordem recomendada)¶
(a) Estudo inicial da API do provedor
¶
Antes de escrever qualquer código, é fundamental entender bem a API que será integrada. Isso envolve:
- Base URL: qual é a raiz dos endpoints (ex.:
https://contratos.comprasnet.gov.br/api). - Autenticação: é aberta ou precisa de token/certificado/usuário e senha?
- Formato de resposta: retorna sempre JSON? Pode devolver XML, CSV ou até HTML em caso de erro?
- Paginação: a API retorna todos os dados de uma vez ou exige navegar página por página (parâmetros como
pagina=1,2,...)? - Headers obrigatórios: geralmente
accept: application/json, mas pode haver exigências de chave de API, user-agent, etc. - Limites de uso (rate limits): existe limite de chamadas por minuto/hora? Se sim, precisamos considerar
sleepou retentativas.
Essa etapa é essencial porque define a forma como vamos estruturar o cliente e o fluxo da DAG.
A dica prática é: testar manualmente a API com curl, Postman ou até requests em Python antes de começar a codar.
(b) Criação do cliente da base (plugins/cliente_<nova_base>.py)
¶
O cliente é a camada que encapsula todas as chamadas à API. Ele herda de ClienteBase (classe comum que já configura httpx, timeout e retries) e define métodos específicos para cada endpoint.
Estrutura típica¶
-
Definições principais:
BASE_URL: raiz da API.BASE_HEADER: headers padrão.
-
Construtor (
__init__):- Chama o
super().__init__passando aBASE_URL. - Registra logs para indicar a inicialização do cliente.
- Chama o
-
Métodos por endpoint:
- Nomeados com clareza, ex.:
get_contratos_by_ug,get_faturas_by_contrato_id. - Fazem a chamada com
self.request(já herdado doClienteBase). - Validam o status da resposta.
- Retornam a lista ou
None.
- Nomeados com clareza, ex.:
Exemplo (trecho simplificado do cliente_contratos)¶
class ClienteContratos(ClienteBase):
BASE_URL = "https://contratos.comprasnet.gov.br/api"
BASE_HEADER = {"accept": "application/json"}
def __init__(self) -> None:
super().__init__(base_url=ClienteContratos.BASE_URL)
logging.info(f"[cliente_contratos.py] Initialized with base_url {self.BASE_URL}")
def get_contratos_by_ug(self, ug_code: str) -> list | None:
endpoint = f"/contrato/ug/{ug_code}"
status, data = self.request(http.HTTPMethod.GET, endpoint, headers=self.BASE_HEADER)
if status == http.HTTPStatus.OK and isinstance(data, list):
return data
return None
A ideia é que qualquer pessoa saiba olhar o cliente e entender:
- como a API é chamada,
- como o retorno é tratado,
- e onde estão os pontos de log.
© Criação da DAG no Airflow (dags/<nova_base>_dag.py)
¶
A DAG orquestra a ingestão dos dados. Ela é responsável por:
- Agendamento: definido no decorator
@dag(ex.:@daily,@weekly). - Variáveis de configuração: lidas do Airflow (
Variable.get) para parametrizar a execução. -
Tasks: funções anotadas com
@taskque:- Instanciam o cliente da API.
- Instanciam o cliente Postgres (
ClientPostgresDB). - Buscam os dados via cliente da API.
- Inserem os dados no banco, adicionando campos como
dt_ingest.
Exemplo simplificado¶
@dag(
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
tags=["contratos_api"],
)
def api_contratos_dag():
@task
def fetch_and_store():
api = ClienteContratos()
db = ClientPostgresDB(get_postgres_conn())
contratos = api.get_contratos_by_ug("113601")
if contratos:
for c in contratos:
c["dt_ingest"] = datetime.now().isoformat()
db.insert_data(contratos, "contratos", schema="compras_gov", conflict_fields=["id"], primary_key=["id"])
fetch_and_store()
(d) Modelagem analítica com dbt
¶
Depois de coletar e armazenar os dados no Postgres, usamos o dbt para organizar as camadas:
- Bronze: tabelas mais próximas da fonte, com padronização mínima (tipagem, normalização de campos nulos, etc.).
- Silver: tabelas transformadas e enriquecidas, com junções entre diferentes fontes (ex.: contratos + empenhos).
- Gold: dados prontos para consumo em dashboards e relatórios. Aqui já está no formato esperado pelo BI (Superset, Power BI, etc.).
A organização do dbt no projeto segue a lógica de pastas por schema (models/<schema>), e dentro delas as subpastas bronze, silver e gold.
(e) Logging e variáveis do Airflow
¶
Logging¶
O logging é essencial para a rastreabilidade. Quando rodamos uma DAG no Airflow, todos os logs das tasks ficam registrados na interface. Isso permite identificar:
- Parâmetros usados na execução.
- Quantidade de registros processados.
- Erros e exceções.
Por isso, é boa prática logar:
- Início e fim de cada chamada.
- Totais de registros.
- Avisos quando o retorno estiver vazio.
Variáveis do Airflow¶
É recomendável usar variáveis do Airflow para guardar:
- Códigos de órgãos.
- CNPJs.
- Tokens de acesso (quando não forem credenciais sensíveis).
Assim conseguimos alterar o comportamento da DAG sem precisar mexer no código, apenas ajustando variáveis no painel do Airflow.
3) Reaproveitando ao máximo¶
Ao integrar uma nova base no GovHub, não é necessário reinventar a roda. Já existem clientes e helpers que podem (e devem) ser reutilizados. Isso garante padronização, menos bugs e menos código duplicado.
Clientes e helpers¶
-
cliente_base.pyEsse arquivo é a fundação para qualquer cliente de API. Ele já implementa:- Conexão com
httpx. - Timeout padrão (10 segundos).
- Retentativas automáticas (até 3, com backoff).
- Conexão com
Quando usar?
Se a sua API sempre retorna JSON válido e segue um padrão consistente, você pode simplesmente herdar de ClienteBase e chamar o método self.request. Isso já resolve 90% dos casos.
-
helpers/safe_request.pyEsse helper foi criado para APIs menos previsíveis. Ele trata situações como:- A API responde com 204 (sem conteúdo).
- O Content-Type não é
application/json(às vezes vem HTML ou CSV). - O corpo da resposta está vazio ou contém JSON inválido.
Quando usar?
Se a API não é confiável e pode quebrar o fluxo ao tentar json().
Nesse caso, o request_safe não lança exceções: em vez disso, retorna um str ou None, permitindo que sua DAG continue rodando sem travar.
Postgres¶
Outra parte que já está pronta para você reaproveitar é a persistência dos dados.
-
cliente_postgres.py+postgres_helpers.pyJuntos, eles facilitam toda a interação com o banco:- Conexão: use
get_postgres_conn()para obter a string de conexão. -
Inserção: use
insert_data()para inserir registros em tabelas, com suporte a:- Upsert (evita duplicação de dados).
- Definição de chave primária (
primary_key). - Conflitos (
conflict_fields).
- Conexão: use
Criação de schemas e tabelas (dinâmica com o ClientPostgresDB)¶
Quando usamos o método insert_data() do ClientPostgresDB, não precisamos nos preocupar em criar tabelas manualmente antes.
Isso acontece porque a lógica da classe já inclui duas etapas internas:
→ Garantir o schema
O método create_table_if_not_exists roda automaticamente:
Ou seja, se você passar schema="compras_gov", ele vai criar esse schema caso ele ainda não exista.
-
Criar a tabela conforme os dados
- Ele pega o primeiro item do dataset (
data[0]) como amostra. - Usa o método
_flatten_data()para achatar estruturas aninhadas (listas, dicionários dentro do JSON). - Para cada chave encontrada, cria uma coluna do tipo TEXT (ou outro mapeado, se especificado).
- Se você indicar
primary_key=["id"], ele adiciona a constraintPRIMARY KEY (id). - Por fim, dispara um
CREATE TABLE IF NOT EXISTSgarantindo que a tabela exista.
- Ele pega o primeiro item do dataset (
-
Inserção com upsert
- Ele gera um
INSERT INTO … VALUES …. - Se você passar
conflict_fields=["id"], ele constrói automaticamente um:
Isso evita duplicações: se já existir uma linha com o mesmo
id, os dados serão atualizados. - Ele gera um
➡️ Resumindo: só de chamar insert_data(), você já garante que o schema existe, que a tabela está criada e que os dados serão inseridos/atualizados corretamente.
Isso elimina a necessidade de criar tabelas manualmente na maioria dos casos.
4) Variáveis do Airflow (parametrização)¶
Variable.get("airflow_orgao"): seleciona um “contexto” (ex.:ipea).airflow_variables: YAML com configurações por órgão:
yaml.safe_load e depois faça get:
orgaos_config = yaml.safe_load(Variable.get("airflow_variables", "{}"))
cfg = orgaos_config.get(orgao_alvo, {})
cnpj = cfg.get("orgao_pncp", {}).get("cnpj")
modalidades = cfg.get("orgao_pncp", {}).get("modalidades", [])
Por que
yaml.safe_loade nãodict(...)? Porque a variável vem como string YAML. yaml.safe_loadconverte para objetos Python (dict/list/str/…).
5) Entregáveis mínimos no PR¶
-
Cliente em
plugins/cliente_<nova_base>.py- Métodos bem nomeados, docstring curta, logs.
- Uso de
request_safe(se necessário).
-
DAG em
dags/<nova_base>_dag.py- Leitura de variáveis do Airflow.
- Inserção no Postgres com
cliente_postgres. - Boa instrumentação de logs.
-
Schema/Tabelas
- SQL/DBT para criar tabelas (ou anotação clara de dependência).
-
Documentação curta (README do provedor)
- Base URL, autenticação, endpoints usados, exemplos cURL.
- Campos-chave usados como
conflict_fields.
6) Exemplo de “ciclo de vida” resumido (como no PNCP)¶
-
Cliente PNCP criado em
plugins/cliente_pncp.pyget_contratacoes_publicacao(uma página)get_contratacoes_publicacao_paginado(agregando)get_*_semestral(quebra temporal + paginação)
-
Helper
helpers/safe_request.pypara lidar com204/conteúdo não-JSON. -
DAG
dags/pncp_dag.pyusando:Variable.get+yaml.safe_loadpara lercnpjemodalidades.- Cliente PNCP para coletar dados.
ClientPostgresDB.insert_data()para persistir (comdt_ingest).
7) Dicas finais¶
- Não duplique lógica que já existe em helpers (retry, request seguro, inserção).
- Mantenha o cliente pequeno, legível e com logs úteis (params, status, contagens).
- Ao lidar com credenciais, use variáveis do Airflow, nunca hardcode.
-
Se precisar encadear DAGs (ex.: “ingestão” → “dbt”), use:
TriggerDagRunOperator, ouExternalTaskSensor, ou- Datasets (recomendado para baixo acoplamento).