
Add @Flow.model functional API #206

Open
NeejWeej wants to merge 1 commit into Point72:main from NeejWeej:nk/auto_deps_auto_callable_model

Conversation


NeejWeej commented May 4, 2026

PR Summary: @Flow.model

Replaces #171. Reopened from a personal fork.

This PR adds @Flow.model, an authoring API that turns a typed Python function into a real CallableModel factory.

The intent is to make common DAG stages easier to write while keeping execution inside the existing ccflow machinery. Generated models still use the existing CallableModel, evaluator, cache, dependency graph, registry, Hydra, and serialization paths.

Core API

@Flow.model splits function parameters into two categories:

  • Regular parameters: ordinary unmarked parameters. These are construction-time model inputs and may be literals, defaults, or upstream CallableModel dependencies.
  • Contextual parameters: parameters marked with FromContext[T]. These are runtime inputs supplied by context, .flow.compute(...), construction-time contextual defaults, or .flow.with_context(...).

Example:

from ccflow import Flow, FromContext


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


model = add(a=10)
assert model.flow.compute(b=5).value == 15

When a function returns a non-ResultBase value, the generated model wraps it in GenericResult[value]. Explicit ResultBase returns are preserved.
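The wrapping rule can be sketched in plain Python. This is an illustrative stand-in, not ccflow's actual implementation; the class bodies here are hypothetical minimal versions of the real `ResultBase`/`GenericResult` types:

```python
# Plain-Python sketch of the return-wrapping rule described above.
# These minimal classes stand in for ccflow's ResultBase / GenericResult.
class ResultBase:
    pass


class GenericResult(ResultBase):
    def __init__(self, value):
        self.value = value


def wrap_result(raw):
    # Explicit ResultBase returns are preserved; anything else is wrapped.
    if isinstance(raw, ResultBase):
        return raw
    return GenericResult(value=raw)


assert wrap_result(42).value == 42               # plain value gets wrapped
explicit = GenericResult(value=1)
assert wrap_result(explicit) is explicit         # explicit result passes through
```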

Dependency Wiring

Regular parameters can be bound to upstream models:

@Flow.model
def source(value: FromContext[int]) -> int:
    return value * 10


@Flow.model
def root(x: int, bonus: FromContext[int]) -> int:
    return x + bonus


model = root(x=source())
assert model.flow.compute(value=3, bonus=7).value == 37

Generated __deps__ methods expose non-lazy upstream model dependencies to the existing graph evaluator. Lazy[T] remains supported for dependency thunks when a dependency should only be evaluated if user code calls it.
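The dependency-thunk idea behind `Lazy[T]` can be illustrated with a small memoizing wrapper. This is a generic sketch of the pattern, not ccflow's `Lazy[T]` implementation; `LazyDep` is a hypothetical name:

```python
# Sketch of a dependency thunk: the upstream computation runs only if and
# when user code calls it, and the result is memoized afterwards.
class LazyDep:
    def __init__(self, compute):
        self._compute = compute
        self._evaluated = False
        self._value = None

    def __call__(self):
        if not self._evaluated:
            self._value = self._compute()
            self._evaluated = True
        return self._value


calls = []
dep = LazyDep(lambda: calls.append("run") or 99)

assert calls == []                     # nothing evaluated at construction time
assert dep() == 99                     # evaluated on first call
assert dep() == 99 and calls == ["run"]  # second call reuses the memoized value
```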

Context Rewrites

This PR adds .flow.with_context(...) plus @Flow.context_transform.

with_context rewrites runtime context for one dependency edge without mutating the wrapped model. This supports fanout patterns where the same model is evaluated against different contextual inputs in different branches.

@Flow.context_transform
def previous_day(day: FromContext[int]) -> int:
    return day - 1


previous = source().flow.with_context(value=previous_day())

Raw callables are intentionally rejected in with_context; reusable transforms should be defined with @Flow.context_transform so they can be validated and serialized.
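The edge-local rewrite semantics can be sketched with plain dicts standing in for contexts. Everything here is illustrative (the `with_context` helper below is a hypothetical stand-in, not ccflow's API); the point is that the rewrite binds to one edge and leaves the wrapped model untouched:

```python
# Sketch of an edge-local context rewrite: the transform applies on one
# dependency edge only, and the wrapped model itself is not mutated.
def previous_day(ctx):
    # Stand-in for a @Flow.context_transform: rewrites one contextual field.
    return {**ctx, "day": ctx["day"] - 1}


def with_context(model, transform):
    # Returns a new callable for this edge; `model` is unchanged.
    def bound(ctx):
        return model(transform(ctx))
    return bound


source = lambda ctx: ctx["day"] * 10

previous = with_context(source, previous_day)

assert source({"day": 5}) == 50    # original edge sees day = 5
assert previous({"day": 5}) == 40  # rewritten edge sees day - 1 = 4
```

This is the fanout shape described above: the same `source` can be evaluated against today's and yesterday's context in different branches without duplicating the model.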

Execution Helpers

Every CallableModel now exposes model.flow.

For generated models, model.flow provides:

  • compute(...): ergonomic execution from a context object or contextual kwargs.
  • with_context(...): edge-local context rewrites.
  • context_inputs: contextual fields the model may consume.
  • bound_inputs: construction-time fields and static context bindings.
  • unbound_inputs: required contextual fields not yet satisfied.

compute() deliberately does not bind regular parameters. If a kwarg matches a regular parameter, it raises instead of silently treating runtime context as model configuration.
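The rejection behavior can be sketched as a simple kwarg partition. The helper below is illustrative only (hypothetical names, not the generated-model internals):

```python
# Sketch of why compute(...) rejects regular-parameter kwargs: runtime
# context must not silently reconfigure the model.
def make_compute(regular_params, contextual_params):
    def compute(**kwargs):
        bad = set(kwargs) & set(regular_params)
        if bad:
            raise TypeError(
                f"{sorted(bad)} are construction-time model inputs, not "
                "runtime context; pass them when constructing the model"
            )
        # Accept only declared contextual fields that were supplied.
        return {k: kwargs[k] for k in contextual_params if k in kwargs}
    return compute


compute = make_compute(regular_params={"a"}, contextual_params={"b"})

assert compute(b=5) == {"b": 5}   # contextual kwarg is accepted

try:
    compute(a=10, b=5)            # regular kwarg at runtime -> error
except TypeError:
    pass
else:
    raise AssertionError("expected TypeError for regular-parameter kwarg")
```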

The PR also adds Flow.call(auto_context=...) as a narrow opt-in for hand-written CallableModel.__call__ methods that want to declare context fields as keyword-only parameters. It is not the main @Flow.model authoring path and does not add FromContext[...], dependency wiring, or .flow.with_context(...) semantics by itself.

Serialization

Importable module-level @Flow.model functions produce generated classes with stable module import paths, so JSON/config-style round trips work across processes whenever the defining module is importable. Only those importable module-level definitions are durable config artifacts: local, nested, and __main__ definitions are best-effort for pickle/cloudpickle object transport only.
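The importability criterion can be sketched independently of ccflow: a class is config-durable only if a fresh process can re-import it from its recorded `module:qualname` path. The helpers below are illustrative, not ccflow's serialization code:

```python
# Sketch of the import-path rule: durable across processes means
# re-importable from the recorded module path.
import importlib


def import_path(cls):
    return f"{cls.__module__}:{cls.__qualname__}"


def is_importable(path):
    module_name, _, qualname = path.partition(":")
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return False
    obj = module
    for part in qualname.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return False
    return True


from collections import OrderedDict
assert is_importable(import_path(OrderedDict))  # module-level: durable


def factory():
    class Inner:  # nested: qualname contains '<locals>', not re-importable
        pass
    return Inner


assert not is_importable(import_path(factory()))
```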

Cache And Graph Identity

Public cache_key(...) remains structural by default.

Generated and bound models also support effective identity for model evaluations. Effective identity describes the parts of an invocation that actually affect the result, so unused ambient FlowContext fields do not split built-in cache entries or graph nodes.

The built-in MemoryCacheEvaluator now uses:

cache_key(context, effective=True)

Custom evaluators can use the same public API if they want generated-model-aware keys:

from ccflow.evaluators import cache_key


key = cache_key(model_evaluation_context, effective=True)

The default remains structural:

cache_key(model_evaluation_context)

Ordinary CallableModel classes continue to use structural identity unless they explicitly opt into the internal identity hook. This is intentional: arbitrary handwritten CallableModel.__call__ implementations can inspect context in ways ccflow cannot infer safely.

Opaque evaluators also use structural identity, since an opaque evaluator may access arbitrary fields on the context that are not reflected in the signature of a given @Flow.model-decorated function.
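The structural-versus-effective distinction reduces to which context fields participate in the key. A minimal sketch with plain dicts as contexts and tuples as keys (illustrative only, not ccflow's `cache_key`):

```python
# Sketch of structural vs. effective identity.
def structural_key(context):
    # Every context field participates in the key.
    return tuple(sorted(context.items()))


def effective_key(context, consumed_fields):
    # Only fields the model declares it consumes participate.
    return tuple(sorted((k, v) for k, v in context.items() if k in consumed_fields))


ctx_a = {"day": "2024-01-01", "request_id": "a"}
ctx_b = {"day": "2024-01-01", "request_id": "b"}

# Structurally the contexts differ, so cache entries split.
assert structural_key(ctx_a) != structural_key(ctx_b)

# Effectively (consuming only `day`) they are the same evaluation.
assert effective_key(ctx_a, {"day"}) == effective_key(ctx_b, {"day"})
```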

Why Effective Identity Matters

The existing structural key can over-split cache entries for ordinary CallableModels when callers pass a richer context than the model semantically uses. With structural context identity, adding or changing an ambient field for one branch of a DAG can invalidate cache reuse in another branch that does not use that field.

Minimal ordinary-CallableModel example:

from datetime import date

from ccflow import CallableModel, ContextBase, Flow, FlowOptionsOverride, GenericResult
from ccflow.evaluators import MemoryCacheEvaluator


class DayContext(ContextBase):
    day: date


class DayRequestContext(DayContext):
    request_id: str


calls = {"count": 0}


class DayName(CallableModel):
    @Flow.call
    def __call__(self, context: DayContext) -> GenericResult[str]:
        calls["count"] += 1
        return GenericResult(value=context.day.strftime("%A"))


model = DayName()
cache = MemoryCacheEvaluator()

ctx1 = DayRequestContext(day=date(2024, 1, 1), request_id="a")
ctx2 = DayRequestContext(day=date(2024, 1, 1), request_id="b")

# Both calls produce the same answer, and DayName only uses `day`.
# But the structural cache key includes the richer runtime context, so
# different request_id values create different cache entries.
with FlowOptionsOverride(options={"evaluator": cache, "cacheable": True}):
    assert model(ctx1).value == "Monday"
    assert model(ctx2).value == "Monday"

assert calls["count"] == 2

Handwritten CallableModels can opt into effective identity by overriding the internal identity hook and returning only the semantic fields that affect the result:

class DayName(CallableModel):
    @Flow.call
    def __call__(self, context: DayContext) -> GenericResult[str]:
        calls["count"] += 1
        return GenericResult(value=context.day.strftime("%A"))

    def _evaluation_identity_payload(self, context):
        return {
            "kind": "day_name_v1",
            "day": context.day,
        }

With that opt-in, the built-in cache can reuse results across DayRequestContext(day=..., request_id="a") and DayRequestContext(day=..., request_id="b") because the model has explicitly declared that only day affects the result.

This is not the default for handwritten CallableModels because ccflow cannot safely infer what arbitrary Python code uses. A normal __call__ implementation might inspect type(context), call context.model_dump(), read subclass-only fields, or otherwise depend on the full runtime context object. Automatically projecting context for every handwritten model would risk incorrect cache hits for existing users.

@Flow.model improves this case because consumed contextual inputs are explicit via FromContext[...], so generated models can safely ignore unused ambient fields in effective cache/graph identity:

from datetime import date

from ccflow import Flow, FlowContext, FlowOptionsOverride, FromContext
from ccflow.evaluators import MemoryCacheEvaluator


calls = {"count": 0}


@Flow.model
def day_name(day: FromContext[date]) -> str:
    calls["count"] += 1
    return day.strftime("%A")


model = day_name()
cache = MemoryCacheEvaluator()

ctx1 = FlowContext(day=date(2024, 1, 1), request_id="a")
ctx2 = FlowContext(day=date(2024, 1, 1), request_id="b")

# request_id is ambient context. The generated model declares that it only
# consumes `day`, so the built-in cache can reuse the first result.
with FlowOptionsOverride(options={"evaluator": cache, "cacheable": True}):
    assert model(ctx1).value == "Monday"
    assert model(ctx2).value == "Monday"

assert calls["count"] == 1

Compatibility

The PR is additive:

  • Existing CallableModel implementations continue to work.
  • Existing Flow.call behavior is preserved.
  • cache_key(...) remains structural unless effective=True is explicitly requested.
  • Plain CallableModel cache keys and graph keys remain structural.
  • FlowContext is an open runtime carrier for generated models, but declared context_type=... can still be used to validate FromContext[...] fields against an existing nominal context.

Test Coverage

The test suite covers:

  • generated model execution and validation,
  • contextual defaults and runtime precedence,
  • dependency wiring and lazy dependencies,
  • with_context field and patch transforms,
  • declared context type validation,
  • registry and Hydra-style construction,
  • pickle/cloudpickle and cross-process serialization,
  • stable import-path JSON round trips for importable generated models,
  • cache_key(..., effective=True) behavior,
  • dependency graph integration,
  • ordinary CallableModel compatibility.


codecov Bot commented May 4, 2026

Codecov Report

❌ Patch coverage is 96.85415% with 66 lines in your changes missing coverage. Please review.
✅ Project coverage is 95.41%. Comparing base (45ff204) to head (a074f8f).

Files with missing lines                 Patch %   Lines
ccflow/flow_model.py                      97.61%   14 Missing and 11 partials ⚠️
ccflow/tests/test_callable.py             90.56%   15 Missing ⚠️
ccflow/_flow_model_binding.py             96.04%   7 Missing and 6 partials ⚠️
ccflow/tests/test_flow_context.py         98.21%   4 Missing ⚠️
ccflow/context.py                         91.89%   1 Missing and 2 partials ⚠️
ccflow/evaluators/common.py               96.77%   3 Missing ⚠️
ccflow/callable.py                        93.33%   1 Missing and 1 partial ⚠️
ccflow/tests/evaluators/test_common.py    98.41%   1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #206      +/-   ##
==========================================
+ Coverage   95.22%   95.41%   +0.19%     
==========================================
  Files         140      145       +5     
  Lines       10354    14430    +4076     
  Branches      599      913     +314     
==========================================
+ Hits         9860    13769    +3909     
- Misses        369      516     +147     
- Partials      125      145      +20     

NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch 4 times, most recently from dddfb5b to 9f1755c (May 4, 2026 08:22)
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from 9f1755c to a074f8f (May 4, 2026 08:41)
