
BigQuery Freshness Checks in DataXPipe

Implement freshness checks against BigQuery tables, configure thresholds in pipeline specs, and post structured results to the DataXPipe Catalog.

DataXPipe Team
  • bigquery
  • freshness
  • data-quality

Stale data is the most common production incident for batch pipelines: Airflow reports success, but the table has not received new rows in days. DataXPipe freshness checks detect stale data before stakeholders do. This guide covers BigQuery-specific patterns.

Why freshness checks matter

Batch pipelines fail silently when:

  • Upstream ingestion skips a partition
  • Incremental watermark logic has an off-by-one error
  • A merge statement matches zero rows due to filter drift

In each case the transform tasks complete without error, so task status alone never reveals the problem. Only a freshness check that compares MAX(timestamp) against an SLA threshold catches it.
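The comparison itself is small. The Python sketch below assumes MAX(timestamp) has already been fetched as a timezone-aware UTC datetime; `is_stale` is a hypothetical helper for illustration, not part of DataXPipe. An empty table (MAX returns NULL, here `None`) is treated as stale instead of silently passing.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(latest_ts: Optional[datetime], max_delay: timedelta,
             now: Optional[datetime] = None) -> bool:
    """Return True when the newest row is older than the SLA threshold.

    latest_ts is the value of SELECT MAX(ts); None means the table is empty.
    """
    if latest_ts is None:
        return True  # no rows at all counts as stale
    if latest_ts.tzinfo is None:
        raise ValueError("latest_ts must be timezone-aware (UTC)")
    now = now or datetime.now(timezone.utc)
    return now - latest_ts > max_delay


now = datetime(2025, 6, 20, 12, 0, tzinfo=timezone.utc)
sla = timedelta(minutes=1560)  # 26 hours
print(is_stale(now - timedelta(hours=25), sla, now))  # False
print(is_stale(now - timedelta(days=3), sla, now))    # True
print(is_stale(None, sla, now))                       # True
```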

Register a BigQuery connection

curl -X POST https://api.dataxpipe.com/api/v1/connections/ \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "id": "bq-prod",
    "type": "bigquery",
    "config": {
      "project": "my-gcp-project",
      "location": "US"
    }
  }'
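For scripts that cannot shell out to curl, the same request can be built with the Python standard library. This sketch copies the endpoint, headers, and payload from the curl call; `build_connection_request` is a hypothetical name, and the response format is not covered here because this guide does not document it.

```python
import json
import urllib.request

API_BASE = "https://api.dataxpipe.com/api/v1"


def build_connection_request(api_key: str, conn_id: str, project: str,
                             location: str) -> urllib.request.Request:
    """Build (but do not send) the POST that registers a BigQuery connection."""
    payload = {
        "id": conn_id,
        "type": "bigquery",
        "config": {"project": project, "location": location},
    }
    return urllib.request.Request(
        f"{API_BASE}/connections/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )


req = build_connection_request("dxp_your_key", "bq-prod", "my-gcp-project", "US")
# To send it: urllib.request.urlopen(req, timeout=30)
print(req.get_method(), req.full_url)  # POST https://api.dataxpipe.com/api/v1/connections/
```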

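Authentication uses the GOOGLE_APPLICATION_CREDENTIALS environment variable, which must point to a service account JSON key. The service account needs bigquery.jobs.create (granted by roles/bigquery.jobUser) to run queries, plus read access to the checked tables, such as bigquery.tables.getData from roles/bigquery.dataViewer.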
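A misconfigured key path usually surfaces as a confusing error on the first query. A preflight check like the hypothetical `check_service_account_key` below catches it earlier. It relies only on the fact that service account key files carry `"type": "service_account"` and a `client_email`; it cannot verify IAM permissions, which only Google Cloud can evaluate.

```python
import json
import os
from pathlib import Path


def check_service_account_key(env=None) -> str:
    """Verify GOOGLE_APPLICATION_CREDENTIALS points at a service account key.

    Returns the key's client_email, or raises RuntimeError with a readable message.
    """
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    key_file = Path(path)
    if not key_file.is_file():
        raise RuntimeError(f"credentials file not found: {key_file}")
    info = json.loads(key_file.read_text(encoding="utf-8"))
    if info.get("type") != "service_account":
        raise RuntimeError(f"expected a service account key, got type={info.get('type')!r}")
    return info.get("client_email", "<unknown>")
```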

Declaring freshness in the spec

checks:
  - id: chk_orders_freshness
    type: freshness
    target: clean_orders
    max_delay_minutes: 1560   # 26 hours

targets:
  - id: clean_orders
    type: bigquery
    connection_ref: bq-prod
    object: analytics.orders_clean
    write_mode: merge
    primary_key: [order_id]
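The check and the target are linked only by the `target` id, so a typo there is easy to miss. The sketch below assumes the spec has already been parsed into a dict (for example with a YAML library, not shown) and resolves each freshness check to its BigQuery object and SLA. `freshness_thresholds` is hypothetical; DataXPipe's own spec validation may behave differently.

```python
from datetime import timedelta


def freshness_thresholds(spec: dict) -> dict:
    """Map each freshness check id to (target object, SLA as a timedelta)."""
    targets = {t["id"]: t for t in spec.get("targets", [])}
    result = {}
    for check in spec.get("checks", []):
        if check.get("type") != "freshness":
            continue
        target = targets.get(check["target"])
        if target is None:
            raise ValueError(f"{check['id']}: unknown target {check['target']!r}")
        minutes = check["max_delay_minutes"]
        if not isinstance(minutes, int) or minutes <= 0:
            raise ValueError(f"{check['id']}: max_delay_minutes must be a positive integer")
        result[check["id"]] = (target["object"], timedelta(minutes=minutes))
    return result


spec = {
    "checks": [{"id": "chk_orders_freshness", "type": "freshness",
                "target": "clean_orders", "max_delay_minutes": 1560}],
    "targets": [{"id": "clean_orders", "type": "bigquery", "connection_ref": "bq-prod",
                 "object": "analytics.orders_clean", "write_mode": "merge",
                 "primary_key": ["order_id"]}],
}
print(freshness_thresholds(spec))
# {'chk_orders_freshness': ('analytics.orders_clean', datetime.timedelta(days=1, seconds=7200))}
```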

The generator creates a runnable check script in tests/test_chk_orders_freshness.py that executes against BigQuery and POSTs results to the Catalog.
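This guide does not show the generated file's contents. As a hypothetical illustration of the shape such a check can take, the pytest-style sketch below runs a violations query through an injected `run_query` callable (standing in for a BigQuery client) and builds a result record. The `status` and `violations` fields are assumptions, not the Catalog's documented schema.

```python
from typing import Callable


def evaluate_check(run_query: Callable[[str], int], sql: str,
                   check_id: str, run_id: str) -> dict:
    """Run a violations-count query and build a result record."""
    violations = int(run_query(sql))
    return {
        "check_id": check_id,
        "run_id": run_id,
        "violations": violations,
        "status": "pass" if violations == 0 else "fail",  # hypothetical field
    }


def test_chk_orders_freshness_flags_stale_table():
    # A fake runner returns one violation, so the logic is testable offline.
    result = evaluate_check(lambda sql: 1, "SELECT 1",
                            "chk_orders_freshness", "run-20250620-001")
    assert result["status"] == "fail"
```

Injecting the query function keeps the pass/fail logic testable without BigQuery credentials.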

BigQuery freshness SQL

For custom SQL checks, use BigQuery-native timestamp functions:

-- checks/chk_orders_freshness.sql
SELECT COUNT(*) AS violations
FROM (
  SELECT MAX(order_ts) AS latest_ts
  FROM `my-gcp-project.analytics.orders_clean`
)
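WHERE latest_ts IS NULL  -- empty table: MAX() returns NULL
   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)

A non-zero violations count means the newest row is older than the SLA threshold, or the table has no rows at all. Without the IS NULL branch, an empty table would report zero violations: comparing NULL yields NULL, so the WHERE clause drops the row.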
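When many tables need the same check, rendering the SQL from the spec avoids hand-editing the threshold. The hypothetical `freshness_sql` below uses `INTERVAL n MINUTE` so it matches `max_delay_minutes` directly, and treats an empty table as stale. BigQuery cannot bind table names as query parameters, so the sketch validates identifiers with a deliberately conservative pattern (it rejects, for example, domain-scoped project ids) instead of interpolating arbitrary strings.

```python
import re

_TABLE_RE = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_]+\.[A-Za-z0-9_]+$")
_COLUMN_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def freshness_sql(table: str, ts_column: str, max_delay_minutes: int) -> str:
    """Render the violations query for a project.dataset.table reference."""
    if not _TABLE_RE.match(table):
        raise ValueError(f"unexpected table reference: {table!r}")
    if not _COLUMN_RE.match(ts_column):
        raise ValueError(f"unexpected column name: {ts_column!r}")
    if max_delay_minutes <= 0:
        raise ValueError("max_delay_minutes must be positive")
    return (
        "SELECT COUNT(*) AS violations\n"
        "FROM (\n"
        f"  SELECT MAX({ts_column}) AS latest_ts\n"
        f"  FROM `{table}`\n"
        ")\n"
        "WHERE latest_ts IS NULL\n"
        f"   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {max_delay_minutes} MINUTE)"
    )


print(freshness_sql("my-gcp-project.analytics.orders_clean", "order_ts", 1560))
```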

Partitioned tables

On large tables, scan only recent partitions. This example assumes orders_clean is partitioned on order_date:

SELECT COUNT(*) AS violations
FROM (
  SELECT MAX(order_ts) AS latest_ts
  FROM `my-gcp-project.analytics.orders_clean`
  WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
)
WHERE latest_ts IS NULL  -- no rows in the last 7 days
   OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)

Under on-demand pricing, BigQuery bills by bytes processed, so a partition filter can cut the cost of each check dramatically.
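The lookback window must reach back at least as far as the SLA; otherwise the newest rows that are still within the threshold can sit in a partition the query skips, and the check reports stale data. The hypothetical helper below computes the smallest safe `INTERVAL n DAY`, assuming order_date is the UTC calendar date of order_ts (CURRENT_DATE() defaults to UTC). The 7-day window in the example leaves a generous margin over the 2 days a 26-hour SLA requires.

```python
import math


def min_partition_lookback_days(max_delay_minutes: int) -> int:
    """Smallest n for `order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL n DAY)`
    that still covers every row inside the SLA window.

    Assumes the partition date is the UTC date of the timestamp column.
    """
    if max_delay_minutes <= 0:
        raise ValueError("max_delay_minutes must be positive")
    return math.ceil(max_delay_minutes / (24 * 60))


print(min_partition_lookback_days(1560))  # 2
```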

Executing via the Catalog API

Run checks on demand or from CI:

curl -X POST https://api.dataxpipe.com/api/v1/checks/execute \
  -H "X-API-KEY: dxp_your_key" \
  -H "Content-Type: application/json" \
  -d '{
    "connection_id": "bq-prod",
    "sql": "SELECT COUNT(*) AS violations FROM (SELECT MAX(order_ts) AS latest_ts FROM `my-gcp-project.analytics.orders_clean`) WHERE latest_ts IS NULL OR latest_ts < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 26 HOUR)",
    "check_id": "chk_orders_freshness",
    "run_id": "run-20250620-001"
  }'
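Embedding SQL in a shell string gets fragile once queries contain quotes. Building the body with `json.dumps` handles the escaping. This sketch reuses the endpoint and fields from the curl call; the `run-YYYYMMDD-NNN` id format simply mirrors `run-20250620-001` and is an assumption, not a documented requirement. Both function names are hypothetical.

```python
import json
import urllib.request
from datetime import datetime, timezone

EXECUTE_URL = "https://api.dataxpipe.com/api/v1/checks/execute"


def make_run_id(seq: int, now=None) -> str:
    """Build an id shaped like run-20250620-001 (assumed convention)."""
    now = now or datetime.now(timezone.utc)
    return f"run-{now:%Y%m%d}-{seq:03d}"


def build_execute_request(api_key: str, connection_id: str, sql: str,
                          check_id: str, run_id: str) -> urllib.request.Request:
    """Build (but do not send) the POST to /checks/execute."""
    body = {"connection_id": connection_id, "sql": sql,
            "check_id": check_id, "run_id": run_id}
    return urllib.request.Request(
        EXECUTE_URL,
        data=json.dumps(body).encode("utf-8"),  # escapes quotes and newlines in sql
        headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
        method="POST",
    )
```

In CI, pass the request to `urllib.request.urlopen` with an explicit timeout.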

When a freshness check fails, trace lineage upstream, find the last successful Catalog run for the check, and inspect the Airflow transform logs for watermark bugs.

BigQuery-specific pitfalls

| Pitfall | Symptom | Fix |
|---|---|---|
| Streaming buffer delay | MAX(ts) excludes recent rows | Wait 90 minutes or query __TABLES__ metadata |
| Timezone mismatch | False stale alerts | Use CURRENT_TIMESTAMP() consistently; store UTC |
| Partition expiration | Old partitions deleted | Adjust the check window to match the retention policy |
| Cross-region dataset | Higher latency or timeouts | Set location in the connection config |
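The timezone pitfall is easiest to see with numbers. The example below assumes a pipeline writes New York wall-clock time (UTC-4 in June) into a column the check treats as UTC. The mislabelled value looks four hours older than it really is, which pushes a 25-hour-old row past a 26-hour SLA.

```python
from datetime import datetime, timedelta, timezone

SLA = timedelta(hours=26)
NEW_YORK_SUMMER = timezone(timedelta(hours=-4))  # assumed source offset (EDT)

now_utc = datetime(2025, 6, 21, 12, 0, tzinfo=timezone.utc)
actual_event = datetime(2025, 6, 20, 7, 0, tzinfo=NEW_YORK_SUMMER)  # 11:00 UTC

# Bug: the local wall-clock value is stamped as if it were UTC.
mislabelled = actual_event.replace(tzinfo=timezone.utc)
# Fix: convert to UTC before storing.
correct = actual_event.astimezone(timezone.utc)

print(now_utc - correct > SLA)      # False: 25 hours old, within SLA
print(now_utc - mislabelled > SLA)  # True: appears 29 hours old
```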

In short: aggregate with SELECT MAX(column), add partition filters on large tables, set DATAXPIPE_TIMEOUT_SECONDS=30, and schedule checks to run only after transforms complete.
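How DataXPipe itself consumes DATAXPIPE_TIMEOUT_SECONDS is not described here. For your own wrapper scripts, a hypothetical reader like the one below applies the recommended 30-second default and rejects malformed values, so the same setting can be passed as the `timeout` argument to `urllib.request.urlopen`.

```python
import os

DEFAULT_TIMEOUT_SECONDS = 30


def check_timeout_seconds(env=None) -> int:
    """Read DATAXPIPE_TIMEOUT_SECONDS, defaulting to the recommended 30."""
    env = os.environ if env is None else env
    raw = env.get("DATAXPIPE_TIMEOUT_SECONDS", str(DEFAULT_TIMEOUT_SECONDS))
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(
            f"DATAXPIPE_TIMEOUT_SECONDS must be an integer, got {raw!r}") from None
    if value <= 0:
        raise ValueError("DATAXPIPE_TIMEOUT_SECONDS must be positive")
    return value
```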

See also the data quality checks guide and the guide to monitoring pipeline runs.