Skip to content

Commit ba7ca24

Browse files
authored
Create a read only way of seeing if a specific route is blocked (#185)
2 parents 50b28bf + 011a99f commit ba7ca24

5 files changed

Lines changed: 156 additions & 10 deletions

File tree

config/api_router.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@
22

33
from rest_framework.routers import DefaultRouter
44

5-
from scram.route_manager.api.views import ActionTypeViewSet, ClientViewSet, EntryViewSet, IgnoreEntryViewSet
5+
from scram.route_manager.api.views import (
6+
ActionTypeViewSet,
7+
ClientViewSet,
8+
EntryViewSet,
9+
IgnoreEntryViewSet,
10+
IsActiveViewSet,
11+
)
612
from scram.users.api.views import UserViewSet
713

814
router = DefaultRouter()
@@ -12,7 +18,7 @@
1218
router.register("register_client", ClientViewSet)
1319
router.register("entries", EntryViewSet)
1420
router.register("ignore_entries", IgnoreEntryViewSet)
15-
21+
router.register("is_active", IsActiveViewSet, "is_active")
1622

1723
app_name = "api"
1824
urlpatterns = router.urls

scram/route_manager/api/serializers.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@ class Meta:
5252
fields = ["hostname", "uuid"]
5353

5454

55+
class IsActiveSerializer(serializers.ModelSerializer):
56+
"""Map the serializer to the Entry model."""
57+
58+
route = serializers.StringRelatedField(source="route.route")
59+
60+
class Meta:
61+
"""Maps to the Entry model, but limits to the the appropriate fields."""
62+
63+
model = Entry
64+
fields = ["is_active", "route"]
65+
66+
5567
class EntrySerializer(serializers.HyperlinkedModelSerializer):
5668
"""Due to the use of ForeignKeys, this follows some relationships to make sense via the API."""
5769

scram/route_manager/api/views.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,20 @@
1010
from django.db.models import Q
1111
from drf_spectacular.utils import extend_schema
1212
from rest_framework import status, viewsets
13+
from rest_framework.exceptions import ValidationError
1314
from rest_framework.permissions import AllowAny, IsAuthenticated
1415
from rest_framework.response import Response
1516
from simple_history.utils import update_change_reason
1617

1718
from ..models import ActionType, Client, Entry, IgnoreEntry, Route, WebSocketSequenceElement
1819
from .exceptions import ActiontypeNotAllowed, IgnoredRoute, NoActiveEntryFound, PrefixTooLarge
19-
from .serializers import ActionTypeSerializer, ClientSerializer, EntrySerializer, IgnoreEntrySerializer
20+
from .serializers import (
21+
ActionTypeSerializer,
22+
ClientSerializer,
23+
EntrySerializer,
24+
IgnoreEntrySerializer,
25+
IsActiveSerializer,
26+
)
2027

2128
channel_layer = get_channel_layer()
2229
logger = logging.getLogger(__name__)
@@ -63,6 +70,51 @@ class ClientViewSet(viewsets.ModelViewSet):
6370
http_method_names = ["post"]
6471

6572

73+
class IsActiveViewSet(viewsets.ReadOnlyModelViewSet):
74+
"""Look up a route to see if SCRAM considers it active or deactivated."""
75+
76+
serializer_class = IsActiveSerializer
77+
permission_classes = (AllowAny,)
78+
http_method_names = ["get"]
79+
80+
normalization_warning: str | None
81+
normalized_cidr_for_response: ipaddress.IPv4Network | ipaddress.IPv6Network | None
82+
83+
def get_queryset(self):
84+
"""Focus queryset on active routes."""
85+
cidr = self.request.query_params.get("cidr")
86+
if not cidr:
87+
raise ValidationError(detail={"error": "cidr parameter is required"})
88+
try:
89+
normalized_cidr = ipaddress.ip_network(cidr, strict=False)
90+
except ValueError:
91+
raise ValidationError(detail={"error": "invalid ip address or network"}) from None
92+
93+
self.normalization_warning = None
94+
self.normalized_cidr_for_response = normalized_cidr
95+
96+
if str(cidr) != str(normalized_cidr):
97+
# save the warning so we can use it in the list response
98+
self.normalization_warning = (
99+
f"Input CIDR '{cidr}' was not canonical and was normalized to '{normalized_cidr!s}' for the search."
100+
)
101+
102+
return Entry.objects.filter(route__route__net_contained_or_equal=normalized_cidr, is_active=True)
103+
104+
def list(self, request):
105+
"""Override the list function to just return a boolean instead of other metadata."""
106+
queryset = self.get_queryset()
107+
108+
if not queryset.exists() and hasattr(self, "normalized_cidr_for_response"):
109+
response_data = {"results": [{"is_active": False, "route": str(self.normalized_cidr_for_response)}]}
110+
else:
111+
serializer = self.get_serializer(queryset, many=True)
112+
response_data = {"results": serializer.data}
113+
response_data["warning"] = self.normalization_warning
114+
115+
return Response(response_data)
116+
117+
66118
@extend_schema(
67119
description="API endpoint for entries",
68120
responses={200: EntrySerializer},

scram/route_manager/tests/acceptance/steps/common.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,7 @@
99
from django import conf
1010
from django.urls import reverse
1111

12-
from scram.route_manager.models import (
13-
ActionType,
14-
Client,
15-
WebSocketMessage,
16-
WebSocketSequenceElement,
17-
)
12+
from scram.route_manager.models import ActionType, Client, WebSocketMessage, WebSocketSequenceElement
1813

1914

2015
@given("a {name} actiontype is defined")

scram/route_manager/tests/test_api.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from rest_framework import status
66
from rest_framework.test import APITestCase
77

8-
from scram.route_manager.models import Client
8+
from scram.route_manager.models import ActionType, Client, Entry, Route
99

1010

1111
class TestAddRemoveIP(APITestCase):
@@ -125,3 +125,84 @@ def test_unauthenticated_users_have_no_list_access(self):
125125
"""Ensure an unauthenticated client can't list Entries."""
126126
response = self.client.get(self.entry_url, format="json")
127127
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
128+
129+
130+
class TestIsActive(APITestCase):
131+
"""Test the is_active endpoint."""
132+
133+
def setUp(self):
134+
"""Set up test data."""
135+
self.url = reverse("api:v1:is_active-list")
136+
self.authorized_client = Client.objects.create(
137+
hostname="authorized_client.es.net",
138+
uuid="0e7e1cbd-7d73-4968-bc4b-ce3265dc2fd3",
139+
is_authorized=True,
140+
)
141+
self.authorized_client.authorized_actiontypes.set([1])
142+
self.actiontype, _ = ActionType.objects.get_or_create(pk=1, defaults={"name": "block"})
143+
144+
# Create some active entries
145+
146+
# Active IPv4
147+
route_v4 = Route.objects.create(route="192.0.2.100")
148+
Entry.objects.create(
149+
route=route_v4, is_active=True, comment="test active", who="test", actiontype=self.actiontype
150+
)
151+
152+
# Active IPv6
153+
route_v6 = Route.objects.create(route="2001:db8::1")
154+
Entry.objects.create(
155+
route=route_v6, is_active=True, comment="test active v6", who="test", actiontype=self.actiontype
156+
)
157+
158+
# Deactivated IPv4 entry
159+
route_inactive = Route.objects.create(route="192.0.2.200")
160+
Entry.objects.create(
161+
route=route_inactive, is_active=False, comment="inactive", who="test", actiontype=self.actiontype
162+
)
163+
164+
# Deactived IPv6 entry
165+
route_inactive = Route.objects.create(route="2001:db8::5")
166+
Entry.objects.create(
167+
route=route_inactive, is_active=False, comment="inactive", who="test", actiontype=self.actiontype
168+
)
169+
170+
def test_active_ipv4_returns_true(self):
171+
"""Check that an active IPv4 returns is_active=true."""
172+
response = self.client.get(self.url, {"cidr": "192.0.2.100"})
173+
self.assertEqual(response.status_code, status.HTTP_200_OK)
174+
self.assertEqual(len(response.data["results"]), 1)
175+
self.assertEqual(response.data["results"][0]["is_active"], True)
176+
self.assertEqual(response.data["results"][0]["route"], "192.0.2.100/32")
177+
178+
def test_active_ipv6_returns_true(self):
179+
"""Check that an active IPv6 returns is_active=true."""
180+
response = self.client.get(self.url, {"cidr": "2001:db8::1"})
181+
self.assertEqual(response.status_code, status.HTTP_200_OK)
182+
self.assertEqual(len(response.data["results"]), 1)
183+
self.assertEqual(response.data["results"][0]["is_active"], True)
184+
self.assertEqual(response.data["results"][0]["route"], "2001:db8::1/128")
185+
186+
def test_inactive_entry_ipv4_returns_false(self):
187+
"""Check that an inactive entry returns is_active=false."""
188+
response = self.client.get(self.url, {"cidr": "192.0.2.200"})
189+
self.assertEqual(response.status_code, status.HTTP_200_OK)
190+
self.assertEqual(len(response.data["results"]), 1)
191+
self.assertEqual(response.data["results"][0]["is_active"], False)
192+
self.assertEqual(response.data["results"][0]["route"], "192.0.2.200/32")
193+
194+
def test_inactive_entry_ipv6_returns_false(self):
195+
"""Check that an inactive entry returns is_active=false."""
196+
response = self.client.get(self.url, {"cidr": "2001:db8::5"})
197+
self.assertEqual(len(response.data["results"]), 1)
198+
self.assertEqual(response.data["results"][0]["is_active"], False)
199+
self.assertEqual(response.data["results"][0]["route"], "2001:db8::5/128")
200+
201+
def test_unauthenticated_access_allowed(self):
202+
"""Ensure unauthenticated clients can check if IPs are active."""
203+
# Logout any authenticated user
204+
self.client.logout()
205+
response = self.client.get(self.url, {"cidr": "192.0.2.100"})
206+
self.assertEqual(response.status_code, status.HTTP_200_OK)
207+
self.assertEqual(len(response.data["results"]), 1)
208+
self.assertEqual(response.data["results"][0]["is_active"], True)

0 commit comments

Comments
 (0)