Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,47 @@ The application will be available at `http://localhost:8000`

For the full documentation site, run `mkdocs serve` and visit http://localhost:8000

## Demo: full run lifecycle

End-to-end demo of a complete run lifecycle driven by MQTT telemetry from the simulator.

```bash
# Terminal 1 — start the full databus stack
cd databus && bash scripts/dev.sh

# Terminal 2 — load GTFS feed
docker compose -f compose.dev.yml exec orchestrator \
uv run python manage.py loaddata gtfs.json

# Terminal 3 — start the simulator (wired to databus broker)
# The simulator's scheduler posts to /api/create-run on each schedule entry's
# start_time. The UI's Operator tab handles confirmation. No databus-side
# bootstrap command is required.
cd ../simulator && docker compose up simulator web

# Terminal 4 — observe (optional)
open http://localhost:8080 # live map
watch ls backend/feed/files/ # GTFS-RT outputs (refresh every 15 s)
```

Within ~30 s of starting the simulator:

- Every run advances `CONFIRMED → TRACKING → IN_PROGRESS`
- `backend/feed/files/vehicle_positions.pb` contains one `FeedEntity` per active run
- `backend/feed/files/trip_updates.pb` contains stop-time predictions

Killing the simulator triggers `RUN_TRACKING_LOST` after 60 s and
`RUN_TRACKING_EXPIRED → CANCELLED` after 300 s.

Verify the protobuf output:

```python
from google.transit import gtfs_realtime_pb2
msg = gtfs_realtime_pb2.FeedMessage()
msg.ParseFromString(open("backend/feed/files/vehicle_positions.pb", "rb").read())
print(len(msg.entity)) # should equal the number of active runs
```

## 🛣️ Roadmap

Where is this going? Check SIMOVI's [roadmap](https://github.com/simovilab/context/blob/main/roadmap.md).
Expand Down
30 changes: 19 additions & 11 deletions backend/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from runs.models import (
Run,
Position,
Progression,
Occupancy,
VehicleStopStatus,
CongestionLevel,
OccupancyStatus,
)
from runs.domain.events import RunLifecycleEvents
from runs.domain.lifecycle import RunLifecycleEvents
from feed.models import *
from django.contrib.auth.models import User
from rest_framework import serializers
Expand Down Expand Up @@ -130,10 +131,9 @@ class CreateRunSerializer(serializers.Serializer):
)


class UpdateRunSerializer(serializers.Serializer):
run_id = serializers.CharField(max_length=100)
class RunUpdateSerializer(serializers.Serializer):
event = serializers.ChoiceField(choices=RunLifecycleEvents)
details = serializers.JSONField()
details = serializers.JSONField(required=False, default=dict)


class PositionSerializer(serializers.HyperlinkedModelSerializer):
Expand Down Expand Up @@ -175,23 +175,31 @@ def get_longitude(self, obj):
# return Position.objects.create(point=point, **validated_data)


class ProgressionSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())
class VehicleStopStatusSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = Progression
model = VehicleStopStatus
fields = "__all__"
fields = "__all__"
ordering = ["id"]


class OccupancySerializer(serializers.HyperlinkedModelSerializer):
class CongestionLevelSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = CongestionLevel
fields = "__all__"
fields = "__all__"
ordering = ["id"]


class OccupancyStatusSerializer(serializers.HyperlinkedModelSerializer):
vehicle = serializers.PrimaryKeyRelatedField(queryset=Vehicle.objects.all())

class Meta:
model = Occupancy
model = OccupancyStatus
fields = "__all__"
fields = "__all__"
ordering = ["id"]
Expand Down
17 changes: 15 additions & 2 deletions backend/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
router.register(r"equipment-log", views.EquipmentLogViewSet)
# router.register(r"run", views.RunViewSet)
router.register(r"position", views.PositionViewSet)
router.register(r"progression", views.ProgressionViewSet)
router.register(r"stop-status", views.VehicleStopStatusViewSet)
router.register(r"occupancy", views.OccupancyViewSet)
router.register(r"congestion", views.CongestionLevelViewSet)
# GTFS Schedule
router.register(r"agency", views.AgencyViewSet)
router.register(r"stops", views.StopViewSet)
Expand All @@ -39,7 +40,19 @@
path("login/", views.LoginView.as_view(), name="login"),
# path("route-stops/", views.RouteStopView.as_view(), name="route_stops"),
path("create-run/", views.CreateRunViewSet.as_view(), name="create_run"),
path("update-run/", views.UpdateRunViewSet.as_view(), name="update_run"),
path(
"runs/<uuid:run_id>/state/", views.RunStateViewSet.as_view(), name="run_state"
),
path(
"runs/<uuid:run_id>/update/",
views.RunUpdateViewSet.as_view(),
name="run_update",
),
path(
"runs/<uuid:run_id>/history/",
views.RunHistoryView.as_view(),
name="run_history",
),
path("service-today/", views.ServiceTodayView.as_view(), name="service_today"),
path("which-shapes/", views.WhichShapesView.as_view(), name="which_shapes"),
path("find-trips/", views.FindTripsView.as_view(), name="find_trips"),
Expand Down
139 changes: 114 additions & 25 deletions backend/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from realtime_engine.tasks import run_lifecycle_event
from runs.services.exceptions import RunLifecycleError
from runs.services.lifecycle import RunLifecycleService
from runs.domain.events import RunLifecycleEvents
from runs.domain.states import RunLifecycleStates
from runs.domain.lifecycle import RunLifecycleEvents
from runs.domain.lifecycle import RunLifecycleStates
from operations.models import (
Vehicle,
Operator,
Expand All @@ -26,9 +26,11 @@
)
from runs.models import (
Run,
RunLifecycleTransition,
Position,
Progression,
Occupancy,
VehicleStopStatus,
CongestionLevel,
OccupancyStatus,
)
from feed.models import (
Feed,
Expand All @@ -55,10 +57,11 @@
EquipmentLogSerializer,
OperatorSerializer,
CreateRunSerializer,
UpdateRunSerializer,
RunUpdateSerializer,
PositionSerializer,
ProgressionSerializer,
OccupancySerializer,
VehicleStopStatusSerializer,
CongestionLevelSerializer,
OccupancyStatusSerializer,
AgencySerializer,
StopSerializer,
GeoStopSerializer,
Expand Down Expand Up @@ -206,12 +209,14 @@ def post(self, request):
{"status": "error", "step": "operational_validation", "errors": errors},
status=status.HTTP_400_BAD_REQUEST,
)
# Registration of the run (event: RUN_REQUESTED, state: REQUESTED)
# Record creation puts the run in REQUESTED state (run_requested event)
try:
run = Run.objects.create(**payload)
run.vehicle.set([vehicle])
run.operator.set([operator_obj])
payload["run_id"] = run.id
payload["vehicle_id"] = vehicle_id
payload["operator_id"] = operator_id
except Exception as e:
return Response(
{
Expand All @@ -221,26 +226,23 @@ def post(self, request):
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# First transition: GTFS validation (run_lifecycle_state = REQUESTED)
# REQUESTED → VALIDATED: GTFS consistency check
try:
service.process_event(RunLifecycleEvents.RUN_REQUESTED, payload)
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
except RunLifecycleError as e:
payload["guards"] = e.errors.attempts.guards
payload["actions"] = e.errors.attempts.actions
service.process_event(RunLifecycleEvents.RUN_REJECTED, payload)
return Response(
{"status": "error", "step": "gtfs_validation", "errors": e.errors},
status=status.HTTP_422_UNPROCESSABLE_ENTITY,
)
# System initialization (run_lifecycle_state = VALIDATED)
# VALIDATED → INITIALIZED: write system state
try:
service.process_event(RunLifecycleEvents.VALIDATE_RUN, payload)
service.process_event(RunLifecycleEvents.INITIALIZE_RUN, payload)
except RunLifecycleError as e:
return Response(
{"status": "error", "step": "initialization", "errors": e.errors},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# If successful, give a 200 OK response (run_lifecycle_state = INITIALIZED)
return Response(
{
"status": "success",
Expand All @@ -251,32 +253,76 @@ def post(self, request):
)


class UpdateRunViewSet(APIView):
class RunStateViewSet(APIView):
"""
Endpoint to get the current lifecycle state of a run.

It only allows the GET method with the run_id as path parameter.
"""

def get(self, request, run_id):
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
return Response(
{"status": "success", "run_lifecycle_state": run.run_lifecycle_state},
status=status.HTTP_200_OK,
)


class RunUpdateViewSet(APIView):
"""
Endpoint to request an update of the lifecycle state of an existing run.

It only allows the POST method with the event to process.
"""

def post(self, request):
def post(self, request, run_id):
service = RunLifecycleService()
serializer = UpdateRunSerializer(data=request.data)
serializer = RunUpdateSerializer(data=request.data)
if not serializer.is_valid():
return Response(
{"status": "error", "errors": serializer.errors},
status=status.HTTP_400_BAD_REQUEST,
)
payload = dict(serializer.validated_data)
run_id = payload.get("run_id")
# Flatten `details` into payload so guards/actions can read fields like
# `actor_role` at the top level — matches the convention used by the
# internal Celery path (realtime_engine/tasks.py).
details = payload.pop("details", {}) or {}
if isinstance(details, dict):
payload.update(details)
payload["run_id"] = run_id
run = Run.objects.filter(id=run_id).first()
if not run:
return Response(
{"status": "error", "errors": {"run_id": "Run not found"}},
status=status.HTTP_404_NOT_FOUND,
)
# Ensure the effective event (after payload normalization) is valid.
event = payload.get("event")
event_value = (
event.value if isinstance(event, RunLifecycleEvents) else str(event)
)
allowed_events = {e.value for e in RunLifecycleEvents}
if event_value not in allowed_events:
return Response(
{
"status": "error",
"errors": {
"event": f"Invalid event '{event_value}'. Allowed values: {sorted(allowed_events)}"
},
},
status=status.HTTP_400_BAD_REQUEST,
)
payload["event"] = event_value
try:
new_run_lifecycle_state = service.process_event(event, payload)
new_run_lifecycle_state, _guards, _actions = service.process_event(
event_value, payload
)
except RunLifecycleError as e:
return Response(
{"status": "error", "errors": e.errors},
Expand All @@ -288,21 +334,64 @@ def post(self, request):
)


class RunHistoryView(APIView):
"""
Return the ordered list of FSM transitions for a run.

Read-only audit log derived from RunLifecycleTransition, which the
lifecycle service writes before any external side-effect (so the log
is authoritative even if a downstream action later fails).
"""

def get(self, request, run_id):
if not Run.objects.filter(id=run_id).exists():
return Response(
{"status": "error", "errors": {"detail": f"run {run_id} not found"}},
status=status.HTTP_404_NOT_FOUND,
)
transitions = RunLifecycleTransition.objects.filter(run_id=run_id).order_by(
"timestamp", "created_at"
)
return Response(
{
"run_id": str(run_id),
"transitions": [
{
"event": t.event_name,
"from_state": t.from_state,
"to_state": t.to_state,
"timestamp": t.timestamp.isoformat(),
"actions": t.actions or {},
"guards": t.guards or {},
}
for t in transitions
],
},
status=status.HTTP_200_OK,
)


class PositionViewSet(viewsets.ModelViewSet):
queryset = Position.objects.all()
serializer_class = PositionSerializer
authentication_classes = [TokenAuthentication]


class ProgressionViewSet(viewsets.ModelViewSet):
queryset = Progression.objects.all()
serializer_class = ProgressionSerializer
class VehicleStopStatusViewSet(viewsets.ModelViewSet):
queryset = VehicleStopStatus.objects.all()
serializer_class = VehicleStopStatusSerializer
authentication_classes = [TokenAuthentication]


class CongestionLevelViewSet(viewsets.ModelViewSet):
queryset = CongestionLevel.objects.all()
serializer_class = CongestionLevelSerializer
authentication_classes = [TokenAuthentication]


class OccupancyViewSet(viewsets.ModelViewSet):
queryset = Occupancy.objects.all()
serializer_class = OccupancySerializer
queryset = OccupancyStatus.objects.all()
serializer_class = OccupancyStatusSerializer
authentication_classes = [TokenAuthentication]


Expand Down
Loading