-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoperations.py
More file actions
25 lines (17 loc) · 735 Bytes
/
operations.py
File metadata and controls
25 lines (17 loc) · 735 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar
from pydantic import BaseModel, ConfigDict
# Generic placeholder used by the AsyncCallable alias below.
_T = TypeVar("_T")
# Type parameter for the class stored in APIOperation.response_model.
ResponseT = TypeVar("ResponseT")
# Type parameter for the class stored in APIOperation.input_model.
InputT = TypeVar("InputT")
# Type parameter for the class stored in StreamOperation.entry_model.
EntryT = TypeVar("EntryT")
# A zero-argument callable whose call returns an awaitable yielding _T.
AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]]
# Frozen, declarative description of one API operation, generic over the
# response class (ResponseT) and the input class (InputT).
#
# NOTE: documented with comments rather than a class docstring on purpose;
# pydantic copies a model's docstring into its JSON schema "description",
# so adding one would change generated schemas.
class APIOperation(BaseModel, Generic[ResponseT, InputT]):
    model_config = ConfigDict(
        frozen=True,  # instances reject attribute assignment after creation
        arbitrary_types_allowed=True,  # fields may hold non-pydantic classes
    )

    # Presumably the HTTP verb (e.g. "GET") -- confirm against callers.
    method: str
    # Endpoint string; the name suggests it contains placeholders that are
    # filled in elsewhere -- TODO confirm the template syntax.
    endpoint_template: str
    # The class (not an instance) associated with the operation's response.
    response_model: Type[ResponseT]
    # The class (not an instance) associated with the operation's input;
    # defaults to None when no input model is given.
    input_model: Optional[Type[InputT]] = None
# Frozen, declarative description of one stream operation, generic over the
# entry class (EntryT).
#
# NOTE: documented with comments rather than a class docstring on purpose;
# pydantic copies a model's docstring into its JSON schema "description",
# so adding one would change generated schemas.
class StreamOperation(BaseModel, Generic[EntryT]):
    model_config = ConfigDict(
        frozen=True,  # instances reject attribute assignment after creation
        arbitrary_types_allowed=True,  # fields may hold non-pydantic classes
    )

    # Endpoint string; the name suggests it contains placeholders that are
    # filled in elsewhere -- TODO confirm the template syntax.
    endpoint_template: str
    # The class (not an instance) associated with each stream entry.
    entry_model: Type[EntryT]