
Getting Started with DataXPipe

Learn how to define declarative pipeline specs, generate Airflow DAGs and SQL transforms, register metadata with the Catalog API, and run your first data quality checks.

DataXPipe Team
  • getting-started
  • pipeline-specs
  • catalog-api

DataXPipe turns YAML pipeline specifications into runnable artifacts—Airflow DAGs, SQL transforms, quality checks, and metadata files—while keeping a searchable catalog of pipelines, datasets, and lineage. This guide walks through a complete first pipeline from spec to production monitoring.

Why declarative pipelines?

Many data teams maintain three separate artifacts for every pipeline: orchestration code, transform SQL, and documentation about what depends on what. When the warehouse schema changes, the three drift apart.

DataXPipe collapses that into a single pipeline spec validated against a JSON Schema. The generator produces consistent outputs and registers structured metadata so downstream systems (and humans) always know what ran, on what schedule, and against which datasets.

Prerequisites

Before you begin, ensure you have:

  • Python 3.11 or 3.12
  • A virtual environment with DataXPipe dependencies installed
  • Network access to your source warehouse (Snowflake, BigQuery, or Postgres)
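Before creating the environment, a small preflight check can confirm that the interpreter matches the versions listed above. This is a minimal sketch; python_version_supported is a hypothetical helper, not part of DataXPipe.

```python
import sys

# Supported major.minor versions, taken from the prerequisites list.
SUPPORTED = {(3, 11), (3, 12)}


def python_version_supported(version_info=sys.version_info) -> bool:
    """Return True if the interpreter's major.minor version is a supported one."""
    # Slice off micro/release fields so (3, 12, 4, "final", 0) compares as (3, 12).
    return tuple(version_info[:2]) in SUPPORTED
```

Calling python_version_supported() with no arguments checks the running interpreter; passing a tuple such as (3, 10, 0) makes the helper easy to test.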

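After cloning the repository, create a virtual environment and install the dependencies. The activation command shown is for PowerShell on Windows:

python -m venv .venv
.\.venv\Scripts\Activate.ps1
# macOS/Linux: source .venv/bin/activate
pip install -r requirements.txt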

Step 1: Understand the pipeline spec

A minimal DataXPipe spec defines sources, transforms, sinks, checks, and schedule metadata. Here is a simplified orders sync pipeline:

name: orders_sync
description: Incremental sync of raw orders into the analytics warehouse
schedule: "0 6 * * *"
owner: data-platform@example.com

sources:
  - id: raw_orders
    connector: snowflake
    database: RAW
    schema: ECOMMERCE
    table: ORDERS

transforms:
  - id: orders_clean
    sql: transforms/orders_clean.sql
    inputs: [raw_orders]
    outputs: [clean_orders]

sinks:
  - id: clean_orders
    connector: bigquery
    dataset: analytics
    table: orders_clean

checks:
  - id: chk_not_null_order_id
    sql: checks/chk_not_null_order_id.sql
    severity: error
  - id: chk_freshness
    sql: checks/chk_freshness.sql
    severity: warn
    threshold_hours: 26

Key design choices:

  • Stable IDs (raw_orders, clean_orders) become dataset identifiers in the catalog and lineage graph.
  • Checks are first-class—they ship with the pipeline, not as afterthought scripts.
  • Connectors are abstracted so the same spec pattern works across warehouses.
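The generator validates specs against a JSON Schema that this guide does not reproduce. To illustrate the kind of rules such validation enforces, the sketch below checks a parsed spec for required keys, unique IDs, and valid check severities. The rule set, REQUIRED_KEYS, and validate_spec are assumptions for illustration, and the sketch assumes the YAML has already been loaded into a dict (for example with PyYAML's yaml.safe_load).

```python
from typing import Any

# Hypothetical rules; DataXPipe's real JSON Schema may enforce more or different ones.
REQUIRED_KEYS = ("name", "schedule", "sources", "transforms", "sinks")
ALLOWED_SEVERITIES = {"error", "warn"}


def validate_spec(spec: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means the spec passed."""
    problems: list[str] = []
    for key in REQUIRED_KEYS:
        if key not in spec:
            problems.append(f"missing top-level key: {key}")

    # Stable IDs become catalog identifiers, so they must be unique across sections.
    seen: set[str] = set()
    for section in ("sources", "transforms", "sinks", "checks"):
        for item in spec.get(section, []):
            item_id = item.get("id")
            if not item_id:
                problems.append(f"{section}: entry without an id")
            elif item_id in seen:
                problems.append(f"duplicate id: {item_id}")
            else:
                seen.add(item_id)

    for check in spec.get("checks", []):
        if check.get("severity") not in ALLOWED_SEVERITIES:
            problems.append(
                f"{check.get('id')}: severity must be one of {sorted(ALLOWED_SEVERITIES)}"
            )
    return problems


# A trimmed version of the orders_sync spec, as yaml.safe_load would return it.
ORDERS_SYNC = {
    "name": "orders_sync",
    "schedule": "0 6 * * *",
    "sources": [{"id": "raw_orders", "connector": "snowflake"}],
    "transforms": [
        {"id": "orders_clean", "inputs": ["raw_orders"], "outputs": ["clean_orders"]}
    ],
    "sinks": [{"id": "clean_orders", "connector": "bigquery"}],
    "checks": [
        {"id": "chk_not_null_order_id", "severity": "error"},
        {"id": "chk_freshness", "severity": "warn", "threshold_hours": 26},
    ],
}
```

validate_spec(ORDERS_SYNC) returns an empty list; reusing raw_orders as a sink ID would surface "duplicate id: raw_orders".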

Step 2: Generate artifacts

Run the generator's example entry point, which builds the orders_sync spec:

python -m generator.run_example

This writes artifacts to generated/orders_sync/:

Directory    Contents
airflow/     DAG that orchestrates transforms and posts run events
sql/         Transform SQL referenced by the spec
checks/      Runnable check scripts
tests/       Unit tests for each check
metadata/    pipeline.json, datasets.json, lineage.json
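A quick way to confirm a generation run produced everything listed in the table is to check the output directory before deploying. This is a sketch; missing_artifacts is a hypothetical helper, and because the guide names only the metadata files, it checks the other directories for existence only.

```python
from pathlib import Path

# Expected layout of generated/<pipeline>/, taken from the directory table.
EXPECTED_DIRS = ("airflow", "sql", "checks", "tests", "metadata")
EXPECTED_METADATA = ("pipeline.json", "datasets.json", "lineage.json")


def missing_artifacts(root: Path) -> list[str]:
    """Return relative paths the generator was expected to write but did not."""
    missing = [f"{d}/" for d in EXPECTED_DIRS if not (root / d).is_dir()]
    missing += [
        f"metadata/{name}"
        for name in EXPECTED_METADATA
        if not (root / "metadata" / name).is_file()
    ]
    return missing
```

For example, missing_artifacts(Path("generated/orders_sync")) should return an empty list after a successful run.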

Inspect metadata/lineage.json before deploying. You should see the edges raw_orders → orders_clean → clean_orders.
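The file format of lineage.json is not documented in this guide, so rather than parsing it, the sketch below recomputes the expected edges from the spec's transforms so you have something to compare against. lineage_edges, reachable, and EXAMPLE_SPEC are hypothetical names for illustration.

```python
from typing import Any


def lineage_edges(spec: dict[str, Any]) -> list[tuple[str, str]]:
    """Derive dataset -> transform -> dataset edges from a spec's transforms."""
    edges: list[tuple[str, str]] = []
    for t in spec.get("transforms", []):
        for upstream in t.get("inputs", []):
            edges.append((upstream, t["id"]))
        for downstream in t.get("outputs", []):
            edges.append((t["id"], downstream))
    return edges


def reachable(edges: list[tuple[str, str]], start: str, target: str) -> bool:
    """Iterative depth-first search for a directed path from start to target."""
    graph: dict[str, list[str]] = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
    stack, seen = [start], set()
    while stack:
        node = stack.pop()
        if node == target:
            return True
        if node in seen:
            continue
        seen.add(node)
        stack.extend(graph.get(node, []))
    return False


# Only the transforms section matters for lineage in this sketch.
EXAMPLE_SPEC = {
    "transforms": [
        {"id": "orders_clean", "inputs": ["raw_orders"], "outputs": ["clean_orders"]}
    ]
}
```

For the orders_sync spec this yields two edges, and raw_orders reaches clean_orders but not the other way round.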

Step 3: Start the Catalog API

The Catalog is the system of record for pipeline metadata, run history, and check results:

uvicorn app.main:app --reload --port 8000

By default the API persists to dataxpipe.db (SQLite). Set the DATAXPIPE_DB environment variable to use a different file path, or use Postgres in production.

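Register your generated pipeline. The command uses PowerShell's backtick line continuation, and the data argument is quoted so PowerShell does not parse the leading @ as splatting. In Windows PowerShell 5.1, where curl is an alias for Invoke-WebRequest, run curl.exe instead:

curl -X POST http://localhost:8000/pipelines `
  -H "X-API-KEY: platform-key" `
  -H "Content-Type: application/json" `
  -d "@generated/orders_sync/metadata/pipeline.json"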

Verify registration:

curl http://localhost:8000/pipelines/orders_sync
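If you prefer to register from Python (for example, from a deployment script), the same POST can be built with the standard library. This is a sketch: build_register_request and register_pipeline are hypothetical helpers, and it assumes the Catalog responds with a JSON body.

```python
import json
import urllib.request
from pathlib import Path


def build_register_request(
    base_url: str, api_key: str, payload: dict
) -> urllib.request.Request:
    """Build the same POST /pipelines request as the curl command."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/pipelines",
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )


def register_pipeline(metadata_file: Path, base_url: str, api_key: str) -> dict:
    """POST pipeline.json to the Catalog and decode the (assumed JSON) response."""
    payload = json.loads(metadata_file.read_text(encoding="utf-8"))
    request = build_register_request(base_url, api_key, payload)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```

Usage: register_pipeline(Path("generated/orders_sync/metadata/pipeline.json"), "http://localhost:8000", "platform-key"). Splitting request construction from sending keeps the request testable without a running Catalog.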

Step 4: Configure environment variables

Generated DAGs and checks communicate with the Catalog via environment variables:

Variable                    Purpose                           Default
DATAXPIPE_URL               Catalog base URL                  http://localhost:8000
DATAXPIPE_API_KEY           API key for RBAC                  platform-key
DATAXPIPE_RETRY_ATTEMPTS    Connector query retries           3
DATAXPIPE_TIMEOUT_SECONDS   Per-query timeout, in seconds     30

In production, inject these via your secrets manager (Vault, AWS Secrets Manager, etc.) rather than hardcoding in DAG files.
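A client that reads these variables might load them once into a typed settings object and wrap connector queries in a retry loop. This is a sketch under assumptions: CatalogSettings, load_settings, and with_retries are hypothetical names, DataXPipe's own retry policy is not documented here, and this version treats DATAXPIPE_RETRY_ATTEMPTS as the total number of attempts with exponential backoff between them.

```python
import os
import time
from collections.abc import Callable, Mapping
from dataclasses import dataclass
from typing import TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class CatalogSettings:
    url: str
    api_key: str
    retry_attempts: int
    timeout_seconds: int


def load_settings(env: Mapping[str, str] = os.environ) -> CatalogSettings:
    """Read client settings, falling back to the documented defaults."""
    return CatalogSettings(
        url=env.get("DATAXPIPE_URL", "http://localhost:8000"),
        api_key=env.get("DATAXPIPE_API_KEY", "platform-key"),
        retry_attempts=int(env.get("DATAXPIPE_RETRY_ATTEMPTS", "3")),
        timeout_seconds=int(env.get("DATAXPIPE_TIMEOUT_SECONDS", "30")),
    )


def with_retries(fn: Callable[[], T], attempts: int, base_delay: float = 1.0) -> T:
    """Call fn up to `attempts` times, sleeping base_delay * 2**n after failure n."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:  # In real code, narrow this to transient connector errors.
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error.
            time.sleep(base_delay * 2**attempt)
    raise ValueError("attempts must be >= 1")
```

Passing the environment as a Mapping keeps load_settings testable with a plain dict, while the default still reads os.environ, which is where a secrets manager would inject the values.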

Step 5: Deploy and validate

  1. Copy the generated Airflow DAGs to your scheduler’s dags/ folder.
  2. Ensure warehouse connections are configured in Airflow with the same names referenced in your spec.
  3. Trigger a manual run and confirm that a run record appears in the Catalog:
     curl "http://localhost:8000/runs?pipeline=orders_sync"
  4. After the checks execute, verify their results:
     curl "http://localhost:8000/checks/results?pipeline=orders_sync"

A successful first run means transforms completed, checks posted results, and lineage is queryable via GET /lineage/<dataset_id>.

Common pitfalls

Mismatched dataset IDs. If you rename a transform output in the spec but forget to update downstream references, lineage edges break silently until the next generation run. Always regenerate after spec edits.
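A lightweight guard against this pitfall is to check, before regenerating, that every transform input is declared somewhere and that every output lands in a sink or feeds another transform. This is a sketch; dangling_references and its rules are assumptions for illustration, not the generator's actual validation.

```python
from typing import Any


def dangling_references(spec: dict[str, Any]) -> list[str]:
    """Report dataset IDs that transforms, sources, and sinks disagree on."""
    transforms = spec.get("transforms", [])
    source_ids = {s["id"] for s in spec.get("sources", [])}
    sink_ids = {s["id"] for s in spec.get("sinks", [])}
    produced = {o for t in transforms for o in t.get("outputs", [])}
    consumed = {i for t in transforms for i in t.get("inputs", [])}

    problems: list[str] = []
    for t in transforms:
        for name in t.get("inputs", []):
            # An input must be a declared source or another transform's output.
            if name not in source_ids and name not in produced:
                problems.append(f"{t['id']}: input '{name}' is not a source or transform output")
    for name in sorted(produced - sink_ids - consumed):
        problems.append(f"output '{name}' is neither a sink nor consumed by another transform")
    for name in sorted(sink_ids - produced):
        problems.append(f"sink '{name}' is not produced by any transform")
    return problems
```

Renaming the transform output from clean_orders to orders_cleaned without updating the sink reports both the orphaned output and the sink that nothing produces.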

Missing API key on check scripts. Checks run outside Airflow still need DATAXPIPE_API_KEY to post results. CI pipelines that execute checks in isolation often forget this.
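One way to make this failure loud in CI is a fail-fast guard at the top of each check script. This sketch deliberately ignores the platform-key default from the environment table, on the assumption that CI should never post results with a development key; require_api_key is a hypothetical helper.

```python
import os
from collections.abc import Mapping


def require_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return DATAXPIPE_API_KEY, or raise before any check runs."""
    key = env.get("DATAXPIPE_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "DATAXPIPE_API_KEY is not set; check results cannot be posted to the Catalog"
        )
    return key
```

Failing before the check query runs means a misconfigured CI job stops with a clear message instead of completing the check and silently dropping its result.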

Schema drift without check coverage. Start with not_null, freshness, and schema checks, which cover the most common production failures. Add domain-specific checks (referential integrity, row-count bounds) once that baseline is green.

Next steps

Declarative specs are an investment upfront, but they pay off when onboarding new engineers, auditing compliance, and recovering from incidents—you always have a single source of truth for what the pipeline is supposed to do.