CAPYGATOR BI folgt einem klassischen ELT-Pattern: Extract mit dlt, Load in DuckDB, Transform mit SQLMesh. Dagster orchestriert den gesamten Prozess und stellt sicher, dass Abhaengigkeiten eingehalten werden.
dlt (data load tool) kuemmert sich um den Datenimport. Die Pipeline liest Excel-Dateien, validiert Daten und schreibt sie mit einer Merge-Strategie nach DuckDB. Duplikate werden anhand eines Hashes erkannt.
@dlt.resource( name="buchungen", write_disposition="merge", primary_key="buchung_hash" ) def buchungen_resource(): """ Merge-Strategie: - Neue Buchungen werden hinzugefuegt - Existierende werden aktualisiert - Keine Duplikate durch primary_key """ yield from load_buchungen()
Validierung von Pflichtfeldern, Betrag und Buchungstag. Fehlerhafte Zeilen werden protokolliert.
MD5-Hash aus allen Buchungsfeldern. Identische Eintraege werden automatisch erkannt.
SQLMesh transformiert die Rohdaten in ein analytisches Star Schema. Zwei Layer: Staging (Bereinigung) und Marts (Dimensionen + Fakten).
Dagster orchestriert die gesamte Pipeline als Asset-Graph. dlt-Assets werden zu Dagster-Assets gewrapped, SQLMesh-Modelle werden automatisch als abhaengige Assets erkannt.
from dagster import Definitions from dagster_dlt import DagsterDltResource from dagster_sqlmesh import SQLMeshResource defs = Definitions( assets=all_assets, resources={ "dlt": DagsterDltResource(), "sqlmesh": SQLMeshResource(config=sqlmesh_config), }, )
CAPYGATOR_BI/ ├── app/ # Streamlit Apps │ ├── main_app.py # Unified Dashboard │ ├── finanz_app.py │ ├── journal_app.py │ └── ernaehrung_app.py ├── contracts/ # Data Contracts (ODCS) │ ├── finanz-buchungen.yaml │ ├── journal-eintraege.yaml │ └── ernaehrung-protokoll.yaml ├── data/ # DuckDB Databases │ └── finanz.duckdb ├── models/ # SQLMesh Models │ ├── staging/ │ │ └── stg_buchungen.sql │ └── marts/ │ ├── dim_*.sql │ └── fact_buchungen.sql ├── sources/ # dlt Pipelines │ └── finanz_pipeline.py ├── src/capygator_bi/ # Dagster Package │ ├── assets/ │ │ ├── dlt_assets.py │ │ └── sqlmesh_assets.py │ └── definitions.py ├── config.yaml # SQLMesh Config └── pyproject.toml # Project Config