Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions infrabox/deploy/build-dashboard-client/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM node:8.9-alpine
FROM node:20-alpine

CMD /infrabox/context/src/dashboard-client/build.sh
CMD /infrabox/context/src/dashboard-client/build.sh
14 changes: 7 additions & 7 deletions infrabox/test/api/global_tokens_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def test_list_tokens_initially_empty(self):
def test_list_tokens_only_returns_own_tokens(self):
# Insert a token owned by another user
TestClient.execute("""
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id)
VALUES (%s, 'other token', false, true, %s)
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id, expires_at)
VALUES (%s, 'other token', false, true, %s, NOW() + INTERVAL '30 days')
""", [str(uuid.uuid4()), self.other_user_id])

r = TestClient.get(self.URL, TestClient.get_user_authorization(self.user_id))
Expand Down Expand Up @@ -119,8 +119,8 @@ def test_delete_own_token_removes_from_db(self):
def test_cannot_delete_other_users_token(self):
other_token_id = str(uuid.uuid4())
TestClient.execute("""
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id)
VALUES (%s, 'not yours', false, true, %s)
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id, expires_at)
VALUES (%s, 'not yours', false, true, %s, NOW() + INTERVAL '30 days')
""", [other_token_id, self.other_user_id])

r = TestClient.delete(self.TOKEN_URL % other_token_id,
Expand Down Expand Up @@ -180,8 +180,8 @@ def test_access_log_returns_entries(self):
def test_access_log_enforces_ownership(self):
other_token_id = str(uuid.uuid4())
TestClient.execute("""
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id)
VALUES (%s, 'other log token', false, true, %s)
INSERT INTO global_token (id, description, scope_push, scope_pull, user_id, expires_at)
VALUES (%s, 'other log token', false, true, %s, NOW() + INTERVAL '30 days')
""", [other_token_id, self.other_user_id])
TestClient.execute("""
INSERT INTO global_token_access_log (token_id, path, method, status_code)
Expand All @@ -196,4 +196,4 @@ def test_access_log_nonexistent_token_returns_404(self):
fake_id = str(uuid.uuid4())
r = TestClient.get(self.ACCESS_LOG_URL % fake_id,
headers=TestClient.get_user_authorization(self.user_id))
self.assertEqual(r['status'], 404)
self.assertEqual(r['status'], 404)
212 changes: 212 additions & 0 deletions infrabox/test/api/mcp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""
Unit tests for the MCP API layer:
- MCP token auth (valid, invalid, expired, revoked, wrong path)
- Rate limiter (allow, deny, fail-open)
- Project access check (token scoped, session fallback)
- Trigger access check
"""
import hashlib
import secrets
import sys
import unittest
from datetime import datetime, timezone, timedelta
from unittest.mock import MagicMock, patch

# Add src/ to path so we can import the MCP modules directly without
# triggering api/handlers/__init__.py (which requires INFRABOX_* env vars).
import os
_src_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src')
sys.path.insert(0, _src_dir)

# Stub heavy server-init modules before importing our modules
import types

# pyinfraboxutils stubs
piu = types.ModuleType('pyinfraboxutils')
piu.get_logger = lambda name: __import__('logging').getLogger(name)
piu.get_env = lambda k: os.environ.get(k, '')
sys.modules.setdefault('pyinfraboxutils', piu)
sys.modules.setdefault('pyinfraboxutils.dbpool', types.ModuleType('pyinfraboxutils.dbpool'))
sys.modules.setdefault('pyinfraboxutils.db', types.ModuleType('pyinfraboxutils.db'))

# flask_restx stub
frestx = types.ModuleType('flask_restx')
frestx.Resource = object
frestx.Api = MagicMock()
sys.modules.setdefault('flask_restx', frestx)

ibrestplus = types.ModuleType('pyinfraboxutils.ibrestplus')
ibrestplus.api = MagicMock()
ibrestplus.response_model = {}
sys.modules.setdefault('pyinfraboxutils.ibrestplus', ibrestplus)

# Stub api.handlers as a real package (with __path__) so Python can resolve
# api.handlers.mcp.* from disk without executing api/handlers/__init__.py.
_API_HANDLERS = 'api.handlers'

# Stub api.handlers as a real package (with __path__) so Python can resolve
# api.handlers.mcp.* from disk without executing api/handlers/__init__.py.
_api = types.ModuleType('api')
_api.__path__ = [os.path.join(_src_dir, 'api')]
_api.__package__ = 'api'
_api_handlers = types.ModuleType(_API_HANDLERS)
_api_handlers.__path__ = [os.path.join(_src_dir, 'api', 'handlers')]
_api_handlers.__package__ = _API_HANDLERS
_api.handlers = _api_handlers
sys.modules['api'] = _api
sys.modules[_API_HANDLERS] = _api_handlers

# Now import the modules under test directly
import importlib
mcp_auth = importlib.import_module('api.handlers.mcp.auth')
mcp_rate_limit_mod = importlib.import_module('api.handlers.mcp.rate_limit')


# ---------------------------------------------------------------------------
# Auth module — token hash
# ---------------------------------------------------------------------------

class TestMcpTokenHash(unittest.TestCase):
def test_hash_is_sha256_hex(self):
result = mcp_auth._hash_token('ib_mcp_' + 'a' * 48)
self.assertEqual(len(result), 64)
self.assertTrue(all(c in '0123456789abcdef' for c in result))

def test_different_tokens_produce_different_hashes(self):
h1 = mcp_auth._hash_token('ib_mcp_' + 'a' * 48)
h2 = mcp_auth._hash_token('ib_mcp_' + 'b' * 48)
self.assertNotEqual(h1, h2)

def test_same_token_deterministic(self):
raw = 'ib_mcp_' + secrets.token_hex(24)
self.assertEqual(mcp_auth._hash_token(raw), mcp_auth._hash_token(raw))

def test_matches_expected_sha256(self):
raw = 'ib_mcp_test'
expected = hashlib.sha256(raw.encode('utf-8')).hexdigest()
self.assertEqual(mcp_auth._hash_token(raw), expected)


# ---------------------------------------------------------------------------
# Project access check
# ---------------------------------------------------------------------------

class TestCheckProjectAccessMcp(unittest.TestCase):
def _g_with_projects(self, projects):
g = MagicMock()
g.mcp_enabled_projects = projects
return g

def test_no_mcp_attr_allows_all(self):
g = MagicMock(spec=[]) # no mcp_enabled_projects attribute at all
with patch.object(mcp_auth, 'g', g):
self.assertTrue(mcp_auth.check_project_access_mcp('any-id'))

def test_project_not_in_scope_denied(self):
g = self._g_with_projects({'other-id': None})
with patch.object(mcp_auth, 'g', g):
self.assertFalse(mcp_auth.check_project_access_mcp('target-id'))

def test_project_in_scope_no_expiry_allowed(self):
g = self._g_with_projects({'target-id': None})
with patch.object(mcp_auth, 'g', g):
self.assertTrue(mcp_auth.check_project_access_mcp('target-id'))

def test_per_project_expiry_in_future_allowed(self):
future = (datetime.now(timezone.utc) + timedelta(days=1)).isoformat()
g = self._g_with_projects({'pid': future})
with patch.object(mcp_auth, 'g', g):
self.assertTrue(mcp_auth.check_project_access_mcp('pid'))

def test_per_project_expiry_in_past_denied(self):
past = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat()
g = self._g_with_projects({'pid': past})
with patch.object(mcp_auth, 'g', g):
self.assertFalse(mcp_auth.check_project_access_mcp('pid'))


# ---------------------------------------------------------------------------
# Trigger access check
# ---------------------------------------------------------------------------

class TestCheckTriggerAccessMcp(unittest.TestCase):
def test_no_mcp_attr_allows(self):
g = MagicMock(spec=[])
with patch.object(mcp_auth, 'g', g):
self.assertTrue(mcp_auth.check_trigger_access_mcp())

def test_allow_trigger_true(self):
g = MagicMock()
g.mcp_allow_trigger = True
with patch.object(mcp_auth, 'g', g):
self.assertTrue(mcp_auth.check_trigger_access_mcp())

def test_allow_trigger_false(self):
g = MagicMock()
g.mcp_allow_trigger = False
with patch.object(mcp_auth, 'g', g):
self.assertFalse(mcp_auth.check_trigger_access_mcp())


# ---------------------------------------------------------------------------
# Rate limiter
# ---------------------------------------------------------------------------

class TestMcpRateLimit(unittest.TestCase):
def _run_check(self, count_result):
mock_redis = MagicMock()
pipeline = MagicMock()
pipeline.execute.return_value = [None, None, count_result, None]
mock_redis.pipeline.return_value = pipeline

with patch.object(mcp_rate_limit_mod, '_get_redis', return_value=mock_redis), \
patch('time.time', return_value=1_000_000.0):
return mcp_rate_limit_mod._check_rate_limit('user-123', 'list_builds')

def test_under_limit_allowed(self):
self.assertTrue(self._run_check(1))

def test_at_limit_allowed(self):
self.assertTrue(self._run_check(mcp_rate_limit_mod._DEFAULT_RPM))

def test_over_limit_denied(self):
self.assertFalse(self._run_check(mcp_rate_limit_mod._DEFAULT_RPM + 1))

def test_fail_open_when_no_redis(self):
with patch.object(mcp_rate_limit_mod, '_get_redis', return_value=None):
self.assertTrue(mcp_rate_limit_mod._check_rate_limit('user', 'list_builds'))

def test_fail_open_on_redis_exception(self):
mock_redis = MagicMock()
mock_redis.pipeline.side_effect = RuntimeError('connection lost')
with patch.object(mcp_rate_limit_mod, '_get_redis', return_value=mock_redis):
self.assertTrue(mcp_rate_limit_mod._check_rate_limit('user', 'list_builds'))

def test_trigger_rpm_lower_than_default(self):
self.assertLess(mcp_rate_limit_mod._ENDPOINT_LIMITS['trigger_build'],
mcp_rate_limit_mod._DEFAULT_RPM)

def test_log_rpm_lower_than_default(self):
self.assertLess(mcp_rate_limit_mod._ENDPOINT_LIMITS['get_job_log'],
mcp_rate_limit_mod._DEFAULT_RPM)

def test_artifact_rpm_lower_than_default(self):
self.assertLess(mcp_rate_limit_mod._ENDPOINT_LIMITS['list_job_artifacts'],
mcp_rate_limit_mod._DEFAULT_RPM)

def test_different_users_independent(self):
# Two users: first at limit, second should still be allowed.
calls = []
def fake_check(user_id, endpoint):
calls.append(user_id)
if user_id == 'heavy-user':
return False
return True
with patch.object(mcp_rate_limit_mod, '_check_rate_limit', side_effect=fake_check):
self.assertFalse(mcp_rate_limit_mod._check_rate_limit('heavy-user', 'list_builds'))
self.assertTrue(mcp_rate_limit_mod._check_rate_limit('normal-user', 'list_builds'))


if __name__ == '__main__':
unittest.main()

3 changes: 2 additions & 1 deletion infrabox/test/api/test_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class ApiTestTemplate(unittest.TestCase):

def setUp(self):
TestClient.execute(
'TRUNCATE global_token_access_log, global_token, '
'TRUNCATE mcp_access_log, mcp_token, '
'global_token_access_log, global_token, '
'collaborator, auth_token, secret, '
'console, job_markup, job_badge, job, '
'build, commit, repository, '
Expand Down
1 change: 1 addition & 0 deletions src/api/handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
import api.handlers.build
import api.handlers.job_api
import api.handlers.admin
import api.handlers.mcp
4 changes: 4 additions & 0 deletions src/api/handlers/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import api.handlers.mcp.token_routes
import api.handlers.mcp.routes.projects
import api.handlers.mcp.routes.builds
import api.handlers.mcp.routes.jobs
57 changes: 57 additions & 0 deletions src/api/handlers/mcp/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Audit logging for MCP API calls.

Writes to the mcp_access_log table (best-effort, fire-and-forget).
Never raises — a logging failure must not break the request.
"""
import logging
import threading

from flask import g, request

logger = logging.getLogger('mcp_audit')


def audit_mcp(action: str, outcome: str = 'attempt', details: dict = None, error: str = ''):
"""Record one MCP audit entry. Non-blocking: runs in a daemon thread."""
token_id = getattr(g, 'mcp_token_id', None)
user_id = getattr(g, 'mcp_token_user_id', None)
if not user_id:
token = getattr(g, 'token', None)
if token and 'user' in token:
user_id = str(token['user'].get('id', ''))
ip = request.remote_addr

# Capture a snapshot of the db connection so the thread can use it safely.
# For simplicity we write synchronously on the request db connection since
# the volume is low. If latency becomes a concern this can be offloaded.
try:
db = getattr(g, 'db', None)
if db is None:
return

db.execute('''
INSERT INTO mcp_access_log (token_id, user_id, action, outcome, details, error, ip)
VALUES (%s, %s, %s, %s, %s, %s, %s)
''', [
token_id,
user_id,
action,
outcome,
_to_json(details),
error or None,
ip,
])
db.commit()
except Exception as exc:
logger.warning('MCP audit log failed: %s', exc)


def _to_json(d):
if d is None:
return None
import json
try:
return json.dumps(d)
except Exception:
return str(d)
Loading
Loading