
Airflow Integration Guide for DataXPipe

Deploy generated Airflow DAGs, wire Catalog run events, configure connections, and integrate check execution into your existing scheduler environment.

DataXPipe Team
  • airflow
  • orchestration
  • integrations

DataXPipe generates Airflow DAGs from declarative pipeline specs. The DAG orchestrates transforms, posts run lifecycle events to the Catalog, and triggers quality checks after data lands. This guide covers deployment patterns for teams already running Airflow 2.x.

What the generator produces

Running python -m generator.run_example writes artifacts to generated/<pipeline_name>/, with the DAG under airflow/. For a pipeline named orders_sync:

generated/orders_sync/
├── airflow/
│   └── orders_sync_dag.py
├── sql/
│   └── transforms/
├── checks/
├── tests/
└── metadata/
    ├── pipeline.json
    ├── datasets.json
    └── lineage.json
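Before syncing anything, a CI job can confirm that the generator produced the layout above. The sketch below is a minimal example. The helper name check_generated_layout is hypothetical. The DAG filename convention <pipeline_name>_dag.py is inferred from orders_sync_dag.py and is an assumption. Only directories are checked under sql/, checks/ and tests/, because their contents vary per pipeline.

```python
from pathlib import Path
from typing import List

# Layout taken from the generator output tree above; adjust if your
# generator version writes a different tree.
REQUIRED_DIRS = ["airflow", "sql/transforms", "checks", "tests", "metadata"]
REQUIRED_METADATA = ["pipeline.json", "datasets.json", "lineage.json"]


def check_generated_layout(root: Path, pipeline_name: str) -> List[str]:
    """Return paths missing under <root>/<pipeline_name>/ (hypothetical helper)."""
    base = root / pipeline_name
    missing: List[str] = []
    for rel in REQUIRED_DIRS:
        if not (base / rel).is_dir():
            missing.append(rel + "/")
    for name in REQUIRED_METADATA:
        if not (base / "metadata" / name).is_file():
            missing.append("metadata/" + name)
    # Assumes the DAG file is named <pipeline_name>_dag.py, as in orders_sync_dag.py.
    dag_file = base / "airflow" / f"{pipeline_name}_dag.py"
    if not dag_file.is_file():
        missing.append("airflow/" + dag_file.name)
    return missing
```

In CI, call check_generated_layout(Path("generated"), "orders_sync") and fail the job if the returned list is non-empty.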

The DAG uses PythonOperator tasks that:

  1. POST a running status to /api/v1/runs/
  2. Execute transform SQL against registered warehouse connections
  3. Run generated check scripts that POST results to the Catalog
  4. Update the run record to success or failed

Deploying DAGs to Airflow

Copy generated DAG files to your scheduler’s dags/ folder or sync via CI:

# Example: copy the DAG to the scheduler's dags/ folder with scp (repeat for workers that do not share the folder)
scp generated/orders_sync/airflow/orders_sync_dag.py airflow@scheduler:/opt/airflow/dags/

If check scripts run inline, the Airflow Python environment must include DataXPipe's dependencies. Alternatively, invoke check scripts as subprocesses using the project venv (activate it, or call the venv's python binary directly).
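The subprocess option can be sketched with the standard library. This is a minimal example, and run_check_script is a hypothetical helper name. Calling the venv's python binary directly is usually equivalent to activating the venv for that one process. A non-zero exit raises CalledProcessError, which fails the calling Airflow task.

```python
import os
import subprocess
from pathlib import Path
from typing import Mapping, Optional


def run_check_script(
    venv_python: str,
    script: Path,
    extra_env: Optional[Mapping[str, str]] = None,
    timeout_s: float = 600,
) -> subprocess.CompletedProcess:
    """Run a generated check script with the project's interpreter (hypothetical helper)."""
    # Inherit the worker environment so DATAXPIPE_URL / DATAXPIPE_API_KEY reach the script.
    env = dict(os.environ)
    if extra_env:
        env.update(extra_env)
    # check=True raises CalledProcessError on a non-zero exit code.
    return subprocess.run(
        [venv_python, str(script)],
        env=env,
        capture_output=True,
        text=True,
        timeout=timeout_s,
        check=True,
    )
```

Inside a PythonOperator callable, pass the venv interpreter path (for example something like /opt/dataxpipe/.venv/bin/python) and a script from the checks/ directory. Both paths are placeholders for your own deployment.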

Environment variables in Airflow

Provide these settings to every worker that runs DataXPipe tasks, either as Airflow Variables or injected from your secrets backend:

Variable                    | Purpose
DATAXPIPE_URL               | Catalog API base URL
DATAXPIPE_API_KEY           | Org-scoped API key (dxp_...)
DATAXPIPE_TIMEOUT_SECONDS   | Per-query timeout (default 30)
DATAXPIPE_RETRY_ATTEMPTS    | Connector retry count (default 3)
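A task can read these settings with the defaults from the table. The sketch below assumes the values are exposed to the task process as environment variables. The names DataXPipeConfig and load_config are hypothetical. This version fails fast when the URL or key is missing, so a misconfigured worker surfaces as a task error instead of checks that silently never report.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DataXPipeConfig:
    url: str
    api_key: str
    timeout_seconds: int = 30  # default from the table above
    retry_attempts: int = 3    # default from the table above


def load_config(env: Optional[Mapping[str, str]] = None) -> DataXPipeConfig:
    """Read DataXPipe settings from the environment (hypothetical helper)."""
    env = os.environ if env is None else env
    missing = [k for k in ("DATAXPIPE_URL", "DATAXPIPE_API_KEY") if not env.get(k)]
    if missing:
        # Fail loudly rather than letting checks run without reporting to the Catalog.
        raise RuntimeError(f"missing required settings: {', '.join(missing)}")
    return DataXPipeConfig(
        url=env["DATAXPIPE_URL"].rstrip("/"),
        api_key=env["DATAXPIPE_API_KEY"],
        timeout_seconds=int(env.get("DATAXPIPE_TIMEOUT_SECONDS", "30")),
        retry_attempts=int(env.get("DATAXPIPE_RETRY_ATTEMPTS", "3")),
    )
```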

In Airflow 2.x, use Connections for warehouse credentials and Variables for Catalog config:

# Airflow Variable example (set in the UI, or with the CLI as below)
airflow variables set dataxpipe_url https://api.dataxpipe.com
airflow variables set dataxpipe_api_key dxp_abc123

Never hardcode API keys in DAG source files committed to git.

Warehouse connections

Airflow connection IDs must match the connection_ref values in your pipeline spec. Register the same connections in the Catalog for check execution:

curl -X POST https://api.dataxpipe.com/api/v1/connections/ \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{"id":"sf-prod","type":"snowflake","config":{"account":"xyz","user":"..."}}'

The Catalog connection ID and Airflow connection ID should align so operators can trace failures across both systems.
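A CI step can check that alignment before deploy. The sketch below is a minimal example and the helper names are hypothetical. It walks the parsed spec for every connection_ref string, because the exact nesting of that key is not documented here. It works whether you load the YAML spec or the generated pipeline.json. The Airflow IDs could come from parsing `airflow connections list`, and the Catalog IDs from however you list connections there.

```python
from typing import Any, Dict, Iterable, Set


def collect_connection_refs(node: Any) -> Set[str]:
    """Gather every "connection_ref" string anywhere in a parsed spec."""
    refs: Set[str] = set()
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "connection_ref" and isinstance(value, str):
                refs.add(value)
            else:
                refs |= collect_connection_refs(value)
    elif isinstance(node, list):
        for item in node:
            refs |= collect_connection_refs(item)
    return refs


def unmatched_refs(
    spec: Any, airflow_ids: Iterable[str], catalog_ids: Iterable[str]
) -> Dict[str, Set[str]]:
    """Report connection_ref values missing from Airflow or from the Catalog."""
    refs = collect_connection_refs(spec)
    return {
        "missing_in_airflow": refs - set(airflow_ids),
        "missing_in_catalog": refs - set(catalog_ids),
    }
```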

Run event flow

Each DAG execution creates a Catalog run record:

DAG start → POST /runs/ (status: running)
  → transform tasks
  → check tasks → POST /checks/results
DAG end → PATCH run (status: success | failed)
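The flow above can be sketched as one Python function. This is not the generated DAG's code. The send callable, the "pipeline_id" request field, the "id" response field and the PATCH path /api/v1/runs/<run_id>/ are all assumptions, because this guide documents neither the request bodies nor the PATCH URL.

```python
from typing import Any, Callable, Dict

# send(method, path, payload) -> parsed JSON response. Injected so the sketch
# does not depend on a particular HTTP client.
Send = Callable[[str, str, Dict[str, Any]], Dict[str, Any]]


def run_with_catalog_events(send: Send, pipeline_id: str, body: Callable[[str], None]) -> str:
    """Wrap one pipeline execution in Catalog run events (hypothetical helper)."""
    # DAG start: create the run record with status "running".
    run = send("POST", "/api/v1/runs/", {"pipeline_id": pipeline_id, "status": "running"})
    run_id = run["id"]
    try:
        # Transforms, then checks that POST results tagged with run_id.
        body(run_id)
    except Exception:
        send("PATCH", f"/api/v1/runs/{run_id}/", {"status": "failed"})
        raise
    send("PATCH", f"/api/v1/runs/{run_id}/", {"status": "success"})
    return run_id
```

The generated DAG spreads these steps over separate PythonOperator tasks. If you hand-write something similar, the final status task usually needs trigger_rule="all_done" so it still runs, and can record failed, when an upstream task fails.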

Query run history from the product UI or API:

curl "https://api.dataxpipe.com/api/v1/pipelines/orders_sync/runs" \
  -H "X-API-KEY: dxp_your_key"
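The same query can run from Python, for example inside a PythonOperator callable, using only the standard library. This is a sketch and the helper names are hypothetical. The response is returned as decoded JSON because its shape is not documented here.

```python
import json
import urllib.parse
import urllib.request


def runs_request(base_url: str, api_key: str, pipeline_id: str) -> urllib.request.Request:
    """Build the GET request equivalent to the curl call above."""
    path = f"/api/v1/pipelines/{urllib.parse.quote(pipeline_id, safe='')}/runs"
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        headers={"X-API-KEY": api_key},
        method="GET",
    )


def fetch_runs(base_url: str, api_key: str, pipeline_id: str, timeout: float = 30):
    """Send the request and return the decoded JSON body."""
    with urllib.request.urlopen(runs_request(base_url, api_key, pipeline_id), timeout=timeout) as resp:
        return json.load(resp)
```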

This gives stakeholders a single timeline independent of Airflow’s task log UI.

CI/CD integration

Recommended deploy pipeline:

  1. Validate the YAML spec against its JSON Schema in CI
  2. Regenerate artifacts when a spec change merges
  3. POST pipeline.json to the Catalog (fail on 409 without a version bump)
  4. Sync DAG files to Airflow
  5. Trigger a test run against staging connections
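Step 3 can be sketched as a small CI helper. It reads "fail on 409 without version bump" as: treat a 409 Conflict as a hard failure whose remedy is a version bump. The endpoint path /api/v1/pipelines/ is an assumption for illustration. The names publish_pipeline and VersionConflict are hypothetical. The opener parameter exists so the HTTP call can be swapped out in tests.

```python
import urllib.error
import urllib.request
from pathlib import Path
from typing import Callable


class VersionConflict(RuntimeError):
    """Raised when the Catalog already holds this pipeline version."""


def publish_pipeline(
    base_url: str,
    api_key: str,
    pipeline_json: Path,
    opener: Callable = urllib.request.urlopen,
    timeout: float = 30.0,
) -> int:
    """POST pipeline.json to the Catalog and return the HTTP status (hypothetical helper)."""
    # Assumed endpoint; replace with the Catalog's documented pipeline route.
    req = urllib.request.Request(
        base_url.rstrip("/") + "/api/v1/pipelines/",
        data=pipeline_json.read_bytes(),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with opener(req, timeout=timeout) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        # A 409 becomes a clear, actionable CI failure; other errors propagate unchanged.
        if err.code == 409:
            raise VersionConflict(
                "pipeline already registered; bump the spec version and regenerate"
            ) from err
        raise
```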

Block merges that rename dataset IDs without updating downstream pipeline references. Lineage queries in CI catch these broken edges early.
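A broken-edge check over the generated metadata can be quite small. The sketch assumes datasets.json is a JSON array of objects with an "id" field, and lineage.json is a JSON array of edges with "source" and "target" fields. The real schemas may differ, and the helper names are hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, List, Tuple


def broken_edges(datasets: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """Return lineage edges whose source or target is not a known dataset ID."""
    known = {d["id"] for d in datasets}
    return [
        (e["source"], e["target"])
        for e in edges
        if e["source"] not in known or e["target"] not in known
    ]


def check_lineage(metadata_dir: Path) -> List[Tuple[str, str]]:
    # Assumed shapes: datasets.json = [{"id": ...}], lineage.json = [{"source": ..., "target": ...}]
    datasets = json.loads((metadata_dir / "datasets.json").read_text())
    edges = json.loads((metadata_dir / "lineage.json").read_text())
    return broken_edges(datasets, edges)
```

To catch renames that break other pipelines, pass broken_edges the union of datasets from every pipeline's datasets.json, not just the one being changed.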

Scheduling and backfill

The spec’s schedule field (cron expression) maps directly to the DAG’s schedule_interval. For backfills:

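  1. Run airflow dags backfill --start-date <start> --end-date <end> <dag_id> for the logical date range, or trigger individual runs manually for specific logical dates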
  2. Verify run records appear in the Catalog for each execution
  3. Confirm check results reference the correct run_id
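Step 2 can be automated by diffing expected logical dates against the run records you pull from the Catalog. The sketch assumes a daily schedule and assumes each run record has been mapped to a dict with an ISO-8601 "logical_date" string. That field name is hypothetical, since the run-record schema is not documented here.

```python
from datetime import date, timedelta
from typing import Any, Dict, Iterable, List


def daily_logical_dates(start: date, end: date) -> List[date]:
    """Inclusive list of dates for a daily schedule; adapt for other crons."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def missing_run_dates(expected: Iterable[date], runs: Iterable[Dict[str, Any]]) -> List[date]:
    """Expected logical dates that have no Catalog run record."""
    # Assumes each record carries an ISO-8601 "logical_date" string (hypothetical field).
    seen = {date.fromisoformat(r["logical_date"][:10]) for r in runs}
    return [d for d in expected if d not in seen]
```

Check how your backfill range maps to logical dates before trusting the inclusive end date used here.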

Troubleshooting

Symptom                                        | Likely cause               | Fix
Checks pass in Airflow but no Catalog results  | DATAXPIPE_API_KEY not set  | Set DATAXPIPE_API_KEY on the worker
401 on the run POST                            | Expired or wrong API key   | Rotate the org key in the product UI and update the stored value
Transform succeeds, check times out            | Large table scan           | Add a partition filter; raise DATAXPIPE_TIMEOUT_SECONDS

See the data quality checks guide and the pipeline lineage best practices guide.