Skip to content

Commit 74451bd

Browse files
omer9564 and claude authored
Add control plane connectivity control for offline mode (#305)
* Add control plane connectivity control for offline mode

  Adds PDP-level bindings for OPAL's new server connectivity feature, allowing the PDP to start disconnected from the control plane and serve from a local backup. Includes runtime HTTP endpoints (/control-plane/connectivity) authenticated with the PDP API key.

  - Add PDP_CONTROL_PLANE_CONNECTIVITY_DISABLED config option
  - Create /control-plane/connectivity GET/enable/disable endpoints
  - Upgrade opal-common and opal-client to 0.9.4rc6

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Address PR review: add concurrency safety, error handling, Literal types, OpenAPI responses, and tests

  - Add asyncio.Lock to serialize enable/disable operations (fixes TOCTOU race)
  - Add try/except with proper error logging and HTTP 500 responses
  - Use Literal type for ConnectivityActionResult.status field
  - Add responses parameter to route decorators for OpenAPI docs
  - Add test_connectivity_api.py with 10 tests covering all endpoints

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Address PR review: guard connectivity router behind offline mode and update OPAL to stable

  - Only mount connectivity router when ENABLE_OFFLINE_MODE is enabled to reduce attack surface
  - Update opal-common and opal-client from 0.9.4rc6 to stable 0.9.4

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8dd5a7a commit 74451bd

6 files changed

Lines changed: 262 additions & 2 deletions

File tree

horizon/config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ def __new__(cls, *, prefix=None, is_model=True): # noqa: ARG004
131131
description="Filename for offline mode's policy backup (OPAL's offline mode backup)",
132132
)
133133

134+
CONTROL_PLANE_CONNECTIVITY_DISABLED = confi.bool(
135+
"CONTROL_PLANE_CONNECTIVITY_DISABLED",
136+
False,
137+
description="When true (and ENABLE_OFFLINE_MODE is true), the PDP starts disconnected from the control plane "
138+
"and serves from a local backup. Can be toggled at runtime via the /control-plane/connectivity endpoints.",
139+
)
140+
134141
CONFIG_FETCH_MAX_RETRIES = confi.int(
135142
"CONFIG_FETCH_MAX_RETRIES",
136143
6,

horizon/connectivity/__init__.py

Whitespace-only changes.

horizon/connectivity/api.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import logging
5+
from typing import TYPE_CHECKING, Literal
6+
7+
from fastapi import APIRouter, Depends, HTTPException, status
8+
from pydantic import BaseModel
9+
10+
from horizon.authentication import enforce_pdp_token
11+
12+
if TYPE_CHECKING:
13+
from opal_client.client import OpalClient
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class ConnectivityStatus(BaseModel):
    """Response body of ``GET /control-plane/connectivity``."""

    # Mirrors ``opal_client.opal_server_connectivity_disabled`` at request time.
    control_plane_connectivity_disabled: bool
    # Mirrors ``opal_client.offline_mode_enabled`` at request time.
    offline_mode_enabled: bool
21+
22+
23+
class ConnectivityActionResult(BaseModel):
    """Response body of the connectivity enable/disable endpoints."""

    # "enabled" / "disabled": the OPAL client call was made and succeeded.
    # "already_enabled" / "already_disabled": the client was already in the
    # requested state, so no call was made.
    status: Literal[
        "enabled",
        "disabled",
        "already_enabled",
        "already_disabled",
    ]
30+
31+
32+
def init_connectivity_router(opal_client: OpalClient):
    """Build the ``/control-plane`` router that reports and toggles connectivity.

    Every route on the returned router depends on ``enforce_pdp_token``.

    Args:
        opal_client: The OPAL client whose ``offline_mode_enabled`` and
            ``opal_server_connectivity_disabled`` attributes are read, and
            whose ``enable_opal_server_connectivity`` /
            ``disable_opal_server_connectivity`` coroutines are awaited.

    Returns:
        An ``APIRouter`` exposing ``GET /connectivity``,
        ``POST /connectivity/enable`` and ``POST /connectivity/disable``.
    """
    router = APIRouter(
        prefix="/control-plane",
        dependencies=[Depends(enforce_pdp_token)],
    )
    # Shared by the enable and disable handlers so that the "check current
    # state, then act" sequence of one request cannot interleave with another.
    _lock = asyncio.Lock()

    @router.get(
        "/connectivity",
        status_code=status.HTTP_200_OK,
        response_model=ConnectivityStatus,
        summary="Get control plane connectivity status",
        description="Returns the current connectivity state to the control plane and whether offline mode is enabled.",
    )
    async def get_connectivity_status():
        """Report the client's current connectivity and offline-mode flags."""
        return ConnectivityStatus(
            control_plane_connectivity_disabled=opal_client.opal_server_connectivity_disabled,
            offline_mode_enabled=opal_client.offline_mode_enabled,
        )

    @router.post(
        "/connectivity/enable",
        status_code=status.HTTP_200_OK,
        response_model=ConnectivityActionResult,
        responses={
            400: {"description": "Offline mode is not enabled"},
            500: {"description": "Failed to enable control plane connectivity"},
        },
        summary="Enable control plane connectivity",
        description="Starts the policy and data updaters, reconnecting to the control plane. "
        "Triggers a full rehydration (policy refetch + data refetch). "
        "Requires offline mode to be enabled.",
    )
    async def enable_connectivity():
        """Enable connectivity; 400 without offline mode, 500 if the client call fails."""
        if not opal_client.offline_mode_enabled:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot enable control plane connectivity: offline mode is not enabled",
            )
        async with _lock:
            # Nothing to do when connectivity is already on.
            if not opal_client.opal_server_connectivity_disabled:
                return ConnectivityActionResult(status="already_enabled")
            try:
                await opal_client.enable_opal_server_connectivity()
            except Exception:
                # The traceback is logged here; ``from None`` keeps the
                # underlying exception out of the raised HTTP error's chain.
                logger.exception("Failed to enable control plane connectivity")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to enable control plane connectivity",
                ) from None
            return ConnectivityActionResult(status="enabled")

    @router.post(
        "/connectivity/disable",
        status_code=status.HTTP_200_OK,
        response_model=ConnectivityActionResult,
        responses={
            400: {"description": "Offline mode is not enabled"},
            500: {"description": "Failed to disable control plane connectivity"},
        },
        summary="Disable control plane connectivity",
        description="Stops the policy and data updaters, disconnecting from the control plane. "
        "Requires offline mode to be enabled. The policy store continues serving from its current state.",
    )
    async def disable_connectivity():
        """Disable connectivity; 400 without offline mode, 500 if the client call fails."""
        if not opal_client.offline_mode_enabled:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot disable control plane connectivity: offline mode is not enabled",
            )
        async with _lock:
            # Nothing to do when connectivity is already off.
            if opal_client.opal_server_connectivity_disabled:
                return ConnectivityActionResult(status="already_disabled")
            try:
                await opal_client.disable_opal_server_connectivity()
            except Exception:
                # The traceback is logged here; ``from None`` keeps the
                # underlying exception out of the raised HTTP error's chain.
                logger.exception("Failed to disable control plane connectivity")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to disable control plane connectivity",
                ) from None
            return ConnectivityActionResult(status="disabled")

    return router

horizon/pdp.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from horizon.authentication import enforce_pdp_token
2929
from horizon.config import MOCK_API_KEY, sidecar_config
30+
from horizon.connectivity.api import init_connectivity_router
3031
from horizon.enforcer.api import init_enforcer_api_router, stats_manager
3132
from horizon.enforcer.opa.config_maker import (
3233
get_opa_authz_policy_file_path,
@@ -148,6 +149,7 @@ def __init__(self):
148149

149150
self._configure_opal_data_updater()
150151
self._configure_opal_offline_mode()
152+
self._configure_opal_server_connectivity()
151153

152154
if sidecar_config.PRINT_CONFIG_ON_STARTUP:
153155
logger.info(
@@ -326,6 +328,16 @@ def _configure_opal_offline_mode(self):
326328
Path(sidecar_config.OFFLINE_MODE_BACKUP_DIR) / sidecar_config.OFFLINE_MODE_POLICY_BACKUP_FILENAME
327329
)
328330

331+
def _configure_opal_server_connectivity(self):
    """
    Configure control plane connectivity when offline mode is enabled.

    Sets ``opal_client_config.DEFAULT_OPAL_SERVER_CONNECTIVITY_DISABLED`` to
    true only when both ``ENABLE_OFFLINE_MODE`` and
    ``CONTROL_PLANE_CONNECTIVITY_DISABLED`` are set in the sidecar config.
    When both are set, the PDP starts disconnected from the control plane
    and serves from a local backup.
    """
    opal_client_config.DEFAULT_OPAL_SERVER_CONNECTIVITY_DISABLED = (
        sidecar_config.ENABLE_OFFLINE_MODE and sidecar_config.CONTROL_PLANE_CONNECTIVITY_DISABLED
    )
340+
329341
def _fix_data_topics(self) -> list[str]:
330342
"""
331343
This is a worksaround for the following issue:
@@ -411,6 +423,13 @@ def _configure_api_routes(self, app: FastAPI):
411423
include_in_schema=False,
412424
dependencies=[Depends(enforce_pdp_token)],
413425
)
426+
if sidecar_config.ENABLE_OFFLINE_MODE:
427+
connectivity_router = init_connectivity_router(self._opal)
428+
app.include_router(
429+
connectivity_router,
430+
tags=["Control Plane Connectivity"],
431+
dependencies=[Depends(enforce_pdp_token)],
432+
)
414433

415434
# TODO: remove this when clients update sdk version (legacy routes)
416435
@app.post(
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
from unittest.mock import AsyncMock, PropertyMock
2+
3+
from fastapi import FastAPI
4+
from fastapi.testclient import TestClient
5+
from horizon.authentication import enforce_pdp_token
6+
from horizon.connectivity.api import init_connectivity_router
7+
8+
9+
def _noop_auth():
    """Do nothing; stands in for ``enforce_pdp_token`` via dependency overrides."""
    pass
11+
12+
13+
def _create_test_app(opal_client_mock):
    """Create a test FastAPI app with the connectivity router (no auth).

    The router's ``enforce_pdp_token`` dependency is overridden with
    ``_noop_auth`` so requests need no credentials.
    """
    app = FastAPI()
    router = init_connectivity_router(opal_client_mock)
    app.include_router(router)
    app.dependency_overrides[enforce_pdp_token] = _noop_auth
    return app
20+
21+
22+
def _make_opal_mock(*, offline_mode_enabled=True, connectivity_disabled=False):
    """Return an ``AsyncMock`` OPAL client with the two state flags preset.

    Args:
        offline_mode_enabled: Value reported by ``mock.offline_mode_enabled``.
        connectivity_disabled: Value reported by
            ``mock.opal_server_connectivity_disabled``.
    """
    mock = AsyncMock()
    # PropertyMock only behaves as a property when set on the mock's type.
    type(mock).offline_mode_enabled = PropertyMock(return_value=offline_mode_enabled)
    type(mock).opal_server_connectivity_disabled = PropertyMock(return_value=connectivity_disabled)
    return mock
27+
28+
29+
class TestGetConnectivityStatus:
    """Tests for ``GET /control-plane/connectivity``."""

    def test_returns_status(self):
        """Both flags from the client are echoed when connectivity is disabled."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
        client = TestClient(_create_test_app(mock))

        resp = client.get("/control-plane/connectivity")
        assert resp.status_code == 200
        data = resp.json()
        assert data["control_plane_connectivity_disabled"] is True
        assert data["offline_mode_enabled"] is True

    def test_returns_status_when_connected(self):
        """The disabled flag is reported as false when the client is connected."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
        client = TestClient(_create_test_app(mock))

        resp = client.get("/control-plane/connectivity")
        assert resp.status_code == 200
        data = resp.json()
        assert data["control_plane_connectivity_disabled"] is False
48+
49+
50+
class TestEnableConnectivity:
    """Tests for ``POST /control-plane/connectivity/enable``."""

    def test_enable_success(self):
        """A disconnected client is enabled exactly once and reports "enabled"."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/enable")
        assert resp.status_code == 200
        assert resp.json()["status"] == "enabled"
        mock.enable_opal_server_connectivity.assert_awaited_once()

    def test_enable_already_enabled(self):
        """An already-connected client is left alone and reports "already_enabled"."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/enable")
        assert resp.status_code == 200
        assert resp.json()["status"] == "already_enabled"
        mock.enable_opal_server_connectivity.assert_not_awaited()

    def test_enable_returns_400_when_offline_mode_disabled(self):
        """Enabling is rejected with 400 when offline mode is off."""
        mock = _make_opal_mock(offline_mode_enabled=False)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/enable")
        assert resp.status_code == 400

    def test_enable_returns_500_on_opal_error(self):
        """An exception from the client call surfaces as a 500 with a fixed detail."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
        mock.enable_opal_server_connectivity.side_effect = RuntimeError("boom")
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/enable")
        assert resp.status_code == 500
        assert "Failed to enable" in resp.json()["detail"]
84+
85+
86+
class TestDisableConnectivity:
    """Tests for ``POST /control-plane/connectivity/disable``."""

    def test_disable_success(self):
        """A connected client is disabled exactly once and reports "disabled"."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/disable")
        assert resp.status_code == 200
        assert resp.json()["status"] == "disabled"
        mock.disable_opal_server_connectivity.assert_awaited_once()

    def test_disable_already_disabled(self):
        """An already-disconnected client is left alone and reports "already_disabled"."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=True)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/disable")
        assert resp.status_code == 200
        assert resp.json()["status"] == "already_disabled"
        mock.disable_opal_server_connectivity.assert_not_awaited()

    def test_disable_returns_400_when_offline_mode_disabled(self):
        """Disabling is rejected with 400 when offline mode is off."""
        mock = _make_opal_mock(offline_mode_enabled=False)
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/disable")
        assert resp.status_code == 400

    def test_disable_returns_500_on_opal_error(self):
        """An exception from the client call surfaces as a 500 with a fixed detail."""
        mock = _make_opal_mock(offline_mode_enabled=True, connectivity_disabled=False)
        mock.disable_opal_server_connectivity.side_effect = RuntimeError("boom")
        client = TestClient(_create_test_app(mock))

        resp = client.post("/control-plane/connectivity/disable")
        assert resp.status_code == 500
        assert "Failed to disable" in resp.json()["detail"]

requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ httpx>=0.27.0,<1
1717
# google-re2 # use re2 instead of re for regex matching because it's simpler and safer for user-inputted regexes
1818
protobuf>=6.33.5 # pinned to avoid CVE-2026-0994
1919
cryptography>=46.0.5,<47 # pinned to avoid CVE-2026-26007
20-
opal-common==0.8.3
21-
opal-client==0.8.3
20+
opal-common==0.9.4
21+
opal-client==0.9.4

0 commit comments

Comments
 (0)