Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dojo/jira/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
to_str_typed,
truncate_with_dots,
)
from dojo.utils_ssrf import SSRFError, validate_url_for_ssrf

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -435,6 +436,12 @@ def has_jira_configured(obj):


def connect_to_jira(jira_server, jira_username, jira_password):
try:
validate_url_for_ssrf(jira_server)
except SSRFError as e:
msg = f"JIRA URL is not allowed: {e}"
raise ValueError(msg) from e

max_retries = getattr(settings, "JIRA_MAX_RETRIES", 3)
timeout = getattr(settings, "JIRA_TIMEOUT", (10, 30))

Expand Down
7 changes: 5 additions & 2 deletions unittests/test_jira_config_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ def test_add_jira_instance_unknown_host(self):
# debian throws 'Name or service not known' error and alpine 'Name does not resolve'
self.assertTrue(("Name or service not known" in content) or ("Name does not resolve" in content), content)

# test raw connection error
with self.assertRaises(requests.exceptions.RequestException):
# test raw connection error — the helper now rejects unresolvable
# hostnames via validate_url_for_ssrf before invoking the JIRA client,
# which surfaces as ValueError. The original requests-level failure is
# still accepted in case validation is bypassed by configuration.
with self.assertRaises((ValueError, requests.exceptions.RequestException)):
jira_helper.get_jira_connection_raw(data["url"], data["username"], data["password"])

@patch("dojo.jira.views.jira_helper.get_jira_connection_raw")
Expand Down
72 changes: 72 additions & 0 deletions unittests/test_jira_helper_ssrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from unittest.mock import patch

from dojo.jira.forms import BaseJiraForm
from dojo.jira.helper import connect_to_jira
from unittests.dojo_test_case import DojoTestCase


class TestConnectToJiraUrlValidation(DojoTestCase):

def test_rfc1918_url_raises_value_error_before_calling_jira(self):
with patch("dojo.jira.helper.JIRA") as mock_jira:
with self.assertRaisesRegex(ValueError, "JIRA URL is not allowed"):
connect_to_jira("http://172.18.0.3:5432", "user", "password")
mock_jira.assert_not_called()

def test_loopback_url_raises_value_error_before_calling_jira(self):
with patch("dojo.jira.helper.JIRA") as mock_jira:
with self.assertRaisesRegex(ValueError, "JIRA URL is not allowed"):
connect_to_jira("http://127.0.0.1/", "user", "password")
mock_jira.assert_not_called()

def test_link_local_metadata_url_raises_value_error_before_calling_jira(self):
with patch("dojo.jira.helper.JIRA") as mock_jira:
with self.assertRaisesRegex(ValueError, "JIRA URL is not allowed"):
connect_to_jira("http://169.254.169.254/", "user", "password")
mock_jira.assert_not_called()

def test_unsupported_scheme_raises_value_error_before_calling_jira(self):
with patch("dojo.jira.helper.JIRA") as mock_jira:
with self.assertRaisesRegex(ValueError, "JIRA URL is not allowed"):
connect_to_jira("file:///etc/passwd", "user", "password")
mock_jira.assert_not_called()

def test_public_url_proceeds_to_jira_client(self):
# 8.8.8.8 is globally routable; getaddrinfo on a numeric IP literal
# does not hit DNS, so this is reliable in CI.
with patch("dojo.jira.helper.JIRA") as mock_jira:
connect_to_jira("http://8.8.8.8/", "user", "password")
mock_jira.assert_called_once()
_, kwargs = mock_jira.call_args
self.assertEqual(kwargs["server"], "http://8.8.8.8/")
self.assertEqual(kwargs["basic_auth"], ("user", "password"))


class TestBaseJiraFormSurfacesValidationError(DojoTestCase):

def test_form_clean_surfaces_blocked_url_as_form_error_not_500(self):
form = BaseJiraForm.__new__(BaseJiraForm)
form._errors = {}
form.cleaned_data = {
"url": "http://127.0.0.1/",
"username": "user",
"password": "password",
}

added_errors: dict[str, list[str]] = {}

def fake_add_error(field, message):
added_errors.setdefault(field, []).append(str(message))

form.add_error = fake_add_error # type: ignore[assignment]

# Should not raise — exception is caught and surfaced as form errors.
form.test_jira_connection()

self.assertIn("username", added_errors)
self.assertIn("password", added_errors)
for messages in added_errors.values():
self.assertTrue(
any("JIRA URL is not allowed" in m for m in messages),
f"Expected blocked-URL detail in form error, got: {messages}",
)
Loading