BigQuery Freshness Checks in DataXPipe
Implement freshness checks against BigQuery tables, configure thresholds in pipeline specs, and post structured results to the DataXPipe Catalog.
- bigquery
- freshness
- data-quality
Stale data is one of the most common production incidents for batch pipelines: Airflow reports success, but the table has not received new rows in days. DataXPipe freshness checks detect stale data before stakeholders do. This guide focuses on BigQuery-specific patterns.
Why freshness checks matter
Batch pipelines fail silently when:
- Upstream ingestion skips a partition
- Incremental watermark logic has an off-by-one error
- A merge statement matches zero rows due to filter drift
In each case the transform tasks complete without error, so task status alone never surfaces the problem. A freshness check that compares MAX(timestamp) against an SLA threshold does.
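The decision a freshness check makes can be written as a small pure function. The sketch below is illustrative, not DataXPipe's code: `is_stale` is a hypothetical helper that mirrors the MAX(timestamp)-versus-SLA comparison described above, treats a missing MAX(timestamp) (an empty table) as stale, and assumes timezone-aware UTC datetimes.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(latest_ts: Optional[datetime], now: datetime, max_delay: timedelta) -> bool:
    """Hypothetical helper: True if the newest row breaches the freshness SLA.

    latest_ts is MAX(timestamp) from the table; None means the table is empty,
    which is treated as stale rather than silently passing.
    """
    if latest_ts is None:
        return True
    if latest_ts.tzinfo is None or now.tzinfo is None:
        # Naive datetimes are ambiguous and a common source of false alerts.
        raise ValueError("use timezone-aware datetimes")
    return now - latest_ts > max_delay


now = datetime(2025, 6, 20, 12, 0, tzinfo=timezone.utc)
sla = timedelta(minutes=1560)  # 26 hours
print(is_stale(now - timedelta(hours=25), now, sla))  # False
print(is_stale(now - timedelta(hours=30), now, sla))  # True
print(is_stale(None, now, sla))                       # True
```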
Register a BigQuery connection
```shell
curl -X POST https://api.dataxpipe.com/api/v1/connections/ \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "id": "bq-prod",
    "type": "bigquery",
    "config": {
      "project": "my-gcp-project",
      "location": "US"
    }
  }'
```
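Authentication uses Application Default Credentials: set GOOGLE_APPLICATION_CREDENTIALS to the path of a service account JSON key. The service account needs the bigquery.jobs.create permission (for example via roles/bigquery.jobUser) and read access to the checked tables (bigquery.tables.getData, for example via roles/bigquery.dataViewer).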
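For CI jobs written in Python, the same registration call can be built with the standard library. This is a minimal sketch: only the URL, headers, and payload come from the curl example; `build_connection_request`, `credentials_file_present`, and the `DATAXPIPE_API_KEY` environment variable are hypothetical names. The request is constructed but not sent.

```python
import json
import os
import urllib.request

API_BASE = "https://api.dataxpipe.com/api/v1"


def build_connection_request(api_key: str) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) the bq-prod registration POST."""
    payload = {
        "id": "bq-prod",
        "type": "bigquery",
        "config": {"project": "my-gcp-project", "location": "US"},
    }
    return urllib.request.Request(
        f"{API_BASE}/connections/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )


def credentials_file_present() -> bool:
    """True if GOOGLE_APPLICATION_CREDENTIALS points at an existing file."""
    path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "")
    return bool(path) and os.path.isfile(path)


# DATAXPIPE_API_KEY is a hypothetical variable name used only in this sketch.
req = build_connection_request(os.environ.get("DATAXPIPE_API_KEY", "dxp_your_key"))
print(req.get_method(), req.full_url)
print("credentials file found:", credentials_file_present())
# To actually send it: urllib.request.urlopen(req)
```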
Declaring freshness in the spec
```yaml
checks:
  - id: chk_orders_freshness
    type: freshness
    target: clean_orders
    max_delay_minutes: 1560  # 26 hours

targets:
  - id: clean_orders
    type: bigquery
    connection_ref: bq-prod
    object: analytics.orders_clean
    write_mode: merge
    primary_key: [order_id]
```
The generator creates a runnable check script in tests/test_chk_orders_freshness.py that executes against BigQuery and POSTs results to the Catalog.
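Before the generator runs, it can help to sanity-check the spec yourself. The sketch below is a hypothetical validator, not part of DataXPipe: it takes the YAML above as an already-parsed Python dict (a real loader would use a YAML parser), confirms that each freshness check references a declared target, and converts max_delay_minutes into a timedelta.

```python
from datetime import timedelta

# The spec above as a Python dict; a real loader would parse the YAML file.
spec = {
    "checks": [
        {"id": "chk_orders_freshness", "type": "freshness",
         "target": "clean_orders", "max_delay_minutes": 1560},
    ],
    "targets": [
        {"id": "clean_orders", "type": "bigquery", "connection_ref": "bq-prod",
         "object": "analytics.orders_clean", "write_mode": "merge",
         "primary_key": ["order_id"]},
    ],
}


def freshness_thresholds(spec: dict) -> dict:
    """Hypothetical validator: map each freshness check id to (table, max delay).

    Raises ValueError if a check points at an unknown target or has a
    non-positive threshold.
    """
    targets = {t["id"]: t for t in spec.get("targets", [])}
    result = {}
    for check in spec.get("checks", []):
        if check.get("type") != "freshness":
            continue
        target = targets.get(check["target"])
        if target is None:
            raise ValueError(f"{check['id']}: unknown target {check['target']!r}")
        minutes = check["max_delay_minutes"]
        if not isinstance(minutes, int) or minutes <= 0:
            raise ValueError(f"{check['id']}: max_delay_minutes must be a positive integer")
        result[check["id"]] = (target["object"], timedelta(minutes=minutes))
    return result


print(freshness_thresholds(spec))
# {'chk_orders_freshness': ('analytics.orders_clean', datetime.timedelta(days=1, seconds=7200))}
```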
BigQuery freshness SQL
For custom SQL checks, use BigQuery-native timestamp functions:
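```sql
-- checks/chk_orders_freshness.sql
SELECT COUNT(*) AS violations
FROM (
  SELECT MAX(order_ts) AS latest_ts
  FROM `my-gcp-project.analytics.orders_clean`
)
WHERE latest_ts IS NULL  -- empty table: MAX() returns NULL
   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)
```
The query returns one row whose violations value is 1 when the newest order_ts is more than 26 hours old (or the table is empty) and 0 otherwise; a non-zero count means data is stale beyond the SLA threshold. The IS NULL branch matters: without it, MAX() over an empty table returns NULL, the comparison evaluates to NULL, the outer WHERE drops the row, and the check reports 0 violations. The query assumes order_ts is a TIMESTAMP column.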
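The spec expresses the threshold in minutes while the SQL above hard-codes INTERVAL 26 HOUR; INTERVAL 1560 MINUTE is the same bound, and TIMESTAMP_SUB accepts MINUTE as a date part. As a sketch of keeping the two in sync, the hypothetical `freshness_sql` helper below renders the query from the spec values, including a `latest_ts IS NULL` guard so an empty table counts as a violation. Identifiers are checked against a deliberately conservative pattern because they are interpolated into SQL text.

```python
import re

# Conservative patterns: `project.dataset.table` and a plain column name.
_TABLE_RE = re.compile(r"[A-Za-z0-9_-]+\.[A-Za-z0-9_]+\.[A-Za-z0-9_]+")
_COLUMN_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def freshness_sql(table: str, ts_column: str, max_delay_minutes: int) -> str:
    """Hypothetical generator for the violations query.

    Identifiers are validated (fullmatch) rather than escaped, because they
    are interpolated into the SQL text; the threshold is forced to an int.
    """
    if not _TABLE_RE.fullmatch(table):
        raise ValueError(f"unexpected table name: {table!r}")
    if not _COLUMN_RE.fullmatch(ts_column):
        raise ValueError(f"unexpected column name: {ts_column!r}")
    minutes = int(max_delay_minutes)
    return (
        "SELECT COUNT(*) AS violations\n"
        "FROM (\n"
        f"  SELECT MAX({ts_column}) AS latest_ts\n"
        f"  FROM `{table}`\n"
        ")\n"
        "WHERE latest_ts IS NULL\n"
        f"   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {minutes} MINUTE)"
    )


print(freshness_sql("my-gcp-project.analytics.orders_clean", "order_ts", 1560))
```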
Partitioned tables
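On large tables, scan only recent partitions. This version assumes orders_clean is partitioned on the DATE column order_date:
```sql
SELECT COUNT(*) AS violations
FROM (
  SELECT MAX(order_ts) AS latest_ts
  FROM `my-gcp-project.analytics.orders_clean`
  -- Partition filter: BigQuery prunes partitions older than 7 days
  WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
)
-- NULL means no rows landed in the window at all, which is also stale
WHERE latest_ts IS NULL
   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)
```
BigQuery's on-demand pricing bills by bytes scanned, so filtering on the partitioning column cuts cost: BigQuery skips every partition outside the 7-day window instead of reading the whole table. The IS NULL guard matters even more here, because a table that has received no rows in 7 days returns NULL rather than an old timestamp.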
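The 7-day window works because it is comfortably longer than the 26-hour SLA. If the window is too short, rows that are still within the SLA can fall outside it, MAX() sees only older rows (or none), and the result no longer reflects the table's true freshness. The hypothetical helper below computes the smallest safe window, assuming order_date is the UTC calendar date of order_ts (BigQuery's CURRENT_DATE() uses UTC unless a time zone is passed).

```python
MINUTES_PER_DAY = 24 * 60


def min_partition_window_days(max_delay_minutes: int) -> int:
    """Smallest N for `order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL N DAY)`
    that still covers every row younger than the SLA.

    A row that is d days old has a calendar date at most ceil(d) days before
    today, so N = ceil(max_delay / 1 day) suffices (integer ceiling below).
    """
    return -(-max_delay_minutes // MINUTES_PER_DAY)


def window_covers_sla(window_days: int, max_delay_minutes: int) -> bool:
    """True if a partition window of window_days is long enough for the SLA."""
    return window_days >= min_partition_window_days(max_delay_minutes)


print(min_partition_window_days(1560))  # 2
print(window_covers_sla(7, 1560))       # True
print(window_covers_sla(1, 1560))       # False
```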
Executing via the Catalog API
Run checks on demand or from CI:
```shell
curl -X POST https://api.dataxpipe.com/api/v1/checks/execute \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "connection_id": "bq-prod",
    "sql": "SELECT COUNT(*) AS violations FROM (SELECT MAX(order_ts) AS latest_ts FROM `my-gcp-project.analytics.orders_clean`) WHERE latest_ts IS NULL OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)",
    "check_id": "chk_orders_freshness",
    "run_id": "run-20250620-001"
  }'
```
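Building the request body in code avoids shell-quoting problems with the backticks in the table name. A minimal Python sketch, assuming only the endpoint, headers, and fields shown in the curl call: `make_run_id` and `build_execute_request` are hypothetical, the run_id shape is copied from the example value, and the SQL includes a `latest_ts IS NULL` guard so an empty table counts as a violation. The response format is not described in this guide, so the sketch stops before sending.

```python
import json
import urllib.request
from datetime import datetime, timezone

API_BASE = "https://api.dataxpipe.com/api/v1"

SQL = (
    "SELECT COUNT(*) AS violations FROM (SELECT MAX(order_ts) AS latest_ts "
    "FROM `my-gcp-project.analytics.orders_clean`) "
    "WHERE latest_ts IS NULL OR latest_ts < "
    "TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)"
)


def make_run_id(now: datetime, seq: int) -> str:
    """Hypothetical run_id in the run-YYYYMMDD-NNN shape used in the example."""
    return f"run-{now:%Y%m%d}-{seq:03d}"


def build_execute_request(api_key: str, run_id: str) -> urllib.request.Request:
    """Build (but do not send) the checks/execute call; json.dumps handles quoting."""
    payload = {
        "connection_id": "bq-prod",
        "sql": SQL,
        "check_id": "chk_orders_freshness",
        "run_id": run_id,
    }
    return urllib.request.Request(
        f"{API_BASE}/checks/execute",
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )


run_id = make_run_id(datetime.now(timezone.utc), 1)
req = build_execute_request("dxp_your_key", run_id)
print(req.full_url, json.loads(req.data)["run_id"])
```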
When a freshness check fails, trace lineage upstream to find the first stale table, check the last successful Catalog run, and inspect the Airflow transform logs for watermark bugs.
BigQuery-specific pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
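| Streaming buffer on ingestion-time partitioned tables | A _PARTITIONTIME filter skips rows still in the streaming buffer, so MAX(ts) lags | Add OR _PARTITIONTIME IS NULL to the filter, or filter on an event-time column |
| Timezone mismatch | False stale alerts | Store event times as UTC TIMESTAMP values and compare against CURRENT_TIMESTAMP() |
| Partition expiration | Old partitions deleted | Keep the check window within the table's retention policy |
| Cross-region dataset | Query fails with a "not found in location" error | Set location in the connection config to the dataset's location |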
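For ingestion-time partitioned tables, BigQuery leaves _PARTITIONTIME as NULL for rows that are still in the streaming buffer, so a plain `_PARTITIONTIME >=` filter silently drops the newest rows. The hypothetical helper below is one way to build the partition filter for either table layout; the column name order_date and the 7-day window come from the partitioned-table example and are assumptions about your schema.

```python
def recent_partition_filter(days: int, ingestion_time_partitioned: bool,
                            partition_column: str = "order_date") -> str:
    """Hypothetical helper returning the WHERE clause for the freshness subquery.

    Ingestion-time partitioned tables: rows in the streaming buffer have
    _PARTITIONTIME = NULL, so they are kept explicitly with IS NULL.
    Column-partitioned tables: filter on the (assumed) DATE partition column.
    """
    days = int(days)
    if ingestion_time_partitioned:
        return (
            "WHERE _PARTITIONTIME >= "
            f"TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL {days} DAY))"
            " OR _PARTITIONTIME IS NULL"
        )
    if not partition_column.isidentifier():
        raise ValueError(f"unexpected column name: {partition_column!r}")
    return f"WHERE {partition_column} >= DATE_SUB(CURRENT_DATE(), INTERVAL {days} DAY)"


print(recent_partition_filter(7, ingestion_time_partitioned=True))
print(recent_partition_filter(7, ingestion_time_partitioned=False))
```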
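In summary: compute freshness with a single SELECT MAX(column) aggregation, add a partition filter on large tables, set DATAXPIPE_TIMEOUT_SECONDS=30 to bound check runtime, and schedule checks to run only after the transforms that load the table have completed.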
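If your check runner is Python, a per-check time limit can be enforced around the query call. This sketch assumes DATAXPIPE_TIMEOUT_SECONDS holds a limit in seconds, as its name suggests; `timeout_seconds`, `run_with_timeout`, and `slow_check` are hypothetical. The timeout only stops the runner from waiting: it does not cancel the BigQuery job, which a real runner should do separately.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def timeout_seconds(default: float = 30.0) -> float:
    """Read DATAXPIPE_TIMEOUT_SECONDS (assumed to be seconds); default 30."""
    raw = os.environ.get("DATAXPIPE_TIMEOUT_SECONDS")
    value = float(raw) if raw else default
    if value <= 0:
        raise ValueError("DATAXPIPE_TIMEOUT_SECONDS must be positive")
    return value


def run_with_timeout(check, timeout: float):
    """Hypothetical wrapper: return check()'s result or raise TimeoutError.

    On timeout the worker thread keeps running; only the wait is abandoned.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        return pool.submit(check).result(timeout=timeout)
    except FutureTimeout:
        raise TimeoutError(f"check exceeded {timeout} s") from None
    finally:
        pool.shutdown(wait=False)


def slow_check() -> int:
    """Stand-in for a long-running freshness query."""
    time.sleep(0.5)
    return 1


print(run_with_timeout(lambda: 0, timeout_seconds()))  # 0
```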
See also the data quality checks guide and the guide to monitoring pipeline runs.