🔄 Pipeline Flow

CAPYGATOR BI folgt einem klassischen ELT-Pattern: Extract mit dlt, Load in DuckDB, Transform mit SQLMesh. Dagster orchestriert den gesamten Prozess und stellt sicher, dass Abhaengigkeiten eingehalten werden.

┌──────────────────────────────────────────────────────────────────────────────────────────┐ │ CAPYGATOR BI Pipeline │ └──────────────────────────────────────────────────────────────────────────────────────────┘ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Sources │ │ dlt │ │ DuckDB │ │ SQLMesh │ │─────────────│ │─────────────│ │─────────────│ │─────────────│ │ Buchungen │────────►│ Extract │────────►│ Raw Data │────────►│ Staging │ │ .xlsx │ │ Validate │ │ finanz_data │ │ stg_* │ │ │ │ Deduplicate │ │ .buchungen │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ └──────┬──────┘ │ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Streamlit │◄────────│ DuckDB │◄────────│ SQLMesh │◄────────│ Marts │ │─────────────│ │─────────────│ │─────────────│ │─────────────│ │ Dashboard │ │ Analytics │ │ Transform │ │ dim_* │ │ Visualize │ │ Query │ │ Aggregate │ │ fact_* │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ ┌─────────────┐ │ Dagster │ │─────────────│ │ Orchestrate │ │ Schedule │ │ Monitor │ └─────────────┘

📥 dlt - Data Ingestion

dlt (data load tool) kuemmert sich um den Datenimport. Die Pipeline liest Excel-Dateien, validiert Daten und schreibt sie mit einer Merge-Strategie nach DuckDB. Duplikate werden anhand eines Hashes erkannt.

sources/finanz_pipeline.py
@dlt.resource(
    name="buchungen",
    write_disposition="merge",
    primary_key="buchung_hash"
)
def buchungen_resource():
    """
    Merge-Strategie:
    - Neue Buchungen werden hinzugefuegt
    - Existierende werden aktualisiert
    - Keine Duplikate durch primary_key
    """
    yield from load_buchungen()

Qualitaetspruefung

Validierung von Pflichtfeldern, Betrag und Buchungstag. Fehlerhafte Zeilen werden protokolliert.

data/quality_report.json

Duplikat-Erkennung

MD5-Hash aus allen Buchungsfeldern. Identische Eintraege werden automatisch erkannt.

buchung_hash Column

🔧 SQLMesh - Transformations

SQLMesh transformiert die Rohdaten in ein analytisches Star Schema. Zwei Layer: Staging (Bereinigung) und Marts (Dimensionen + Fakten).

staging stg_buchungen Bereinigte Buchungen mit Typkonvertierung
marts dim_kategorie Haupt- und Unterkategorien
marts dim_konto Referenzkonten mit Namen
marts dim_empfaenger Empfaenger mit IBAN
marts dim_finanz_datum Zeitdimension (Tag bis Jahr)
marts fact_buchungen Faktentabelle mit allen Keys

⚙️ Dagster - Orchestration

Dagster orchestriert die gesamte Pipeline als Asset-Graph. dlt-Assets werden zu Dagster-Assets gewrapped, SQLMesh-Modelle werden automatisch als abhaengige Assets erkannt.

┌─────────────────────────────────────────┐ │ Dagster Asset Graph │ └─────────────────────────────────────────┘ ┌───────────────────┐ │ finanz_assets │ │ (dlt Source) │ └─────────┬─────────┘ │ ▼ ┌───────────────────┐ │ stg_buchungen │ │ (SQLMesh) │ └─────────┬─────────┘ │ ├────────────────┬────────────────┬────────────────┐ ▼ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ dim_kategorie │ │ dim_konto │ │ dim_empfaenger │ │dim_finanz_datum │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │ │ │ └───────────────────┴───────────────────┴───────────────────┘ │ ▼ ┌─────────────────┐ │ fact_buchungen │ │ (Star Schema) │ └─────────────────┘
src/capygator_bi/definitions.py
from dagster import Definitions
from dagster_dlt import DagsterDltResource
from dagster_sqlmesh import SQLMeshResource

defs = Definitions(
    assets=all_assets,
    resources={
        "dlt": DagsterDltResource(),
        "sqlmesh": SQLMeshResource(config=sqlmesh_config),
    },
)

📁 Project Structure

Verzeichnisstruktur
CAPYGATOR_BI/
├── app/                      # Streamlit Apps
│   ├── main_app.py           # Unified Dashboard
│   ├── finanz_app.py
│   ├── journal_app.py
│   └── ernaehrung_app.py
├── contracts/                # Data Contracts (ODCS)
│   ├── finanz-buchungen.yaml
│   ├── journal-eintraege.yaml
│   └── ernaehrung-protokoll.yaml
├── data/                     # DuckDB Databases
│   └── finanz.duckdb
├── models/                   # SQLMesh Models
│   ├── staging/
│   │   └── stg_buchungen.sql
│   └── marts/
│       ├── dim_*.sql
│       └── fact_buchungen.sql
├── sources/                  # dlt Pipelines
│   └── finanz_pipeline.py
├── src/capygator_bi/         # Dagster Package
│   ├── assets/
│   │   ├── dlt_assets.py
│   │   └── sqlmesh_assets.py
│   └── definitions.py
├── config.yaml               # SQLMesh Config
└── pyproject.toml            # Project Config