# Multi-Model Pipelines
Bridge Town projects can contain multiple Python models that compose a
multi-step financial pipeline — for example, revenue → expenses →
summary, where each model reads the outputs of the model before it.
This page explains how to wire them together using the `PIPELINE` list in
`run.py`, how the branch-scoped `/upstream` mount carries intermediate
outputs between models, and where Google Sheets fit (spoiler: they are
external I/O, not intra-run transport).
## Two kinds of data paths

| Path | What it carries | Scope | Access |
|---|---|---|---|
| `/data/` | CSV/Excel/Google Sheet snapshots | Project-wide, all runs | Read-only |
| `/upstream/` | Intermediate outputs from earlier pipeline steps | This run, this branch | Read-only after the upstream model writes |
`/data/` is for external inputs. It holds immutable Parquet snapshots
produced by `upload_data` or `connect_google_sheet`. Every model run —
whether single-model or project-wide — can read from `/data/`. The data
inside does not change during a run; freshness is controlled by when you
call `connect_google_sheet` or upload a new file.
`/upstream/` is the pipeline bus. It exists only during a
`run(scope='project')` call and only when `PIPELINE` is defined in `run.py`. As
each model in the pipeline completes, its runtime output dict (a
module-level `result` dict, or a legacy `outputs` dict) is serialised to
`/upstream/<model_name>/outputs.json` so the next model in the list can
read it. `/upstream/` transport requires the model to define a runtime
output dict — models that write directly to `/outputs/` without defining
`result` (or a legacy `outputs` dict) execute normally but are not
materialised to `/upstream/` and cannot be consumed by downstream models
via this path. When the run finishes, `/upstream/` is discarded — it is
not persisted to S3 and is not visible between runs or branches.
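The serialisation step between stages can be sketched as follows. Note that `materialise` is a hypothetical helper (the runner's internals are not exposed), shown here against a temporary directory standing in for `/upstream/`:

```python
import json
import pathlib
import tempfile


def materialise(upstream_root: pathlib.Path, model_name: str, result: dict) -> pathlib.Path:
    """Write a model's runtime output dict where the next stage expects it."""
    out = upstream_root / model_name / "outputs.json"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(result))
    return out


# Round-trip in a throwaway directory standing in for /upstream/.
with tempfile.TemporaryDirectory() as tmp:
    path = materialise(pathlib.Path(tmp), "revenue", {"arr_eoy": 1_200_000})
    assert json.loads(path.read_text()) == {"arr_eoy": 1_200_000}
```

Downstream models never need to write this file themselves; they only read the `outputs.json` produced for each earlier stage.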
## Enabling the pipeline

In `run.py`, define a top-level `PIPELINE` list naming your models in
execution order:

```python
PIPELINE = ["revenue", "expenses", "summary"]
```

When `run(scope='project')` executes `run.py`, it detects `PIPELINE` and:

- Runs `model/revenue.py`; writes its `result` dict (or legacy `outputs` dict) to `/upstream/revenue/outputs.json`.
- Runs `model/expenses.py`; writes its `result` dict (or legacy `outputs` dict) to `/upstream/expenses/outputs.json`.
- Runs `model/summary.py`.
Each `/upstream/` write only occurs when the model defines a runtime
output dict — either a module-level `result` dict (preferred; it pairs
with the `outputs = [...]` contract metadata convention) or a legacy
module-level `outputs` dict. A model that writes directly to `/outputs/`
without defining either still runs, and its files appear in
`run(scope='project')` results, but it is not materialised to
`/upstream/` for downstream models.
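This rule can be illustrated with a small sketch. `runtime_output_dict` below is not the runner's actual code, just an assumed model of the check: prefer `result`, accept a legacy `outputs` dict, and ignore a metadata `outputs` list.

```python
from types import SimpleNamespace


def runtime_output_dict(module):
    """Return the dict a runner following this rule would materialise, or None."""
    for attr in ("result", "outputs"):  # preferred name first, then legacy
        value = getattr(module, attr, None)
        if isinstance(value, dict):  # a metadata list does not qualify
            return value
    return None


# A model exposing only contract metadata (outputs as a list) is skipped.
meta_only = SimpleNamespace(outputs=["total_revenue"])
assert runtime_output_dict(meta_only) is None

# A model defining a `result` dict is materialised.
with_result = SimpleNamespace(outputs=["total_revenue"], result={"total_revenue": 1.0})
assert runtime_output_dict(with_result) == {"total_revenue": 1.0}
```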
Models not in `PIPELINE` are skipped. If a model listed in `PIPELINE`
does not exist, it is skipped with a warning on stderr and execution
continues.
Without `PIPELINE`, the scaffold auto-discovers all `model/*.py` files
and runs them in alphabetical order (the original behavior; no upstream
materialisation).
## The /upstream first, /data fallback pattern

Downstream models should prefer `/upstream/` when available and fall back
to a `/data/` snapshot when running outside a pipeline context (for
example, during a single-model `run(scope='model')` call, or early in
development before all pipeline stages exist):
"""Combine revenue and expense outputs into an executive summary."""import jsonimport pathlibimport pandas as pd
MONTHS = 12
# --- revenue inputs ---_upstream_rev = pathlib.Path("/upstream/revenue/outputs.json")_data_rev = pathlib.Path("/data/revenue_actuals") # Parquet snapshot fallback
if _upstream_rev.exists(): rev_data = json.loads(_upstream_rev.read_text()) total_revenue = sum(rev_data.get("monthly_revenue", [0] * MONTHS))else: # Standalone run: read the Google Sheet snapshot instead. df = pd.read_parquet(str(_data_rev)) total_revenue = float(df["revenue"].sum())
# --- expense inputs ---_upstream_exp = pathlib.Path("/upstream/expenses/outputs.json")_data_exp = pathlib.Path("/data/expense_actuals")
if _upstream_exp.exists(): exp_data = json.loads(_upstream_exp.read_text()) total_expenses = sum(exp_data.get("monthly_expenses", [0] * MONTHS))else: df = pd.read_parquet(str(_data_exp)) total_expenses = float(df["expenses"].sum())
# --- summary ---inputs = ["monthly_revenue", "monthly_expenses"]outputs = ["total_revenue", "total_expenses", "net_income"]dependencies = ["revenue", "expenses"]
result = { "total_revenue": round(total_revenue, 2), "total_expenses": round(total_expenses, 2), "net_income": round(total_revenue - total_expenses, 2),}The fallback branch makes summary.py runnable as a standalone model
(useful during development), while the primary branch composes the full
pipeline result when run via run(scope='project').
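The same guard-and-fallback pattern can be factored into a small helper. `read_upstream` below is a hypothetical convenience, not part of the Bridge Town scaffold, and the constant fallback stands in for a `/data/` snapshot read:

```python
import json
import pathlib

UPSTREAM = pathlib.Path("/upstream")


def read_upstream(model_name: str, key: str, fallback):
    """Return `key` from an upstream model's outputs.json, or `fallback`
    when the pipeline bus is absent (standalone run) or the key is missing."""
    path = UPSTREAM / model_name / "outputs.json"
    if path.exists():
        return json.loads(path.read_text()).get(key, fallback)
    return fallback


# Outside a pipeline run there is no /upstream mount, so the fallback is used.
monthly_revenue = read_upstream("revenue", "monthly_revenue", fallback=[0.0] * 12)
```

Keeping the fallback values next to the read makes a model's standalone behaviour explicit and easy to audit.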
## Complete three-model example

Here is a minimal but complete project showing the revenue → expenses → summary chain.
`run.py` (scaffold with `PIPELINE` defined):

```python
"""Bridge Town model entry point — auto-discovery scaffold.

# BT-SCAFFOLD-SENTINEL"""

PIPELINE = ["revenue", "expenses", "summary"]

# (The rest of this file is the standard Bridge Town scaffold.)
```

`model/revenue.py`:

```python
"""12-month SaaS revenue projection."""

MONTHS = 12
BASE_ARR = 1_200_000
GROWTH_RATE = 0.07

monthly = []
arr = BASE_ARR
for _ in range(MONTHS):
    monthly.append(round(arr / 12, 2))
    arr *= 1 + GROWTH_RATE

inputs = ["base_arr", "growth_rate"]
outputs = ["monthly_revenue", "arr_eoy"]
dependencies = []

# Runtime output dict (preferred `result` pattern) — materialised to
# /upstream/revenue/outputs.json during a pipeline run.
result = {"monthly_revenue": monthly, "arr_eoy": round(arr, 2)}
```

`model/expenses.py`:

```python
"""Monthly expense model — headcount + opex."""
import json
import pathlib

_upstream = pathlib.Path("/upstream/revenue/outputs.json")
if _upstream.exists():
    rev = json.loads(_upstream.read_text())
    # Size opex as a percentage of revenue.
    monthly_revenue = rev.get("monthly_revenue", [100_000] * 12)
else:
    monthly_revenue = [100_000] * 12

HEADCOUNT_COST = 80_000  # per month
OPEX_PCT_OF_REVENUE = 0.18

monthly = [
    round(HEADCOUNT_COST + r * OPEX_PCT_OF_REVENUE, 2)
    for r in monthly_revenue
]

inputs = ["monthly_revenue"]
outputs = ["monthly_expenses"]
dependencies = ["revenue"]

result = {"monthly_expenses": monthly}
```

`model/summary.py`: see the `/upstream` first, `/data` fallback example above.
Running `run(scope='project')` on this project produces combined output
keyed by model name, and it feeds naturally into scenario analysis via
`compare_branches`.
## Scenario analysis with pipelines

When you run scenario analysis (`run_scenario_analysis`), use project
runs — not Google Sheets — as the comparison mechanism:

- Create a scenario branch: `create_branch` with `branch_name="scenario/upside"`.
- Edit the assumption in the relevant model on that branch: `patch_file`.
- Run the full pipeline: `run(scope='project')` with `branch="scenario/upside"`.
- Compare outputs: `compare_branches` with `base_branch="main"` and `scenario_branch="scenario/upside"`.
Because `/upstream/` materialises all intermediate model outputs during
each run, `compare_branches` can diff the full pipeline — not just the
model you edited. Changes propagate automatically through the dependency
chain.

Do not export intermediate results to Google Sheets and import them
back on a different branch to compare scenarios. This breaks the
branch-scoped isolation that `compare_branches` depends on.
## Authoring checklist

When writing a downstream model that reads from `/upstream/`:

- Use `pathlib.Path("/upstream/<model_name>/outputs.json").exists()` to guard the read — never assume the file is there.
- Provide a `/data/` or constant fallback so the model is runnable standalone.
- Keep the upstream read near the top of the file, before any computation.
- Do not write back to `/upstream/` — it is managed by `run.py`.
## Related

- Projects & Models — `PIPELINE` syntax reference and model lifecycle
- Sandbox Execution — mounted paths and security constraints
- Data Sources & Snapshots — `/data/` mount and Google Sheets integration
- Google Sheets Integration — connecting sheets as external inputs and writing results back