From 894a4b1ea48bf35f7c7aa0c0dbf3ec9dc1f4000b Mon Sep 17 00:00:00 2001 From: Szabolcs Antal Date: Wed, 10 Jun 2026 19:31:55 +0200 Subject: [PATCH 1/3] #34 - Experiment with analyzer integration tests --- tests/test_integration_analyzers.py | 190 ++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 tests/test_integration_analyzers.py diff --git a/tests/test_integration_analyzers.py b/tests/test_integration_analyzers.py new file mode 100644 index 0000000..b3438b8 --- /dev/null +++ b/tests/test_integration_analyzers.py @@ -0,0 +1,190 @@ +import json +import tempfile +from contextlib import contextmanager +from pathlib import Path + +import pytest + +from cortexutils.analyzer import Analyzer + +DEFAULT_INPUT = { + "dataType": "ip", + "data": "1.2.3.4", +} + +DEFAULT_OUTPUT = { + "success": True, + "full": {}, + "summary": {}, + "artifacts": [], + "operations": [], +} + + +@contextmanager +def init_job_directory(input_obj: dict | None = None): + """Context manager that yields a temporary job directory path, then cleans up.""" + temp_dir = tempfile.TemporaryDirectory() + job_dir = Path(temp_dir.name) + + try: + input_dir = job_dir / "input" + input_dir.mkdir() + + if input_obj is None: + input_obj = DEFAULT_INPUT + + with open(input_dir / "input.json", "w") as f: + json.dump(input_obj, f) + + output_dir = job_dir / "output" + output_dir.mkdir() + + yield job_dir + + finally: + temp_dir.cleanup() + + +def load_output(job_directory: Path) -> dict: + with open(job_directory / "output" / "output.json") as output_file: + output = json.load(output_file) + return output + + +def test_simple_analyzer_success(): + with init_job_directory() as job_directory: + Analyzer(job_directory).report({}) + output = load_output(job_directory) + assert output == DEFAULT_OUTPUT + + +def test_simple_analyzer_error(): + + error_msg = "test analyzer error" + with init_job_directory() as job_directory: + with pytest.raises(SystemExit): + Analyzer(job_directory).error(error_msg) + output = load_output(job_directory) + + assert output == { + "success": False, + "input": DEFAULT_INPUT, + "errorMessage": error_msg, + } + + +def test_analyzer_report_with_summary_and_taxonomies(): + class TestAnalyzer(Analyzer): + def summary(self, raw): + taxonomies = [] + for taxonomy_level in ["info", "safe", "suspicious", "malicious", "n/a"]: + taxonomies.append( + self.build_taxonomy( + level=taxonomy_level, + namespace="cortexutils", + predicate="integration-tests", + value="analyzer", + ) + ) + return {"taxonomies": taxonomies, "raw": raw} + + with init_job_directory() as job_directory: + full_report = {} + TestAnalyzer(job_directory).report(full_report) + output = load_output(job_directory) + + assert output == { + **DEFAULT_OUTPUT, + "summary": { + "raw": full_report, + "taxonomies": [ + { + "level": level, + "namespace": "cortexutils", + "predicate": "integration-tests", + "value": "analyzer", + } + for level in ["info", "safe", "suspicious", "malicious", "info"] + ], + }, + } + + +def test_analyzer_report_with_operations(): + class TestAnalyzer(Analyzer): + def operations(self, raw): + return [self.build_operation(op_type="DummyOperation", dummy="parameter")] + + with init_job_directory() as job_directory: + TestAnalyzer(job_directory).report({}) + output = load_output(job_directory) + + assert output == { + **DEFAULT_OUTPUT, + "operations": [{"type": "DummyOperation", "dummy": "parameter"}], + } + + +def test_analyzer_report_with_extractable_artifacts(): + string_ip_artifact = "11.22.33.44" + list_ip_artifacts = ["10.20.30.40", "20.30.40.50"] + dict_item_ip_artifact = "100.100.100.100" + + report = { + "simple-ip": string_ip_artifact, + "list-of-ips": list_ip_artifacts, + "dict-with-ip": {"just-an-ip": dict_item_ip_artifact}, + } + + with init_job_directory() as job_directory: + Analyzer(job_directory).report(report) + output = load_output(job_directory) + + assert output == { + **DEFAULT_OUTPUT, + "artifacts": [ + {"data": ip, "dataType": "ip"} + for ip in [string_ip_artifact, *list_ip_artifacts, dict_item_ip_artifact] + ], + "full": report, + } + + +def test_analyzer_error_for_invalid_input(): + + empty_input = {} + with init_job_directory(empty_input) as job_directory: + with pytest.raises(SystemExit): + Analyzer(job_directory) + output = load_output(job_directory) + + assert output == { + "success": False, + "input": empty_input, + "errorMessage": "Missing dataType field", + } + + generic_input_without_data = {"dataType": "ip"} + with init_job_directory(generic_input_without_data) as job_directory: + with pytest.raises(SystemExit): + Analyzer(job_directory).report({}) + output = load_output(job_directory) + + assert output == { + "success": False, + "input": generic_input_without_data, + "errorMessage": "Missing data field", + } + + file_input_without_filename = {"dataType": "file"} + with init_job_directory(file_input_without_filename) as job_directory: + with pytest.raises(SystemExit): + Analyzer(job_directory).report({}) + output = load_output(job_directory) + + assert output == { + "success": False, + "input": file_input_without_filename, + "errorMessage": "Missing filename.", + } From 6439f94fe528f88af4c187a2a0051b33590d5b49 Mon Sep 17 00:00:00 2001 From: Szabolcs Antal Date: Thu, 11 Jun 2026 18:37:39 +0200 Subject: [PATCH 2/3] #34 - Switch to test job factory fixtures --- tests/test_integration_analyzers.py | 124 +++++++++++++++------------- 1 file changed, 66 insertions(+), 58 deletions(-) diff --git a/tests/test_integration_analyzers.py b/tests/test_integration_analyzers.py index b3438b8..98ee8c2 100644 --- a/tests/test_integration_analyzers.py +++ b/tests/test_integration_analyzers.py @@ -1,7 +1,6 @@ import json -import tempfile -from contextlib import contextmanager from pathlib import Path +from typing import Callable import pytest @@ -21,52 +20,62 @@ } -@contextmanager -def init_job_directory(input_obj: dict | None = None): - """Context manager that yields a temporary job directory path, then cleans up.""" - temp_dir = tempfile.TemporaryDirectory() - job_dir = Path(temp_dir.name) +@pytest.fixture(scope="function") +def job_directory(tmp_path: Path) -> Path: + """Fixture to initialize the actual test job's directory.""" - try: - input_dir = job_dir / "input" - input_dir.mkdir() + job_dir = tmp_path - if input_obj is None: - input_obj = DEFAULT_INPUT + input_dir = job_dir / "input" + input_dir.mkdir() - with open(input_dir / "input.json", "w") as f: + with open(input_dir / "input.json", "w") as f: + json.dump(DEFAULT_INPUT, f) + + output_dir = job_dir / "output" + output_dir.mkdir() + + print(job_dir) + return job_dir + + +@pytest.fixture +def add_job_input(job_directory: Path) -> Callable[[dict], None]: + """Factory fixture to specify a custom input.json for the actual test job.""" + + def _add_job_input(input_obj: dict): + + with open(job_directory / "input" / "input.json", "w") as f: + print(f.name) json.dump(input_obj, f) - output_dir = job_dir / "output" - output_dir.mkdir() + return _add_job_input - yield job_dir - finally: - temp_dir.cleanup() +@pytest.fixture +def load_job_output(job_directory: Path) -> Callable[[], dict]: + """Factory fixture to load the actual test job's output.json.""" + def _load_job_output() -> dict: + with open(job_directory / "output" / "output.json") as output_file: + output = json.load(output_file) + return output -def load_output(job_directory: Path) -> dict: - with open(job_directory / "output" / "output.json") as output_file: - output = json.load(output_file) - return output + return _load_job_output -def test_simple_analyzer_success(): - with init_job_directory() as job_directory: - Analyzer(job_directory).report({}) - output = load_output(job_directory) +def test_simple_analyzer_success(job_directory, load_job_output): + Analyzer(job_directory).report({}) + output = load_job_output() assert output == DEFAULT_OUTPUT -def test_simple_analyzer_error(): - +def test_simple_analyzer_error(job_directory, load_job_output): error_msg = "test analyzer error" - with init_job_directory() as job_directory: - with pytest.raises(SystemExit): - Analyzer(job_directory).error(error_msg) - output = load_output(job_directory) + with pytest.raises(SystemExit): + Analyzer(job_directory).error(error_msg) + output = load_job_output() assert output == { "success": False, "input": DEFAULT_INPUT, @@ -74,7 +83,7 @@ def test_simple_analyzer_error(): } -def test_analyzer_report_with_summary_and_taxonomies(): +def test_analyzer_report_with_summary_and_taxonomies(job_directory, load_job_output): class TestAnalyzer(Analyzer): def summary(self, raw): taxonomies = [] @@ -89,10 +98,9 @@ def summary(self, raw): ) return {"taxonomies": taxonomies, "raw": raw} - with init_job_directory() as job_directory: - full_report = {} - TestAnalyzer(job_directory).report(full_report) - output = load_output(job_directory) + full_report = {} + TestAnalyzer(job_directory).report(full_report) + output = load_job_output() assert output == { **DEFAULT_OUTPUT, @@ -111,14 +119,13 @@ def summary(self, raw): } -def test_analyzer_report_with_operations(): +def test_analyzer_report_with_operations(job_directory, load_job_output): class TestAnalyzer(Analyzer): def operations(self, raw): return [self.build_operation(op_type="DummyOperation", dummy="parameter")] - with init_job_directory() as job_directory: - TestAnalyzer(job_directory).report({}) - output = load_output(job_directory) + TestAnalyzer(job_directory).report({}) + output = load_job_output() assert output == { **DEFAULT_OUTPUT, @@ -126,7 +133,7 @@ def operations(self, raw): } -def test_analyzer_report_with_extractable_artifacts(): +def test_analyzer_report_with_extractable_artifacts(job_directory, load_job_output): string_ip_artifact = "11.22.33.44" list_ip_artifacts = ["10.20.30.40", "20.30.40.50"] dict_item_ip_artifact = "100.100.100.100" @@ -137,9 +144,8 @@ def test_analyzer_report_with_extractable_artifacts(): "dict-with-ip": {"just-an-ip": dict_item_ip_artifact}, } - with init_job_directory() as job_directory: - Analyzer(job_directory).report(report) - output = load_output(job_directory) + Analyzer(job_directory).report(report) + output = load_job_output() assert output == { **DEFAULT_OUTPUT, @@ -151,13 +157,15 @@ def test_analyzer_report_with_extractable_artifacts(): } -def test_analyzer_error_for_invalid_input(): +def test_analyzer_error_for_invalid_input( + job_directory, add_job_input, load_job_output +): empty_input = {} - with init_job_directory(empty_input) as job_directory: - with pytest.raises(SystemExit): - Analyzer(job_directory) - output = load_output(job_directory) + add_job_input(empty_input) + with pytest.raises(SystemExit): + Analyzer(job_directory) + output = load_job_output() assert output == { "success": False, @@ -166,10 +174,10 @@ def test_analyzer_error_for_invalid_input(): } generic_input_without_data = {"dataType": "ip"} - with init_job_directory(generic_input_without_data) as job_directory: - with pytest.raises(SystemExit): - Analyzer(job_directory).report({}) - output = load_output(job_directory) + add_job_input(generic_input_without_data) + with pytest.raises(SystemExit): + Analyzer(job_directory).report({}) + output = load_job_output() assert output == { "success": False, @@ -178,10 +186,10 @@ def test_analyzer_error_for_invalid_input(): } file_input_without_filename = {"dataType": "file"} - with init_job_directory(file_input_without_filename) as job_directory: - with pytest.raises(SystemExit): - Analyzer(job_directory).report({}) - output = load_output(job_directory) + add_job_input(file_input_without_filename) + with pytest.raises(SystemExit): + Analyzer(job_directory).report({}) + output = load_job_output() assert output == { "success": False, From 91aadd96d0c26870f4af061b88413af25b4c795b Mon Sep 17 00:00:00 2001 From: Szabolcs Antal Date: Thu, 11 Jun 2026 19:19:28 +0200 Subject: [PATCH 3/3] #34 - Experiment with fancy test analyzer factory fixture --- ...lyzers.py => test_analyzer_integration.py} | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) rename tests/{test_integration_analyzers.py => test_analyzer_integration.py} (81%) diff --git a/tests/test_integration_analyzers.py b/tests/test_analyzer_integration.py similarity index 81% rename from tests/test_integration_analyzers.py rename to tests/test_analyzer_integration.py index 98ee8c2..4b6c5c4 100644 --- a/tests/test_integration_analyzers.py +++ b/tests/test_analyzer_integration.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import Callable +from typing import Callable, Protocol, Type, TypeVar import pytest @@ -196,3 +196,52 @@ def test_analyzer_error_for_invalid_input( "input": file_input_without_filename, "errorMessage": "Missing filename.", } + + +# Experimental test analyzer factory + +T = TypeVar("T", bound=Analyzer) + + +class TestAnalyzerFactory(Protocol): + __test__ = False # ignore for pytest + + def __call__(self, analyzer_cls: Type[T] = Analyzer) -> T: ... + + +@pytest.fixture +def get_test_analyzer(job_directory: Path) -> TestAnalyzerFactory: + + def _get_analyzer(analyzer_cls: Type[T] = Analyzer) -> T: + return analyzer_cls(job_directory) + + return _get_analyzer + + +def test_fancy_analyzer_test_factory(get_test_analyzer, load_job_output): + analyzer = get_test_analyzer() + + analyzer.report({}) + + assert type(analyzer) is Analyzer + + output = load_job_output() + assert output == DEFAULT_OUTPUT + + +def test_fancy_custom_analyzer_test_factory(get_test_analyzer, load_job_output): + class TestAnalyzer(Analyzer): + def report(self, full_report, ensure_ascii=False): + return super().report(full_report, ensure_ascii) + + def something_custom(self): + return "..." + + custom_analyzer = get_test_analyzer(TestAnalyzer) + + custom_analyzer.report({}) + + assert type(custom_analyzer) is TestAnalyzer + + output = load_job_output() + assert output == DEFAULT_OUTPUT