-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstructured_types.py
More file actions
136 lines (100 loc) · 3.59 KB
/
structured_types.py
File metadata and controls
136 lines (100 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Using dataclasses as RPC parameters and return types.
``ArrowSerializableDataclass`` lets you pass structured data across the
RPC boundary. Fields are automatically mapped to Arrow types.
Run::
python examples/structured_types.py
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Protocol
from vgi_rpc import serve_pipe
from vgi_rpc.utils import ArrowSerializableDataclass
# ---------------------------------------------------------------------------
# Domain types
# ---------------------------------------------------------------------------
class Priority(Enum):
"""Task priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
@dataclass(frozen=True)
class Task(ArrowSerializableDataclass):
"""A task with structured fields.
Supported field types include: str, int, float, bool, Enum,
list[T], dict[K, V], frozenset[T], Optional types, and nested
ArrowSerializableDataclass instances.
"""
title: str
priority: Priority
tags: list[str]
metadata: dict[str, str]
done: bool = False
@dataclass(frozen=True)
class TaskSummary(ArrowSerializableDataclass):
"""Summary returned by the task service."""
total: int
high_priority: int
titles: list[str]
# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------
class TaskService(Protocol):
"""Service that accepts and returns dataclass parameters."""
def create_task(self, task: Task) -> str:
"""Store a task and return its ID."""
...
def summarize(self) -> TaskSummary:
"""Return a summary of all stored tasks."""
...
class TaskServiceImpl:
"""In-memory task store."""
def __init__(self) -> None:
"""Initialize the store."""
self._tasks: list[Task] = []
self._next_id: int = 0
def create_task(self, task: Task) -> str:
"""Store a task and return its ID."""
self._tasks.append(task)
task_id = f"TASK-{self._next_id}"
self._next_id += 1
return task_id
def summarize(self) -> TaskSummary:
"""Return a summary of all stored tasks."""
return TaskSummary(
total=len(self._tasks),
high_priority=sum(1 for t in self._tasks if t.priority == Priority.HIGH),
titles=[t.title for t in self._tasks],
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
"""Run the dataclass example."""
with serve_pipe(TaskService, TaskServiceImpl()) as svc:
# Create some tasks using structured dataclass parameters
id1 = svc.create_task(
task=Task(
title="Write documentation",
priority=Priority.HIGH,
tags=["docs", "urgent"],
metadata={"assignee": "alice"},
)
)
print(f"Created: {id1}")
id2 = svc.create_task(
task=Task(
title="Run benchmarks",
priority=Priority.LOW,
tags=["perf"],
metadata={"env": "staging"},
)
)
print(f"Created: {id2}")
# Get a structured summary back
summary = svc.summarize()
print(f"\nTotal tasks: {summary.total}")
print(f"High priority: {summary.high_priority}")
print(f"Titles: {summary.titles}")
if __name__ == "__main__":
main()