Getting Started with DataXPipe
Learn how to define declarative pipeline specs, generate Airflow DAGs and SQL transforms, register metadata with the Catalog API, and run your first data quality checks.
DataXPipe turns YAML pipeline specifications into runnable artifacts—Airflow DAGs, SQL transforms, quality checks, and metadata files—while keeping a searchable catalog of pipelines, datasets, and lineage. This guide walks through a complete first pipeline from spec to production monitoring.
Why declarative pipelines?
Most data teams maintain three separate artifacts for every pipeline: orchestration code, transform SQL, and documentation about what depends on what. When the warehouse schema changes, all three drift apart.
DataXPipe collapses that into a single pipeline spec validated against a JSON Schema. The generator produces consistent outputs and registers structured metadata so downstream systems (and humans) always know what ran, on what schedule, and against which datasets.
Prerequisites
Before you begin, ensure you have:
- Python 3.11 or 3.12
- A virtual environment with DataXPipe dependencies installed
- Network access to your source warehouse (Snowflake, BigQuery, or Postgres)
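After cloning the repository, create a virtual environment and install dependencies. The activation command below is for PowerShell; on macOS or Linux, run source .venv/bin/activate instead: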
python -m venv .venv
.\.venv\Scripts\Activate.ps1
pip install -r requirements.txt
Step 1: Understand the pipeline spec
A minimal DataXPipe spec defines sources, transforms, sinks, checks, and schedule metadata. Here is a simplified orders sync pipeline:
name: orders_sync
description: Incremental sync of raw orders into the analytics warehouse
schedule: "0 6 * * *"
owner: data-platform@example.com
sources:
  - id: raw_orders
    connector: snowflake
    database: RAW
    schema: ECOMMERCE
    table: ORDERS
transforms:
  - id: orders_clean
    sql: transforms/orders_clean.sql
    inputs: [raw_orders]
    outputs: [clean_orders]
sinks:
  - id: clean_orders
    connector: bigquery
    dataset: analytics
    table: orders_clean
checks:
  - id: chk_not_null_order_id
    sql: checks/chk_not_null_order_id.sql
    severity: error
  - id: chk_freshness
    sql: checks/chk_freshness.sql
    severity: warn
    threshold_hours: 26
Key design choices:
- Stable IDs (raw_orders, clean_orders) become dataset identifiers in the catalog and lineage graph.
- Checks are first-class: they ship with the pipeline, not as afterthought scripts.
- Connectors are abstracted so the same spec pattern works across warehouses.
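Because every reference in the spec goes through a stable ID, a quick consistency check is possible before generation. The sketch below is not part of DataXPipe: `find_dangling_refs` is a hypothetical helper, and it assumes the spec has already been parsed into a Python dict shaped like the YAML above. It also assumes that a transform input must be a declared source or another transform's output, and that every sink must be produced by some transform.

```python
def find_dangling_refs(spec: dict) -> list[str]:
    """List IDs that are referenced in the spec but never declared or produced."""
    transforms = spec.get("transforms", [])
    source_ids = {s["id"] for s in spec.get("sources", [])}
    produced = {out for t in transforms for out in t.get("outputs", [])}

    problems = []
    for t in transforms:
        for ref in t.get("inputs", []):
            # Assumption: an input is a declared source or another transform's output.
            if ref not in source_ids | produced:
                problems.append(f"transform '{t['id']}' reads unknown dataset '{ref}'")
    for sink in spec.get("sinks", []):
        # Assumption: every sink ID must be written by some transform.
        if sink["id"] not in produced:
            problems.append(f"sink '{sink['id']}' is never produced by a transform")
    return problems


# Trimmed-down version of the orders_sync spec, keeping only the ID fields.
example_spec = {
    "sources": [{"id": "raw_orders"}],
    "transforms": [
        {"id": "orders_clean", "inputs": ["raw_orders"], "outputs": ["clean_orders"]}
    ],
    "sinks": [{"id": "clean_orders"}],
}
print(find_dangling_refs(example_spec))  # prints []
```

Running a check like this in CI would surface a renamed or misspelled ID before any artifacts are generated.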
Step 2: Generate artifacts
Run the generator against your spec:
python -m generator.run_example
This writes artifacts to generated/orders_sync/:
| Directory | Contents |
|---|---|
| airflow/ | DAG that orchestrates transforms and posts run events |
| sql/ | Transform SQL referenced by the spec |
| checks/ | Runnable check scripts |
| tests/ | Unit tests for each check |
| metadata/ | pipeline.json, datasets.json, lineage.json |
Inspect metadata/lineage.json before deploying—you should see edges from raw_orders → orders_clean → clean_orders.
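To see where those edges come from, here is a minimal sketch of how lineage could be derived from the transforms in a spec. The layout of lineage.json is not shown in this guide, so `derive_edges` is a hypothetical illustration of the expected edges, not the generator's actual code or file format.

```python
def derive_edges(spec: dict) -> list[tuple[str, str]]:
    """Return (upstream, downstream) pairs implied by each transform's inputs and outputs."""
    edges = []
    for transform in spec.get("transforms", []):
        # Each input dataset feeds the transform...
        for upstream in transform.get("inputs", []):
            edges.append((upstream, transform["id"]))
        # ...and the transform feeds each output dataset.
        for downstream in transform.get("outputs", []):
            edges.append((transform["id"], downstream))
    return edges


spec = {
    "transforms": [
        {"id": "orders_clean", "inputs": ["raw_orders"], "outputs": ["clean_orders"]}
    ]
}
for upstream, downstream in derive_edges(spec):
    print(f"{upstream} -> {downstream}")
```

For the orders_sync spec this yields two edges, raw_orders to orders_clean and orders_clean to clean_orders, which is the chain to look for in the generated file.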
Step 3: Start the Catalog API
The Catalog is the system of record for pipeline metadata, run history, and check results:
uvicorn app.main:app --reload --port 8000
By default the API persists to dataxpipe.db (SQLite). Set DATAXPIPE_DB for a custom path or use Postgres in production.
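Register your generated pipeline. The example uses PowerShell backtick line continuations; in Windows PowerShell 5.1, call curl.exe explicitly, because curl is an alias for Invoke-WebRequest there: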
curl -X POST http://localhost:8000/pipelines `
-H "X-API-KEY: platform-key" `
-H "Content-Type: application/json" `
-d @generated/orders_sync/metadata/pipeline.json
Verify registration:
curl http://localhost:8000/pipelines/orders_sync
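If you would rather register from a script than from the shell, the same POST can be built with the Python standard library. This is a sketch under assumptions: `build_register_request` is a hypothetical helper, and the `{"name": "orders_sync"}` payload is a stand-in for the real contents of pipeline.json, which this guide does not show.

```python
import json
import urllib.request


def build_register_request(
    pipeline: dict,
    base_url: str = "http://localhost:8000",
    api_key: str = "platform-key",
) -> urllib.request.Request:
    """Build (but do not send) the POST /pipelines request used for registration."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/pipelines",
        data=json.dumps(pipeline).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )


# Placeholder payload; in practice, load generated/orders_sync/metadata/pipeline.json.
request = build_register_request({"name": "orders_sync"})
print(request.get_method(), request.full_url)

# To actually send it against a running Catalog:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(response.status)
```

Separating request construction from sending keeps the helper easy to unit test without a live Catalog.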
Step 4: Configure environment variables
Generated DAGs and checks communicate with the Catalog via environment variables:
| Variable | Purpose | Default |
|---|---|---|
| DATAXPIPE_URL | Catalog base URL | http://localhost:8000 |
| DATAXPIPE_API_KEY | API key for RBAC | platform-key |
| DATAXPIPE_RETRY_ATTEMPTS | Connector query retries | 3 |
| DATAXPIPE_TIMEOUT_SECONDS | Per-query timeout | 30 |
In production, inject these via your secrets manager (Vault, AWS Secrets Manager, etc.) rather than hardcoding in DAG files.
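For custom scripts that talk to the Catalog, the variables above can be read in one place with the documented defaults. The sketch below is illustrative: `CatalogSettings` and `load_settings` are hypothetical names, not something the generated artifacts are known to expose.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class CatalogSettings:
    url: str
    api_key: str
    retry_attempts: int
    timeout_seconds: int


def load_settings(env: Mapping[str, str] = os.environ) -> CatalogSettings:
    """Read the DATAXPIPE_* variables, falling back to the defaults in the table above."""
    return CatalogSettings(
        url=env.get("DATAXPIPE_URL", "http://localhost:8000"),
        api_key=env.get("DATAXPIPE_API_KEY", "platform-key"),
        # Numeric settings arrive as strings, so convert them explicitly.
        retry_attempts=int(env.get("DATAXPIPE_RETRY_ATTEMPTS", "3")),
        timeout_seconds=int(env.get("DATAXPIPE_TIMEOUT_SECONDS", "30")),
    )


settings = load_settings({"DATAXPIPE_TIMEOUT_SECONDS": "60"})
print(settings.timeout_seconds, settings.retry_attempts)
```

Passing the environment as a mapping makes the loader testable without mutating the real process environment.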
Step 5: Deploy and validate
- Copy generated Airflow DAGs to your scheduler’s dags/ folder.
- Ensure warehouse connections are configured in Airflow with the same names referenced in your spec.
- Trigger a manual run and confirm a run record appears in the Catalog:
curl "http://localhost:8000/runs?pipeline=orders_sync"
- After checks execute, verify results:
curl "http://localhost:8000/checks/results?pipeline=orders_sync"
A successful first run means transforms completed, checks posted results, and lineage is queryable via GET /lineage/<dataset_id>.
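When these validation calls move into a script, building the URLs with proper encoding avoids quoting mistakes in pipeline or dataset names. A small sketch, assuming only the three endpoints mentioned in this step; `catalog_url` is a hypothetical helper.

```python
from urllib.parse import quote, urlencode


def catalog_url(base_url: str, path: str, **params: str) -> str:
    """Join a base URL and path, appending URL-encoded query parameters if any."""
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    return f"{url}?{urlencode(params)}" if params else url


base = "http://localhost:8000"
runs_url = catalog_url(base, "runs", pipeline="orders_sync")
results_url = catalog_url(base, "checks/results", pipeline="orders_sync")
# Dataset IDs go into the path, so percent-encode them as a single segment.
lineage_url = catalog_url(base, f"lineage/{quote('clean_orders', safe='')}")

for url in (runs_url, results_url, lineage_url):
    print(url)
```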
Common pitfalls
Mismatched dataset IDs. If you rename a transform output in the spec but forget to update downstream references, lineage edges break silently until the next generation run. Always regenerate after spec edits.
Missing API key on check scripts. Checks run outside Airflow still need DATAXPIPE_API_KEY to post results. CI pipelines that execute checks in isolation often forget this.
Schema drift without check coverage. Start with not_null, freshness, and schema checks, a baseline that catches many of the most common production incidents. Add domain-specific checks (referential integrity, row-count bounds) once the baseline is green.
Next steps
- Explore the lineage best practices guide to model complex DAGs.
- Read the data quality checks guide for severity levels and alerting patterns.
- Open the DataXPipe app to manage pipelines, connections, and runs in a unified UI.
Declarative specs are an investment upfront, but they pay off when onboarding new engineers, auditing compliance, and recovering from incidents—you always have a single source of truth for what the pipeline is supposed to do.