Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions cq/_core/pipetools.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from typing import TYPE_CHECKING, Any, Callable, overload

import injection

from cq import Dispatcher
from cq._core.dispatcher.lazy import LazyDispatcher
from cq._core.dispatcher.pipe import ContextPipeline
from cq._core.message import AnyCommandBus, Command
from cq._core.dispatcher.pipe import ContextPipeline, PipeConverterMethod
from cq._core.message import AnyCommandBus, Command, Query, QueryBus
from cq._core.scope import CQScope
from cq.middlewares.scope import InjectionScopeMiddleware


class ContextCommandPipeline[I: Command](ContextPipeline[I]):
__slots__ = ()
__slots__ = ("__query_dispatcher",)

__query_dispatcher: Dispatcher[Query, Any]

def __init__(
self,
Expand All @@ -17,15 +22,45 @@ def __init__(
injection_module: injection.Module | None = None,
threadsafe: bool | None = None,
) -> None:
dispatcher = LazyDispatcher(
command_dispatcher = LazyDispatcher(
AnyCommandBus,
injection_module=injection_module,
threadsafe=threadsafe,
)
super().__init__(dispatcher)
super().__init__(command_dispatcher)

self.__query_dispatcher = LazyDispatcher(
QueryBus,
injection_module=injection_module,
threadsafe=threadsafe,
)

transaction_scope_middleware = InjectionScopeMiddleware(
CQScope.TRANSACTION,
exist_ok=True,
threadsafe=threadsafe,
)
self.add_middlewares(transaction_scope_middleware)

if TYPE_CHECKING: # pragma: no cover

@overload
def query_step[T: Query](
self,
wrapped: PipeConverterMethod[T, Any],
/,
) -> PipeConverterMethod[T, Any]: ...

@overload
def query_step[T: Query](
self,
wrapped: None = ...,
/,
) -> Callable[[PipeConverterMethod[T, Any]], PipeConverterMethod[T, Any]]: ...

def query_step[T: Query](
self,
wrapped: PipeConverterMethod[T, Any] | None = None,
/,
) -> Any:
return self.step(wrapped, dispatcher=self.__query_dispatcher)
6 changes: 4 additions & 2 deletions docs/guides/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ In CQRS, the saga pattern is typically used to orchestrate multiple commands. Ho
A pipeline executes a sequence of commands, where each step transforms the result of the previous command into the next command.

```python
from cq import ContextCommandPipeline, ContextPipeline
from cq import ContextCommandPipeline

class PaymentContext:
transaction_id: int

pipeline: ContextPipeline[ValidateCartCommand] = ContextCommandPipeline()
pipeline: ContextCommandPipeline[ValidateCartCommand] = ContextCommandPipeline()

@pipeline.step
async def _(self, result: CartValidatedResult) -> CreateTransactionCommand:
Expand All @@ -36,6 +36,8 @@ The pipeline class acts as a context, allowing you to store intermediate values

Each step is a method decorated with `@pipeline.step`. It receives the result of the previous command handler and returns the next command to dispatch.

You can also use `@pipeline.query_step` to dispatch queries instead of commands.

The last step is optional. If defined, it must return `None`.

### Dispatching a pipeline
Expand Down
18 changes: 9 additions & 9 deletions tests/test_context_command_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from cq import ContextCommandPipeline, ContextPipeline, command_handler
from cq import ContextCommandPipeline, command_handler, query_handler
from tests.helpers.history import HistoryMiddleware


Expand All @@ -11,7 +11,7 @@ class Command1: ...

class Command2: ...

class Command3: ...
class Query: ...

class Foo: ...

Expand All @@ -29,27 +29,27 @@ class CommandHandler2:
async def handle(self, command: Command2) -> Bar:
return Bar()

@command_handler
class CommandHandler3:
async def handle(self, command: Command3) -> Baz:
@query_handler
class QueryHandler:
async def handle(self, query: Query) -> Baz:
return Baz()

class Context:
foo: Foo
bar: Bar
baz: Baz

pipeline: ContextPipeline[Command1] = ContextCommandPipeline()
pipeline: ContextCommandPipeline[Command1] = ContextCommandPipeline()

@pipeline.step
async def _(self, foo: Foo) -> Command2:
self.foo = foo
return Command2()

@pipeline.step
async def _(self, bar: Bar) -> Command3:
@pipeline.query_step
async def _(self, bar: Bar) -> Query:
self.bar = bar
return Command3()
return Query()

@pipeline.step
async def _(self, baz: Baz) -> None:
Expand Down
Loading
Loading