Commit f284527
first commit (0 parents)

5 files changed

Lines changed: 235 additions & 0 deletions

File tree

.github/workflows/deploy.yml
README.md
flows/preprocess/flow.py
flows/train/flow.py
obproject.toml

.github/workflows/deploy.yml

Lines changed: 64 additions & 0 deletions
name: Deploy Project
on:
  push:
    branches:
      - main
  pull_request:
    branches:
      - main

env:
  GH_HEAD_REF: ${{ github.head_ref }}
  GH_REF: ${{ github.ref_name }}

permissions:
  id-token: write
  contents: read
  pull-requests: write

jobs:
  deploy:
    name: Deploy Project
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          ref: ${{ github.event.pull_request.head.sha }}
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python3 -m pip install -U requests
          python3 -m pip install outerbounds pyyaml
          python3 -m pip install -U ob-project-utils

      - name: Configure Outerbounds
        run: |
          PROJECT_NAME=$(yq .project obproject.toml)
          DEFAULT_CICD_USER="${PROJECT_NAME//_/-}-cicd"
          PLATFORM=$(yq .platform obproject.toml)
          CICD_USER=$(yq ".cicd_user // \"$DEFAULT_CICD_USER\"" obproject.toml)
          PERIMETER="default"
          echo "🏗️ Deployment target:"
          echo "  Platform: $PLATFORM"
          echo "  CI/CD User: $CICD_USER"
          echo "  Perimeter: $PERIMETER"
          outerbounds service-principal-configure \
            --name "$CICD_USER" \
            --deployment-domain "$PLATFORM" \
            --perimeter "$PERIMETER" \
            --github-actions

      - name: Deploy Project
        env:
          COMMIT_URL: "https://github.com/${{ github.repository }}/commit/"
          CI_URL: "https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          COMMENTS_URL: ${{ github.event.pull_request.comments_url }}
          PYTHONUNBUFFERED: 1
        run: obproject-deploy

README.md

Lines changed: 69 additions & 0 deletions
# Flow Chaining Example

Minimal example showing how to chain flows using `@trigger_on_finish`.

## Structure

```
flow-chaining-example/
  obproject.toml
  flows/
    preprocess/flow.py   # Runs first, processes datasets in parallel
    train/flow.py        # Triggered when preprocess finishes
```

## How It Works

1. **PreprocessFlow** runs with a `datasets` parameter (comma-separated paths)
2. Uses `foreach` to process each dataset in parallel
3. Stores results in the `self.processed_paths` artifact
4. **TrainFlow** has `@trigger_on_finish(flow="PreprocessFlow")`
5. When PreprocessFlow completes, TrainFlow starts automatically
6. TrainFlow accesses data via `current.trigger.run.data.processed_paths`

## Testing Locally

```bash
# Test PreprocessFlow standalone
cd flows/preprocess
python flow.py run --datasets "path1,path2,path3"

# Test TrainFlow standalone (without trigger)
cd flows/train
python flow.py run --learning_rate 0.05 --n_estimators 200
```
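The artifact values those test runs produce follow directly from PreprocessFlow's split-and-suffix logic; a minimal plain-Python sketch (helper names are hypothetical, the transformations mirror the flow's steps):

```python
def split_datasets(datasets: str) -> list[str]:
    # PreprocessFlow.start: comma-separated parameter -> foreach list
    return datasets.split(",")


def processed_path(dataset_path: str) -> str:
    # PreprocessFlow.preprocess: each branch derives its output path
    return f"{dataset_path}_processed"


# What self.processed_paths holds after the join step, for the test command above
processed = [processed_path(p) for p in split_datasets("path1,path2,path3")]
# processed == ["path1_processed", "path2_processed", "path3_processed"]
```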
## Deploy to Outerbounds

```bash
obproject-deploy
```

After deploy:

1. Run PreprocessFlow from the UI or CLI
2. TrainFlow will trigger automatically when it finishes
3. TrainFlow parameters (`learning_rate`, `n_estimators`) use deploy-time defaults

## Passing Parameters at Runtime

TrainFlow parameters are set at **deploy time** via the flow definition defaults.
To change them per run, either:

1. **Redeploy** with different defaults
2. **Use artifacts** instead of Parameters for runtime values:
   - PreprocessFlow stores config in an artifact
   - TrainFlow reads it via `current.trigger.run.data.config`
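The artifact option can be sketched with a plain-Python stand-in for `current.trigger.run.data` (`SimpleNamespace` is a hypothetical mock here; the real object is Metaflow's artifact namespace, and the `config` artifact would be set by PreprocessFlow):

```python
from types import SimpleNamespace

# Hypothetical stand-in for current.trigger.run.data in TrainFlow.
# PreprocessFlow would set self.config = {...} in one of its steps.
parent_data = SimpleNamespace(
    processed_paths=["s3://bucket/data1_processed", "s3://bucket/data2_processed"],
    config={"learning_rate": 0.05, "n_estimators": 200},
)

# TrainFlow reads runtime values from the artifact instead of Parameters,
# falling back to deploy-time defaults when a key is absent.
learning_rate = parent_data.config.get("learning_rate", 0.01)
n_estimators = parent_data.config.get("n_estimators", 100)
# learning_rate == 0.05, n_estimators == 200
```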
## Key Pattern

```python
# In TrainFlow
@trigger_on_finish(flow="PreprocessFlow")
class TrainFlow(ProjectFlow):

    @step
    def start(self):
        if current.trigger:
            # Access parent flow's artifacts
            data = current.trigger.run.data.processed_paths
```

flows/preprocess/flow.py

Lines changed: 49 additions & 0 deletions
"""
Preprocess Flow: Processes multiple datasets in parallel.
When finished, automatically triggers TrainFlow.
"""

from metaflow import step, Parameter, current
from obproject import ProjectFlow


class PreprocessFlow(ProjectFlow):
    """
    Preprocesses datasets. Runs multiple configs in parallel via foreach.
    """

    datasets = Parameter(
        "datasets",
        default="s3://bucket/data1,s3://bucket/data2",
        help="Comma-separated dataset paths",
    )

    @step
    def start(self):
        self.dataset_list = self.datasets.split(",")
        print(f"Processing {len(self.dataset_list)} datasets: {self.dataset_list}")
        self.next(self.preprocess, foreach="dataset_list")

    @step
    def preprocess(self):
        """Process each dataset (A1, A2, etc.)"""
        self.dataset_path = self.input
        self.output_path = f"{self.dataset_path}_processed"
        print(f"Preprocessed: {self.dataset_path} -> {self.output_path}")
        self.next(self.join)

    @step
    def join(self, inputs):
        """Merge results from parallel preprocessing"""
        self.processed_paths = [inp.output_path for inp in inputs]
        print(f"All preprocessed: {self.processed_paths}")
        self.next(self.end)

    @step
    def end(self):
        # These artifacts are accessible to TrainFlow via current.trigger.run.data
        print(f"Preprocess complete. Outputs: {self.processed_paths}")


if __name__ == "__main__":
    PreprocessFlow()

flows/train/flow.py

Lines changed: 50 additions & 0 deletions
"""
Train Flow: Triggered automatically when PreprocessFlow finishes.
Accesses PreprocessFlow's outputs via current.trigger.run.data
"""

from metaflow import step, Parameter, current, trigger_on_finish
from obproject import ProjectFlow


@trigger_on_finish(flow="PreprocessFlow")
class TrainFlow(ProjectFlow):
    """
    Training flow. Triggered automatically when PreprocessFlow completes.
    """

    learning_rate = Parameter("learning_rate", default=0.01, type=float)
    n_estimators = Parameter("n_estimators", default=100, type=int)

    @step
    def start(self):
        # Access data from the triggering flow
        if current.trigger:
            trigger_run = current.trigger.run
            self.input_paths = trigger_run.data.processed_paths
            print(f"Triggered by: {trigger_run.pathspec}")
            print(f"Input paths from PreprocessFlow: {self.input_paths}")
        else:
            # Standalone run - use defaults for testing
            self.input_paths = ["test/path1_processed", "test/path2_processed"]
            print(f"Standalone run. Using test paths: {self.input_paths}")

        self.next(self.train)

    @step
    def train(self):
        """Train model using preprocessed data"""
        print(f"Training with lr={self.learning_rate}, n_estimators={self.n_estimators}")
        print(f"Using data: {self.input_paths}")
        self.model_path = "s3://bucket/models/trained_model"
        self.metrics = {"accuracy": 0.95, "f1": 0.93}
        self.next(self.end)

    @step
    def end(self):
        print(f"Training complete. Model: {self.model_path}")
        print(f"Metrics: {self.metrics}")


if __name__ == "__main__":
    TrainFlow()

obproject.toml

Lines changed: 3 additions & 0 deletions
project = "flow-chaining-demo"
title = "Flow Chaining Example"
platform = "dev-yellow.outerbounds.xyz"
