Outcome focus: Reader can scaffold a dbt and BigQuery project with manifest-backed incremental loads, timestamp-first snapshots, partitioned models, and a dry-run bytes gate before production promotion.
Tags: dbt, BigQuery, data engineering, analytics engineering, CI, cost controls
The first dbt on BigQuery project I distrust is the one with green models and no cost gate.
It demos cleanly. The raw table loads. The staging model runs. The first snapshot captures history. Then a pull request widens a date filter, a staging table loses its partition requirement, or a snapshot quietly treats a hard delete as if the customer still exists. Nobody sees the failure during review because the SQL compiles and the first run works.
I have seen this pattern go wrong in two directions. One version optimizes for speed and lets every model read all of raw history because the team will "tighten it later." The other version wraps everything in process and makes every change feel like a production migration. Both miss the smaller operating contract that BigQuery and dbt are good at: load raw data with a traceable manifest, make partition filters part of the model shape, snapshot source history with explicit change rules, and reject expensive SQL before it promotes.
This starter kit is the version I would want in a new dbt on BigQuery repo before the first real mart ships.
The tradeoff is ceremony. You will add a manifest table, a few macros, a dry-run script, and a CI step before the project feels "simple." The payoff is that the expensive and history-breaking failures become reviewable artifacts instead of surprises in production.
Operating Map#
The starter kit assumes:
- dbt Core with dbt-bigquery.
- BigQuery raw files arriving as Parquet in Cloud Storage.
- A CI service account that can read source tables, create objects in a CI dataset, and run dry-run query jobs.
- A production or main-branch dbt manifest if you want state-aware CI selection. If you do not have one yet, run the gate over all models until you add state artifacts.
The official docs are worth keeping near the code. dbt's BigQuery adapter supports partition_by, cluster_by, and require_partition_filter model configs for table shape and query pruning. BigQuery's LOAD DATA DDL can load Cloud Storage files, create partitioned and clustered tables, and set table options such as require_partition_filter. BigQuery dry runs validate queries and estimate bytes processed without charging for query execution. dbt snapshots support timestamp and check strategies, with timestamp recommended when a reliable updated_at column exists.
Project Skeleton#
analytics/
  dbt_project.yml
  models/
    sources.yml
    staging/
      stg__orders.sql
      staging.yml
    marts/
      fct_orders.sql
      marts.yml
  macros/
    bigquery_load.sql
    manifest_utils.sql
  snapshots/
    customer_snap.sql
    snapshots.yml
  tests/
    customer_snapshot_no_overlap.sql
  scripts/
    dry_run_dbt_models.py
  .github/workflows/
    dbt_ci.yml

The important boundary: dbt is not the only ingestion tool. A scheduler such as Cloud Composer, Dagster, GitHub Actions, Cloud Build, or Workflows can call bq load, BigQuery LOAD DATA, or the Storage Write API. dbt owns the model contract, tests, snapshot logic, and release gate. I still like keeping the load SQL shape near dbt because it makes the raw table contract visible to analytics engineers.
dbt Defaults#
Start with explicit defaults rather than letting every model invent its own materialization.
name: analytics
version: "1.0.0"
config-version: 2
profile: analytics

model-paths: ["models"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
test-paths: ["tests"]

vars:
  ingest_manifest_dataset: "your_project.ingest"
  ingest_manifest_table: "your_project.ingest.manifests"
  ci_bytes_threshold: 107374182400 # 100 GiB

models:
  analytics:
    staging:
      +materialized: incremental
      +on_schema_change: fail
    marts:
      +materialized: table

snapshots:
  analytics:
    +target_schema: snapshots

I put on_schema_change: fail on staging because a raw schema drift should be a review event. If your source adds columns every day and that is acceptable, loosen it intentionally. Do not accidentally accept schema drift because the default was convenient.
Source Contracts#
Keep source declarations boring and close to the models that use them.
version: 2

sources:
  - name: erp
    database: your_project
    schema: raw_erp
    tables:
      - name: orders_raw
        loaded_at_field: load_ts
        freshness:
          warn_after: {count: 3, period: hour}
          error_after: {count: 8, period: hour}
  - name: crm
    database: your_project
    schema: raw_crm
    tables:
      - name: customers
        loaded_at_field: updated_at

The source freshness check will not prove semantic correctness. It catches the more basic failure: nobody loaded data recently and the pipeline still looks green because the models can query yesterday's table.
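To make the two thresholds concrete, here is a minimal Python sketch of how a freshness check could classify the age of the newest load_ts. The function name `freshness_status` is hypothetical, its defaults mirror the orders_raw config above, and the strict "older than" comparison at the boundary is an assumption. It illustrates the idea and is not dbt's implementation.

```python
from datetime import datetime, timedelta, timezone


def freshness_status(
    max_loaded_at: datetime,
    now: datetime,
    warn_after: timedelta = timedelta(hours=3),
    error_after: timedelta = timedelta(hours=8),
) -> str:
    """Classify source age as 'pass', 'warn', or 'error' (hypothetical helper)."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"


now = datetime(2026, 5, 16, 13, 0, tzinfo=timezone.utc)
print(freshness_status(now - timedelta(hours=1), now))  # pass
print(freshness_status(now - timedelta(hours=5), now))  # warn
print(freshness_status(now - timedelta(hours=9), now))  # error
```

The check only looks at the newest timestamp, which is why it can catch a missed load but cannot tell you whether the rows that did arrive are correct.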
Manifest Table#
Raw loads need a ledger. The manifest table is deliberately small: one row per loaded file or batch, with enough metadata to answer "what entered the warehouse, when, and through which job?"
{% macro bq_ident(name) -%}
`{{ name | replace('`', '') }}`
{%- endmacro %}
{% macro ensure_ingest_manifest(
manifest_dataset=var("ingest_manifest_dataset"),
manifest_table=var("ingest_manifest_table")
) %}
{% if execute %}
{% do run_query("create schema if not exists " ~ bq_ident(manifest_dataset)) %}
{% set ddl %}
create table if not exists {{ bq_ident(manifest_table) }} (
source_name string not null,
source_table string not null,
file_uri string not null,
load_ts timestamp not null,
row_count int64,
content_hash string,
load_job_id string,
loaded_at timestamp default current_timestamp() not null
)
-- require_partition_filter is left off: the watermark lookup and the MERGE
-- in this file read the manifest without a load_ts predicate.
partition by date(load_ts)
cluster by source_name, source_table
{% endset %}
{% do run_query(ddl) %}
{% endif %}
{% endmacro %}
{% macro latest_loaded_at_sql(
source_name,
source_table,
manifest_table=var("ingest_manifest_table")
) -%}
(
select coalesce(max(load_ts), timestamp '1970-01-01 00:00:00+00')
from {{ bq_ident(manifest_table) }}
where source_name = '{{ source_name }}'
and source_table = '{{ source_table }}'
)
{%- endmacro %}
{% macro record_loaded_file(
source_name,
source_table,
file_uri,
load_ts,
row_count="null",
content_hash="",
load_job_id="",
manifest_table=var("ingest_manifest_table")
) %}
{% if execute %}
{% set sql %}
merge {{ bq_ident(manifest_table) }} as target
using (
select
'{{ source_name }}' as source_name,
'{{ source_table }}' as source_table,
'{{ file_uri }}' as file_uri,
timestamp '{{ load_ts }}' as load_ts,
{{ row_count }} as row_count,
nullif('{{ content_hash }}', '') as content_hash,
nullif('{{ load_job_id }}', '') as load_job_id
) as source
on target.source_name = source.source_name
and target.source_table = source.source_table
and target.file_uri = source.file_uri
when not matched then
insert (
source_name,
source_table,
file_uri,
load_ts,
row_count,
content_hash,
load_job_id
)
values (
source.source_name,
source.source_table,
source.file_uri,
source.load_ts,
source.row_count,
source.content_hash,
source.load_job_id
)
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

Run the table setup once per environment:

dbt run-operation ensure_ingest_manifest --target prod

Then call record_loaded_file from your orchestrator after a load job succeeds:
dbt run-operation record_loaded_file \
--args '{
"source_name": "erp",
"source_table": "orders_raw",
"file_uri": "gs://warehouse-landing/erp/orders/load_date=2026-05-16/part-000.parquet",
"load_ts": "2026-05-16 13:00:00+00",
"row_count": "248391",
"content_hash": "sha256:example",
"load_job_id": "bquxjob_example"
}' \
--target prod

Do not record the manifest before the load commits. A manifest row without a corresponding table load is worse than no manifest because incremental models will skip data that never arrived.
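The ordering rule can be sketched in plain Python. This is a minimal illustration, assuming an in-memory dict stands in for the manifest table and a callable stands in for the load job. `load_then_record`, `LedgerKey`, and the skip-on-replay guard are hypothetical names and choices, not part of dbt or BigQuery.

```python
from typing import Callable

LedgerKey = tuple[str, str, str]  # (source_name, source_table, file_uri)


def load_then_record(
    key: LedgerKey,
    run_load: Callable[[str], int],
    ledger: dict[LedgerKey, int],
) -> bool:
    """Load one file, then record it; return True if a ledger row was written."""
    if key in ledger:
        # Same key as the MERGE: a file that is already recorded is not loaded again.
        return False
    row_count = run_load(key[2])  # raises on failure, so nothing is recorded
    ledger[key] = row_count
    return True


ledger: dict[LedgerKey, int] = {}
key = (
    "erp",
    "orders_raw",
    "gs://warehouse-landing/erp/orders/load_date=2026-05-16/part-000.parquet",
)
print(load_then_record(key, lambda uri: 248391, ledger))  # True: loaded, then recorded
print(load_then_record(key, lambda uri: 248391, ledger))  # False: replay is skipped
```

Because the ledger write happens only after `run_load` returns, a failed load leaves no row behind, which is the property the incremental models depend on.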
BigQuery Load Macro#
For hourly or daily batches, prefer batch loads from Cloud Storage unless the product actually needs minute-level availability. BigQuery's LOAD DATA statement can load Cloud Storage files into a managed table, including Parquet. If you need streaming SLAs, the Storage Write API is the right boundary, especially when exactly-once semantics matter.
This macro gives the orchestration layer a repeatable LOAD DATA shape. It is meant for dbt run-operation, not for use inside a model.
{% macro bq_load_parquet(
target_table,
source_uris,
mode="append",
partition_by="_PARTITIONDATE",
cluster_by=[],
require_partition_filter=true
) %}
{% if execute %}
{% if mode not in ["append", "overwrite"] %}
{{ exceptions.raise_compiler_error("mode must be append or overwrite") }}
{% endif %}
{% set command = "load data overwrite" if mode == "overwrite" else "load data into" %}
{% set quoted_uris = [] %}
{% for uri in source_uris %}
{% do quoted_uris.append("'" ~ uri ~ "'") %}
{% endfor %}
{% set cluster_clause = "" %}
{% if cluster_by | length > 0 %}
{% set cluster_clause = " cluster by " ~ (cluster_by | join(", ")) %}
{% endif %}
{% set options_clause = "" %}
{% if require_partition_filter %}
{% set options_clause = " options(require_partition_filter = true)" %}
{% endif %}
{% set sql %}
{{ command }} {{ bq_ident(target_table) }}
{% if partition_by %}
partition by {{ partition_by }}
{% endif %}
{{ cluster_clause }}
{{ options_clause }}
from files(
format = 'PARQUET',
uris = [{{ quoted_uris | join(", ") }}]
)
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

Call it like this:
dbt run-operation bq_load_parquet \
--args '{
"target_table": "your_project.raw_erp.orders_raw",
"source_uris": ["gs://warehouse-landing/erp/orders/load_date=2026-05-16/*.parquet"],
"mode": "append",
"partition_by": "_PARTITIONDATE",
"cluster_by": ["customer_id"],
"require_partition_filter": true
}' \
--target prod

For governed production tables, I still prefer table shape in dbt model config or infrastructure code, then use load jobs only to append data. The macro is most useful for raw landing tables and bootstrap environments. After the table is established, inspect the actual BigQuery table options instead of trusting the macro invocation:
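select
  table_name,
  option_name,
  option_type,
  option_value
from `your_project.raw_erp.INFORMATION_SCHEMA.TABLE_OPTIONS`
where table_name = 'orders_raw'
  and option_name = 'require_partition_filter';

The specific INFORMATION_SCHEMA view you use will vary by option. TABLE_OPTIONS returns one row per option as option_name and option_value, while column-level partitioning and clustering are reported in INFORMATION_SCHEMA.COLUMNS through is_partitioning_column and clustering_ordinal_position. The operating habit matters more than the exact query: verify the warehouse state, not only the dbt command that intended to create it.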
Incremental Staging Model#
The staging model should be cheap by construction. It should filter the raw ingestion partition, use the manifest timestamp to limit new work, and materialize into a partitioned target.
{{
config(
materialized="incremental",
unique_key="order_id",
incremental_strategy="merge",
on_schema_change="fail",
partition_by={
"field": "order_date",
"data_type": "date"
},
cluster_by=["customer_id"],
require_partition_filter=true
)
}}
with source as (
select
order_id,
customer_id,
order_date,
amount,
status,
load_ts
from {{ source("erp", "orders_raw") }}
where _PARTITIONDATE >= date_sub(current_date(), interval 7 day)
{% if is_incremental() %}
and load_ts > {{ latest_loaded_at_sql("erp", "orders_raw") }}
{% endif %}
),
typed as (
select
cast(order_id as string) as order_id,
cast(customer_id as string) as customer_id,
cast(order_date as date) as order_date,
cast(amount as numeric) as amount,
cast(status as string) as status,
cast(load_ts as timestamp) as load_ts
from source
)
select *
from typed

The seven-day partition window is not magic. It is a guardrail. If your files arrive late by 48 hours, seven days leaves room. If your source backfills months of history, use an explicit backfill mode with a different selector and threshold. Do not make the normal incremental path carry the cost of every exception.
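A small Python sketch shows which raw ingestion partitions the filter still reads for a given lookback. The helper names `partition_cutoff` and `is_scanned` are hypothetical; the arithmetic mirrors `_PARTITIONDATE >= date_sub(current_date(), interval 7 day)`.

```python
from datetime import date, timedelta


def partition_cutoff(today: date, lookback_days: int = 7) -> date:
    """Oldest raw ingestion partition the staging filter still reads."""
    return today - timedelta(days=lookback_days)


def is_scanned(partition_date: date, today: date, lookback_days: int = 7) -> bool:
    """True when the partition passes `_PARTITIONDATE >= cutoff`."""
    return partition_date >= partition_cutoff(today, lookback_days)


today = date(2026, 5, 16)
print(partition_cutoff(today))                 # 2026-05-09
print(is_scanned(date(2026, 5, 14), today))    # True: two days old, inside the window
print(is_scanned(date(2026, 5, 8), today))     # False: needs the backfill path
```

Anything older than the cutoff is invisible to the normal run by design, so it has to go through the explicit backfill mode.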
Document the staging contract beside the model:
version: 2

models:
  - name: stg__orders
    description: "Typed order events from raw ERP loads, limited by raw ingestion partitions and the ingest manifest."
    columns:
      - name: order_id
        data_tests:
          - not_null
          - unique
      - name: customer_id
        data_tests:
          - not_null
      - name: order_date
        data_tests:
          - not_null
      - name: load_ts
        data_tests:
          - not_null

The important test is not exotic. If order_id is not unique in staging, every downstream mart now has to guess whether it is seeing an update, a duplicate, or a source replay. Catch that where the shape first becomes typed.
Mart Model#
The mart can be ordinary if staging did its job.
{{
config(
materialized="table",
partition_by={
"field": "order_date",
"data_type": "date"
},
cluster_by=["customer_id"],
require_partition_filter=true
)
}}
select
order_id,
customer_id,
order_date,
amount,
status,
load_ts
from {{ ref("stg__orders") }}
where order_date >= date_sub(current_date(), interval 365 day)

I would not add a year filter to every mart. I add it here to show the choice. A reporting mart with a known operating window can be deliberately bounded. A historical mart should carry a different cost expectation and a higher dry-run threshold. Both are fine when the decision is visible.
Snapshots That Do Not Bite Later#
Snapshots fail slowly. The table looks fine for the first run because every record is new. The bad design appears when a source row changes, disappears, or gains a column the snapshot was not expecting.
dbt recommends the timestamp strategy when a reliable updated_at exists because it tracks one change column and tolerates source schema evolution better than enumerating every checked column. Use check only when the source does not give you a trustworthy timestamp.
{% snapshot customer_snap %}
{{
config(
target_schema="snapshots",
unique_key="customer_id",
strategy="timestamp",
updated_at="updated_at",
hard_deletes="invalidate"
)
}}
select
customer_id,
name,
email,
status,
updated_at
from {{ source("crm", "customers") }}
{% endsnapshot %}

For dbt Core 1.8 and earlier, you may see invalidate_hard_deletes=true. For new snapshots on dbt Core 1.9 or later, use hard_deletes. The useful choice is between:
| Config | Behavior |
|---|---|
| hard_deletes="ignore" | Deleted source rows remain current in the snapshot. |
| hard_deletes="invalidate" | Deleted source rows have the current snapshot record closed. |
| hard_deletes="new_record" | Deleted source rows get a new record with deletion metadata. |
If downstream consumers need to distinguish "deleted" from "changed," use new_record. If they only need current validity windows, invalidate is usually enough. If deletes are impossible or irrelevant, ignore is simpler.
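The three modes are easier to compare on a toy snapshot. The following is a simplified Python model, assuming snapshot rows are plain dicts keyed by customer_id with dbt_valid_from and dbt_valid_to. `apply_hard_delete` is a hypothetical function that sketches the behavior in the table above and is not how dbt implements it. It uses a boolean dbt_is_deleted marker for illustration.

```python
from datetime import datetime, timezone


def apply_hard_delete(
    snapshot_rows: list[dict],
    deleted_key: str,
    mode: str,
    run_at: datetime,
) -> list[dict]:
    """Return new snapshot rows after the source row `deleted_key` disappears."""
    if mode not in {"ignore", "invalidate", "new_record"}:
        raise ValueError(f"unknown hard_deletes mode: {mode}")
    result = [dict(row) for row in snapshot_rows]  # copy, leave the input untouched
    if mode == "ignore":
        return result  # the deleted row stays current
    current = [
        row for row in result
        if row["customer_id"] == deleted_key and row["dbt_valid_to"] is None
    ]
    for row in current:
        row["dbt_valid_to"] = run_at  # close the current record
        if mode == "new_record":
            # Add a fresh current record that carries the deletion marker.
            result.append(
                dict(row, dbt_valid_from=run_at, dbt_valid_to=None, dbt_is_deleted=True)
            )
    return result


t0 = datetime(2026, 5, 1, tzinfo=timezone.utc)
t1 = datetime(2026, 5, 16, tzinfo=timezone.utc)
rows = [{"customer_id": "c1", "dbt_valid_from": t0, "dbt_valid_to": None}]

for mode in ("ignore", "invalidate", "new_record"):
    print(mode, len(apply_hard_delete(rows, "c1", mode, t1)))
```

With ignore the single row stays open, with invalidate it is closed at the run time, and with new_record a second row appears so downstream queries can tell a delete from an ordinary change.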
When no reliable timestamp exists, use explicit check_cols. Avoid check_cols="all" on wide mutable source tables unless you have accepted the cost and churn.
{{
config(
target_schema="snapshots",
unique_key="customer_id",
strategy="check",
check_cols=["name", "email", "status"],
hard_deletes="invalidate"
)
}}

Add snapshot tests. A uniqueness test on the current row is not enough because history can overlap.
version: 2

snapshots:
  - name: customer_snap
    description: "Customer source history with one validity window per customer version."
    columns:
      - name: customer_id
        data_tests:
          - not_null
      - name: dbt_valid_from
        data_tests:
          - not_null

with windows as (
select
customer_id,
dbt_valid_from,
coalesce(dbt_valid_to, timestamp '9999-12-31 00:00:00+00') as dbt_valid_to
from {{ ref("customer_snap") }}
),
overlaps as (
select
left_window.customer_id,
left_window.dbt_valid_from as left_valid_from,
left_window.dbt_valid_to as left_valid_to,
right_window.dbt_valid_from as right_valid_from,
right_window.dbt_valid_to as right_valid_to
from windows as left_window
join windows as right_window
on left_window.customer_id = right_window.customer_id
and left_window.dbt_valid_from < right_window.dbt_valid_from
and left_window.dbt_valid_to > right_window.dbt_valid_from
)
select *
from overlaps

That test is intentionally plain SQL. A package test is fine too, but the overlapping-window condition is important enough that I like seeing the exact predicate in the repo.
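The same predicate can be checked on a handful of rows in Python, which is handy when reasoning about edge cases before trusting the SQL. This is a sketch under the assumption that windows are (customer_id, valid_from, valid_to) tuples with None for an open end. `find_overlaps` and `OPEN_END` are hypothetical names.

```python
from datetime import datetime, timezone
from itertools import combinations

# Stand-in for the coalesce(dbt_valid_to, timestamp '9999-12-31 ...') in the SQL test.
OPEN_END = datetime(9999, 12, 31, tzinfo=timezone.utc)


def find_overlaps(windows):
    """Return (customer_id, left, right) for each pair of overlapping windows."""
    by_customer = {}
    for customer_id, valid_from, valid_to in windows:
        by_customer.setdefault(customer_id, []).append((valid_from, valid_to or OPEN_END))
    overlaps = []
    for customer_id, spans in by_customer.items():
        # Sorting puts the earlier-starting window on the left of each pair.
        for left, right in combinations(sorted(spans), 2):
            # Same condition as the SQL: left starts first and ends after right starts.
            if left[0] < right[0] and left[1] > right[0]:
                overlaps.append((customer_id, left, right))
    return overlaps


def day(n):
    return datetime(2026, 5, n, tzinfo=timezone.utc)


clean = [("c1", day(1), day(10)), ("c1", day(10), None)]
broken = [("c1", day(1), day(12)), ("c1", day(10), None)]
print(len(find_overlaps(clean)))   # 0: windows touch but do not overlap
print(len(find_overlaps(broken)))  # 1: first window ends after the second starts
```

Note that, like the SQL, the strict comparison does not flag two windows that start at exactly the same timestamp, so a separate uniqueness test on (customer_id, dbt_valid_from) is still worth having.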
Dry-Run Cost Gate#
BigQuery has no EXPLAIN statement; the query plan is only available in a job's execution details after the query has run. The CI gate should therefore use BigQuery dry-run query jobs. Dry runs validate the query and return estimated bytes processed without using query slots or charging for the dry-run execution.
I prefer a small Python script over scraping CLI text. The BigQuery Python client returns total_bytes_processed directly.
"""Dry-run compiled dbt models in BigQuery and fail when one exceeds a bytes threshold."""
import argparse
import json
import os
import sys
from pathlib import Path

from google.cloud import bigquery


def read_selected_unique_ids(path: Path) -> list[str]:
    """Return unique_ids from `dbt ls --output json` output (one JSON object per line)."""
    unique_ids: list[str] = []
    for line in path.read_text().splitlines():
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # dbt can write plain log lines to stdout; skip anything that is not JSON.
            continue
        unique_id = record.get("unique_id") if isinstance(record, dict) else None
        if unique_id:
            unique_ids.append(unique_id)
    return unique_ids


def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--manifest", default="target/manifest.json")
    parser.add_argument("--selected", default="target/selected_models.jsonl")
    parser.add_argument("--threshold-bytes", type=int, required=True)
    parser.add_argument("--project", default=os.environ.get("GCP_PROJECT"))
    parser.add_argument("--location", default=os.environ.get("BQ_LOCATION", "US"))
    args = parser.parse_args()

    manifest = json.loads(Path(args.manifest).read_text())
    selected_ids = read_selected_unique_ids(Path(args.selected))
    if not selected_ids:
        print("No selected models found for dry run.")
        return 0

    client = bigquery.Client(project=args.project, location=args.location)
    # dry_run=True validates the SQL and estimates bytes without executing it.
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

    failures: list[str] = []
    for unique_id in selected_ids:
        node = manifest["nodes"].get(unique_id)
        if not node or node.get("resource_type") != "model":
            continue
        sql = node.get("compiled_code")
        if not sql:
            print(f"SKIP {unique_id}: no compiled_code in manifest")
            continue
        job = client.query(sql, job_config=job_config)
        bytes_processed = int(job.total_bytes_processed or 0)
        print(f"{unique_id}: {bytes_processed} bytes")
        if bytes_processed > args.threshold_bytes:
            failures.append(
                f"{unique_id} dry-runs at {bytes_processed} bytes, "
                f"above threshold {args.threshold_bytes}"
            )

    if failures:
        # "::error ::" is the GitHub Actions annotation format.
        for failure in failures:
            print(f"::error ::{failure}")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())

The threshold should be boring and explicit. A 100 GiB PR threshold might be strict for a wide mart and too loose for a small staging project. Start with a number that blocks obvious accidents, then add model-level exceptions only when the exception has an owner and an explanation.
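One way to keep exceptions honest is to make the owner and the reason mandatory fields of the override itself. The sketch below is an assumption about how that could look. `ThresholdOverride`, `threshold_for`, and the example model id are hypothetical and are not part of the dry-run script above.

```python
from dataclasses import dataclass

GIB = 1024 ** 3
DEFAULT_THRESHOLD_BYTES = 100 * GIB  # 107374182400, the same default as the CI gate


@dataclass(frozen=True)
class ThresholdOverride:
    threshold_bytes: int
    owner: str
    reason: str


def threshold_for(
    unique_id: str,
    overrides: dict[str, ThresholdOverride],
    default: int = DEFAULT_THRESHOLD_BYTES,
) -> int:
    """Return the bytes threshold for a model, refusing anonymous exceptions."""
    override = overrides.get(unique_id)
    if override is None:
        return default
    if not override.owner.strip() or not override.reason.strip():
        raise ValueError(f"override for {unique_id} needs an owner and a reason")
    return override.threshold_bytes


overrides = {
    # Hypothetical exception for a wide historical mart.
    "model.analytics.fct_orders": ThresholdOverride(
        threshold_bytes=500 * GIB,
        owner="analytics-platform",
        reason="full-year mart rebuild",
    ),
}
print(threshold_for("model.analytics.stg__orders", overrides))  # 107374182400
print(threshold_for("model.analytics.fct_orders", overrides))   # 536870912000
```

Keeping the override map in the repo means a raised threshold shows up in code review next to the SQL that needs it.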
GitHub Actions CI#
This workflow assumes Workload Identity Federation for GCP auth. Replace the provider and service account with your project values. If you use a JSON key instead, keep it in GitHub Actions secrets and rotate it like any other production credential.
name: dbt CI

on:
  pull_request:

permissions:
  contents: read
  id-token: write

env:
  DBT_PROFILES_DIR: .
  DBT_TARGET: ci
  DBT_STATE: target/prod_state
  GCP_PROJECT: your_project
  BQ_LOCATION: US
  BYTES_THRESHOLD: "107374182400"

jobs:
  dbt-ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: projects/123456789/locations/global/workloadIdentityPools/github/providers/github
          service_account: dbt-ci@your_project.iam.gserviceaccount.com
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install dbt-core dbt-bigquery google-cloud-bigquery
      - name: Parse project
        run: dbt parse --target "$DBT_TARGET"
      - name: Compile project
        run: dbt compile --target "$DBT_TARGET"
      - name: Select models for dry run
        run: |
          mkdir -p target
          if [ -d "$DBT_STATE" ]; then
            dbt ls \
              --target "$DBT_TARGET" \
              --state "$DBT_STATE" \
              --select state:modified+ \
              --resource-type model \
              --output json > target/selected_models.jsonl
          else
            dbt ls \
              --target "$DBT_TARGET" \
              --resource-type model \
              --output json > target/selected_models.jsonl
          fi
      - name: Gate BigQuery bytes
        run: |
          python scripts/dry_run_dbt_models.py \
            --manifest target/manifest.json \
            --selected target/selected_models.jsonl \
            --threshold-bytes "$BYTES_THRESHOLD" \
            --project "$GCP_PROJECT" \
            --location "$BQ_LOCATION"
      - name: Build changed resources
        run: |
          if [ -d "$DBT_STATE" ]; then
            dbt build \
              --target "$DBT_TARGET" \
              --state "$DBT_STATE" \
              --select state:modified+ \
              --defer
          else
            dbt build --target "$DBT_TARGET"
          fi

Two details matter.
First, dbt parse and dbt compile are different checks. parse is the fast project-shape check. compile writes executable SQL to target/ and gives the dry-run script compiled SQL to submit to BigQuery.
Second, snapshots should run through dbt snapshot or dbt build, not dbt run. If you keep a snapshot smoke step separate from the full build, use:
dbt snapshot --target ci --select customer_snap
dbt test --target ci --select customer_snap

For most PRs, dbt build --select state:modified+ --defer is cleaner because it preserves DAG order across models, snapshots, seeds, and tests.
Local Smoke Commands#
Before wiring CI, prove the pieces by hand:
dbt debug --target ci
dbt parse --target ci
dbt compile --target ci --select stg__orders customer_snap
dbt run-operation ensure_ingest_manifest --target ci
dbt run --target ci --select stg__orders
dbt snapshot --target ci --select customer_snap
dbt test --target ci --select stg__orders customer_snap

Then run a dry run against one compiled model:
dbt ls --target ci --select stg__orders --resource-type model --output json > target/selected_models.jsonl
python scripts/dry_run_dbt_models.py \
--manifest target/manifest.json \
--selected target/selected_models.jsonl \
--threshold-bytes 107374182400 \
--project your_project \
  --location US

I like this sequence because every command proves a separate contract. Debug proves profile access. Parse proves project structure. Compile proves Jinja and adapter SQL generation. The manifest operation proves side-effecting macros only run when asked. The model, snapshot, and tests prove runtime behavior. The dry run proves CI can measure cost before materialization.
The Review Checklist#
When a PR changes a BigQuery dbt model, I want the reviewer to answer these questions before approval:
| Surface | Review question |
|---|---|
| Raw load | Is the table partitioned and clustered for the way it will be queried? |
| Manifest | Does every committed file load produce one ledger row after the load succeeds? |
| Incremental filter | Does the model limit raw partitions and use the manifest or a reliable source watermark? |
| Snapshot | Is timestamp used when updated_at is reliable, and are check_cols explicit when it is not? |
| Hard deletes | Did the author choose ignore, invalidate, or new_record deliberately? |
| Tests | Do uniqueness, null, relationship, and snapshot-window tests match the declared grain? |
| Cost | Does the dry-run bytes estimate fit the model's normal operating budget? |
| Backfill | Is exceptional historical work handled by an explicit backfill path instead of the default PR gate? |
This is the same release-contract idea I use for Dataform and BigQuery governance, applied to dbt. BigQuery will let you scan too much. dbt will let you compile a model whose operating cost is absurd. The release path has to make cost, history, and grain visible.
Mistakes I Would Avoid#
Do not put side-effecting run_query macros inside model SQL unless you are extremely sure they are guarded by execute and only called in the right command. Compile and docs generation can run macros too. A load job belongs in orchestration or a deliberate run-operation.
Do not use check_cols="all" as the default snapshot fallback. It looks exhaustive, but a source schema change can create noisy history and wider comparisons than you expected. Pick the columns that define business change.
Do not update the manifest before the load succeeds. The manifest is a ledger, not a hope.
Do not rely on query plans as your cost gate. BigQuery has no EXPLAIN statement, and a plan only exists after a job has run. Use dry-run query jobs for bytes. Use a job's execution details when you need plan shape.
Do not pretend state-aware CI works until you actually have a saved production manifest. Without --state, state:modified+ has no comparison baseline. Run all models first, then optimize once the artifact path exists.
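The fallback can be made explicit in a few lines of Python. This sketch builds the `dbt ls` argument list and only adds state selection when a baseline manifest.json is present. `dbt_ls_args` is a hypothetical helper, and checking for the file rather than just the directory is an assumption that is slightly stricter than the shell test in the workflow.

```python
import tempfile
from pathlib import Path


def dbt_ls_args(target: str, state_dir: Path) -> list[str]:
    """Build `dbt ls` arguments, adding state selection only when a baseline exists."""
    args = ["dbt", "ls", "--target", target]
    if (state_dir / "manifest.json").is_file():
        args += ["--state", str(state_dir), "--select", "state:modified+"]
    args += ["--resource-type", "model", "--output", "json"]
    return args


with tempfile.TemporaryDirectory() as tmp:
    state_dir = Path(tmp)
    # No manifest.json yet: every model is selected.
    print(dbt_ls_args("ci", state_dir))
    # Baseline present: only modified models and their children are selected.
    (state_dir / "manifest.json").write_text("{}")
    print(dbt_ls_args("ci", state_dir))
```

The returned list can be handed to `subprocess.run` in an orchestration script, which keeps the full-run fallback and the state-aware path in one reviewable place.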
Do not set require_partition_filter=true without checking dbt tests and downstream models. The option is useful because it makes bad queries fail. That includes your own tests if they forget the partition predicate.
What This Buys You#
The win is not that the starter kit is clever. The win is that the common failure modes now have a place to attach.
| Failure | Artifact that catches it |
|---|---|
| A raw load did not happen | Source freshness and missing manifest rows |
| A file loaded twice | Manifest merge key on source and file URI |
| A staging model scans too much raw history | Raw partition filter plus dry-run bytes gate |
| A source schema drift sneaks into staging | on_schema_change: fail |
| A mutable source row changes | timestamp snapshot strategy |
| A hard delete is silently ignored | Explicit hard_deletes choice |
| A historical window overlaps | Snapshot overlap test |
| A PR would scan too much | BigQuery dry-run CI gate |
If you copy only one piece, copy the dry-run gate. Cost surprises are easy to prevent and hard to explain after promotion.
If you copy the whole kit, keep the boundaries intact: load files deliberately, record what loaded, build incrementally from a watermark, snapshot source history with an explicit delete policy, and make BigQuery estimate the bill before prod sees the SQL.