Airflow Integration Guide for DataXPipe
Deploy generated Airflow DAGs, wire Catalog run events, configure connections, and integrate check execution into your existing scheduler environment.
- airflow
- orchestration
- integrations
DataXPipe generates Airflow DAGs from declarative pipeline specs. The DAG orchestrates transforms, posts run lifecycle events to the Catalog, and triggers quality checks after data lands. This guide covers deployment patterns for teams already running Airflow 2.x.
What the generator produces
Running python -m generator.run_example writes artifacts under generated/<pipeline_name>/, with the DAG file in the airflow/ subdirectory:
generated/orders_sync/
├── airflow/
│   └── orders_sync_dag.py
├── sql/
│   └── transforms/
├── checks/
├── tests/
└── metadata/
    ├── pipeline.json
    ├── datasets.json
    └── lineage.json
The DAG uses PythonOperator tasks that:
- POST a running status to POST /api/v1/runs/
- Execute transform SQL against registered warehouse connections
- Run generated check scripts that POST results to the Catalog
- Update the run record to success or failed
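As a rough illustration of the first step, the sketch below builds the request that opens a run record. It is not the generated DAG's actual code: the function names and the payload fields (pipeline_id, status) are assumptions, and only the endpoint path and the X-API-KEY header come from this guide.

```python
import json
import urllib.request


def build_run_status_request(base_url, api_key, pipeline_id, status):
    """Build (but do not send) the POST that opens a Catalog run record."""
    # Assumed payload shape; the real Catalog field names may differ.
    body = json.dumps({"pipeline_id": pipeline_id, "status": status}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v1/runs/",
        data=body,
        method="POST",
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
    )


def post_run_status(request, timeout=30):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping request construction separate from sending makes the payload easy to inspect in a unit test without reaching the Catalog.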
Deploying DAGs to Airflow
Copy generated DAG files to your scheduler’s dags/ folder or sync via CI:
# Example: copy the DAG file to the Airflow scheduler host with scp
scp generated/orders_sync/airflow/orders_sync_dag.py airflow@scheduler:/opt/airflow/dags/
Ensure the Airflow Python environment includes DataXPipe dependencies if check scripts run inline. Alternatively, invoke check scripts as subprocesses with the project venv activated.
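One way to run a check script as a subprocess is to call the project venv's interpreter directly, which uses that venv's packages without a shell activation step. The sketch below assumes a POSIX venv layout (bin/python); the helper names and the default timeout are hypothetical.

```python
import subprocess
from pathlib import PurePosixPath


def build_check_command(venv_dir, script_path):
    """Return the argv that runs a check script with the venv's interpreter."""
    # Assumes a POSIX venv layout, where the interpreter lives in bin/.
    interpreter = PurePosixPath(venv_dir) / "bin" / "python"
    return [str(interpreter), str(script_path)]


def run_check(venv_dir, script_path, timeout_seconds=300):
    """Run one check script; raises CalledProcessError on a non-zero exit."""
    return subprocess.run(
        build_check_command(venv_dir, script_path),
        check=True,           # a failing check fails the calling Airflow task
        capture_output=True,  # keep stdout/stderr for the task log
        text=True,
        timeout=timeout_seconds,
    )
```

Calling run_check from a PythonOperator callable lets a non-zero exit code surface as a task failure.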
Environment variables in Airflow
Set these as Airflow Variables or inject via your secrets backend:
| Variable | Purpose |
|---|---|
| DATAXPIPE_URL | Catalog API base URL |
| DATAXPIPE_API_KEY | Org-scoped API key (dxp_...) |
| DATAXPIPE_TIMEOUT_SECONDS | Per-query timeout (default 30) |
| DATAXPIPE_RETRY_ATTEMPTS | Connector retry count (default 3) |
In Airflow 2.x, use Connections for warehouse credentials and Variables for Catalog config:
# Airflow Variable example (set in UI or CLI)
# Key: dataxpipe_url Value: https://api.dataxpipe.com
# Key: dataxpipe_api_key Value: dxp_abc123
Never hardcode API keys in DAG source files committed to git.
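A minimal sketch of reading these settings at task runtime instead of embedding them in the DAG file. The CatalogConfig type and load_catalog_config helper are hypothetical; the variable names and defaults are the ones listed in the table above.

```python
import os
from typing import NamedTuple


class CatalogConfig(NamedTuple):
    url: str
    api_key: str
    timeout_seconds: int
    retry_attempts: int


def load_catalog_config(env=None):
    """Read Catalog settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    required = ("DATAXPIPE_URL", "DATAXPIPE_API_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail early so the task log names the missing setting.
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return CatalogConfig(
        url=env["DATAXPIPE_URL"].rstrip("/"),
        api_key=env["DATAXPIPE_API_KEY"],
        timeout_seconds=int(env.get("DATAXPIPE_TIMEOUT_SECONDS", "30")),
        retry_attempts=int(env.get("DATAXPIPE_RETRY_ATTEMPTS", "3")),
    )
```

Passing the mapping as an argument keeps the function testable and lets the same code read from a dict built from Airflow Variables.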
Warehouse connections
Airflow connections must match connection_ref values in your pipeline spec. Register the same connections in the Catalog for check execution:
curl -X POST https://api.dataxpipe.com/api/v1/connections/ \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{"id":"sf-prod","type":"snowflake","config":{"account":"xyz","user":"..."}}'
The Catalog connection ID and Airflow connection ID should align so operators can trace failures across both systems.
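The alignment can be checked mechanically before deploy. The sketch below compares three lists of IDs; how you collect them (from the spec, from Airflow, from the Catalog) is left out, and the function name and the "pg-stage" ID in the usage example are hypothetical.

```python
def find_unaligned_connections(spec_refs, airflow_ids, catalog_ids):
    """Map each connection_ref to the systems where it is not registered."""
    airflow_ids, catalog_ids = set(airflow_ids), set(catalog_ids)
    problems = {}
    for ref in sorted(set(spec_refs)):
        missing_in = []
        if ref not in airflow_ids:
            missing_in.append("airflow")
        if ref not in catalog_ids:
            missing_in.append("catalog")
        if missing_in:
            problems[ref] = missing_in
    return problems


if __name__ == "__main__":
    # "pg-stage" is registered in the Catalog but not in Airflow.
    print(find_unaligned_connections(
        ["sf-prod", "pg-stage"], ["sf-prod"], ["sf-prod", "pg-stage"]
    ))
```

An empty result means every connection_ref resolves in both systems.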
Run event flow
Each DAG execution creates a Catalog run record:
DAG start → POST /runs/ (status: running)
→ transform tasks
→ check tasks → POST /checks/results
DAG end → PATCH run (status: success | failed)
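The flow above can be sketched as a single function with the HTTP calls injected, so the ordering and the failure path are easy to test. Everything beyond the ordering is an assumption: the /api/v1 prefix on the checks path, the PATCH URL, the "id" response field, and the payload shapes are illustrative only.

```python
def run_with_lifecycle(post, patch, pipeline_id, transform, checks):
    """Open a run, execute transform and checks, then close the run.

    post and patch are callables taking (path, body) and returning a dict.
    """
    run = post("/api/v1/runs/", {"pipeline_id": pipeline_id, "status": "running"})
    run_id = run["id"]  # assumed response field
    status = "failed"   # stays "failed" unless every step completes
    try:
        transform()
        for check in checks:
            result = check()
            post("/api/v1/checks/results", {"run_id": run_id, **result})
        status = "success"
    finally:
        # Always close the run record, even when a step raised.
        patch(f"/api/v1/runs/{run_id}/", {"status": status})
    return status
```

Because the update happens in a finally block, an exception in a transform or check still closes the run as failed and then propagates, so the Airflow task is marked failed as well.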
Query run history from the product UI or API:
curl "https://api.dataxpipe.com/api/v1/pipelines/orders_sync/runs" \
  -H "X-API-KEY: dxp_your_key"
This gives stakeholders a single timeline independent of Airflow’s task log UI.
CI/CD integration
Recommended deploy pipeline:
- Validate YAML spec against JSON Schema in CI
- Regenerate artifacts on spec merge
- POST pipeline.json to Catalog (fail on 409 without version bump)
- Sync DAG files to Airflow
- Trigger a test run against staging connections
Block merges that rename dataset IDs without updating downstream pipeline references—lineage queries in CI catch broken edges early.
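A CI check for broken edges could look like the sketch below. The structure of datasets.json and lineage.json is not described in this guide, so the edge shape (dicts with "source" and "target" dataset IDs) and the dataset names in the usage example are assumptions.

```python
def find_broken_edges(dataset_ids, edges):
    """Return lineage edges that reference a dataset ID no longer defined."""
    known = set(dataset_ids)
    broken = []
    for edge in edges:
        # Assumed edge shape: {"source": <dataset id>, "target": <dataset id>}
        unknown = [end for end in (edge["source"], edge["target"]) if end not in known]
        if unknown:
            broken.append((edge["source"], edge["target"], unknown))
    return broken


if __name__ == "__main__":
    edges = [
        {"source": "raw.orders", "target": "analytics.orders"},
        {"source": "analytics.orders", "target": "analytics.orders_daily"},
    ]
    # Simulate renaming analytics.orders without updating the edges.
    renamed = ["raw.orders", "analytics.orders_v2", "analytics.orders_daily"]
    for source, target, unknown in find_broken_edges(renamed, edges):
        print(f"{source} -> {target}: unknown dataset(s) {unknown}")
```

Failing the CI job whenever the returned list is non-empty blocks the merge until downstream references are updated.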
Scheduling and backfill
The spec’s schedule field (a cron expression) maps directly to the DAG’s schedule_interval (named schedule from Airflow 2.4 onward, where schedule_interval is deprecated). For backfills:
- Trigger the DAG manually in Airflow with a logical date range
- Verify run records appear in the Catalog for each execution
- Confirm check results reference the correct run_id
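The second step can be automated by comparing the backfill range with the runs the Catalog returns. This sketch assumes a daily schedule and a run record shape with a "logical_date" field in ISO format; neither is confirmed by this guide, and the helper names are hypothetical.

```python
from datetime import date, timedelta


def expected_daily_dates(start, end):
    """List every date from start to end inclusive (daily schedule assumed)."""
    days = (end - start).days
    return [start + timedelta(days=offset) for offset in range(days + 1)]


def missing_run_dates(start, end, catalog_runs):
    """Return the dates in the backfill range that have no Catalog run."""
    # Assumed record shape: {"run_id": ..., "logical_date": "YYYY-MM-DD"}
    seen = {date.fromisoformat(run["logical_date"]) for run in catalog_runs}
    return [day for day in expected_daily_dates(start, end) if day not in seen]
```

Any date in the returned list is an execution whose run record never reached the Catalog and is worth re-triggering or investigating.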
Troubleshooting
| Symptom | Likely cause | Fix |
|---|---|---|
| Checks pass in Airflow but no Catalog results | Missing DATAXPIPE_API_KEY | Set env var on worker |
| 401 on run POST | Expired or wrong API key | Rotate org key in product UI |
| Transform succeeds, check times out | Large table scan | Add partition filter; increase timeout |
See also the data quality checks guide and the pipeline lineage best practices guide.