From 7988cb65458a6a40651bc03e25f387de68f3b053 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 6 Feb 2026 15:13:21 +0100 Subject: [PATCH 1/4] Add integration tests for file objects These tests won't pass yet, as we need the upcoming Infrahub 1.8 to run them, hence marking them as XFAIL for now. --- infrahub_sdk/testing/schemas/file_object.py | 108 +++++++ tests/integration/test_file_object.py | 340 ++++++++++++++++++++ 2 files changed, 448 insertions(+) create mode 100644 infrahub_sdk/testing/schemas/file_object.py create mode 100644 tests/integration/test_file_object.py diff --git a/infrahub_sdk/testing/schemas/file_object.py b/infrahub_sdk/testing/schemas/file_object.py new file mode 100644 index 00000000..b0452035 --- /dev/null +++ b/infrahub_sdk/testing/schemas/file_object.py @@ -0,0 +1,108 @@ +import pytest + +from infrahub_sdk import InfrahubClient, InfrahubClientSync +from infrahub_sdk.exceptions import GraphQLError +from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync +from infrahub_sdk.schema.main import AttributeKind, NodeSchema, RelationshipKind, SchemaRoot +from infrahub_sdk.schema.main import AttributeSchema as Attr +from infrahub_sdk.schema.main import RelationshipSchema as Rel + +NAMESPACE = "Testing" +TESTING_FILE_CONTRACT = f"{NAMESPACE}FileContract" +TESTING_CIRCUIT = f"{NAMESPACE}Circuit" + +PDF_MAGIC_BYTES = b"%PDF-1.4 fake pdf content for testing" +PNG_MAGIC_BYTES = b"\x89PNG\r\n\x1a\n fake png content for testing" +TEXT_CONTENT = b"This is a simple text file content for testing purposes." + + +class SchemaFileObject: + @pytest.fixture(scope="class") + def schema_file_contract(self) -> NodeSchema: + return NodeSchema( + name="FileContract", + namespace=NAMESPACE, + include_in_menu=True, + inherit_from=["CoreFileObject"], + display_label="file_name__value", + human_friendly_id=["contract_ref__value"], + order_by=["contract_ref__value"], + attributes=[ + Attr(name="contract_ref", kind=AttributeKind.TEXT, unique=True), + Attr(name="description", kind=AttributeKind.TEXT, optional=True), + Attr(name="active", kind=AttributeKind.BOOLEAN, default_value=True, optional=True), + ], + relationships=[ + Rel( + name="circuit", + kind=RelationshipKind.ATTRIBUTE, + optional=True, + peer=TESTING_CIRCUIT, + cardinality="one", + identifier="circuit__contracts", + ), + ], + ) + + @pytest.fixture(scope="class") + def schema_circuit(self) -> NodeSchema: + return NodeSchema( + name="Circuit", + namespace=NAMESPACE, + include_in_menu=True, + display_label="circuit_id__value", + human_friendly_id=["circuit_id__value"], + order_by=["circuit_id__value"], + attributes=[ + Attr(name="circuit_id", kind=AttributeKind.TEXT, unique=True), + Attr(name="bandwidth", kind=AttributeKind.NUMBER, optional=True), + ], + relationships=[ + Rel( + name="contracts", + kind=RelationshipKind.GENERIC, + optional=True, + peer=TESTING_FILE_CONTRACT, + cardinality="many", + identifier="circuit__contracts", + ), + ], + ) + + @pytest.fixture(scope="class") + def schema_file_object_base(self, schema_file_contract: NodeSchema, schema_circuit: NodeSchema) -> SchemaRoot: + return SchemaRoot(version="1.0", nodes=[schema_file_contract, schema_circuit]) + + @pytest.fixture(scope="class") + async def load_file_object_schema(self, client: InfrahubClient, schema_file_object_base: SchemaRoot) -> None: + resp = await client.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) + if resp.errors: + raise GraphQLError(errors=[resp.errors]) + + @pytest.fixture(scope="class") + def load_file_object_schema_sync( + self, client_sync: InfrahubClientSync, schema_file_object_base: SchemaRoot + ) -> None: + resp = client_sync.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) + if resp.errors: + raise GraphQLError(errors=[resp.errors]) + + @pytest.fixture(scope="class") + async def circuit_main( + self, + client: InfrahubClient, + load_file_object_schema: None, # noqa: ARG002 + ) -> InfrahubNode: + obj = await client.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-001", bandwidth=1000) + await obj.save() + return obj + + @pytest.fixture(scope="class") + def circuit_main_sync( + self, + client_sync: InfrahubClientSync, + load_file_object_schema_sync: None, # noqa: ARG002 + ) -> InfrahubNodeSync: + obj = client_sync.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-SYNC-001", bandwidth=2000) + obj.save() + return obj diff --git a/tests/integration/test_file_object.py b/tests/integration/test_file_object.py new file mode 100644 index 00000000..c756eca4 --- /dev/null +++ b/tests/integration/test_file_object.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import hashlib +import tempfile +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk.testing.docker import TestInfrahubDockerClient +from infrahub_sdk.testing.schemas.file_object import ( + PDF_MAGIC_BYTES, + PNG_MAGIC_BYTES, + TESTING_CIRCUIT, + TESTING_FILE_CONTRACT, + TEXT_CONTENT, + SchemaFileObject, +) + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync + from infrahub_sdk.node.relationship import RelationshipManager, RelationshipManagerSync + + +@pytest.mark.xfail(reason="Requires Infrahub 1.8+") +class TestFileObjectAsync(TestInfrahubDockerClient, SchemaFileObject): + """Async integration tests for FileObject functionality.""" + + async def test_create_file_object_with_upload( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test creating FileObject nodes with both upload_from_bytes and upload_from_path.""" + contract_bytes = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-CREATE-BYTES-001", + description="Test contract with bytes upload", + circuit=circuit_main, + ) + contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract.pdf") + await contract_bytes.save() + + fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract_bytes.id) + assert fetched.contract_ref.value == "CONTRACT-CREATE-BYTES-001" + assert fetched.file_name.value == "contract.pdf" + assert fetched.file_size.value == len(PDF_MAGIC_BYTES) + assert fetched.checksum.value == hashlib.sha1(PDF_MAGIC_BYTES, usedforsecurity=False).hexdigest() + assert fetched.storage_id.value + + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) / "upload_test.txt" + tmp_path.write_bytes(TEXT_CONTENT) + + contract_path = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-CREATE-PATH-001", + description="Test contract from path", + circuit=circuit_main, + ) + contract_path.upload_from_path(path=tmp_path) + await contract_path.save() + + fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract_path.id) + assert fetched.file_name.value == tmp_path.name + assert fetched.file_size.value == len(TEXT_CONTENT) + assert fetched.checksum.value == hashlib.sha1(TEXT_CONTENT, usedforsecurity=False).hexdigest() + assert fetched.storage_id.value + + async def test_update_file_object_with_new_file( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test updating a FileObject node with a new file.""" + contract = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-UPDATE-001", + description="Initial contract", + circuit=circuit_main, + ) + contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial.pdf") + await contract.save() + + contract_to_update = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + contract_to_update.description.value = "Updated contract" + contract_to_update.upload_from_bytes(content=PNG_MAGIC_BYTES, name="updated.png") + await contract_to_update.save() + + updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + assert updated.description.value == "Updated contract" + assert updated.file_name.value == "updated.png" + assert updated.storage_id.value != contract.storage_id.value + assert updated.checksum.value != contract.checksum.value + + async def test_upsert_file_object_update( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test upserting an existing FileObject node updates it rather than creating a duplicate.""" + contract = await client.create( + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-001", description="Original", circuit=circuit_main + ) + contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original.pdf") + await contract.save() + + contract_upsert = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-UPSERT-001", + description="Upserted update", + circuit=circuit_main, + ) + contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted.png") + await contract_upsert.save(allow_upsert=True) + assert contract_upsert.id == contract.id + + updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + assert updated.description.value == "Upserted update" + assert updated.file_name.value == "upserted.png" + assert updated.storage_id.value != contract.storage_id.value + + async def test_download_file( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test downloading files to memory and to disk.""" + contract = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-DOWNLOAD-001", + description="Download test", + circuit=circuit_main, + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name="download_test.txt") + await contract.save() + + fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + downloaded_content = await fetched.download_file() + assert downloaded_content == TEXT_CONTENT + + with tempfile.TemporaryDirectory() as tmpdir: + dest_path = Path(tmpdir) / "downloaded.txt" + bytes_written = await fetched.download_file(dest=dest_path) + assert bytes_written == len(TEXT_CONTENT) + assert dest_path.read_bytes() == TEXT_CONTENT + + async def test_update_without_file_change( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test updating FileObject attributes without replacing the file.""" + contract = await client.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-META-001", + description="Original description", + circuit=circuit_main, + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged.txt") + await contract.save() + + contract_to_update = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + contract_to_update.description.value = "Updated description" + await contract_to_update.save() + + updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id) + assert updated.description.value == "Updated description" + assert updated.storage_id.value == contract_to_update.storage_id.value + assert updated.checksum.value == contract_to_update.checksum.value + + async def test_query_contracts_through_circuit_relationship( + self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + ) -> None: + """Test that FileObject contracts can be queried through their circuit relationship.""" + for i in range(2): + contract = await client.create( + kind=TESTING_FILE_CONTRACT, contract_ref=f"CONTRACT-REL-{i:03d}", circuit=circuit_main + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name=f"contract_{i}.txt") + await contract.save() + + circuit = await client.get(kind=TESTING_CIRCUIT, id=circuit_main.id, include=["contracts"]) + contracts: RelationshipManager = circuit.contracts + contract_refs = {contract.peer.contract_ref.value for contract in contracts} + + assert len(contract_refs) >= 2 + assert "CONTRACT-REL-000" in contract_refs + assert "CONTRACT-REL-001" in contract_refs + + +@pytest.mark.xfail(reason="Requires Infrahub 1.8+") +class TestFileObjectSync(TestInfrahubDockerClient, SchemaFileObject): + """Sync integration tests for FileObject functionality.""" + + def test_create_file_object_with_upload_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test creating FileObject nodes with both upload_from_bytes and upload_from_path (sync).""" + contract_bytes = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-CREATE-BYTES-SYNC-001", + description="Test contract with bytes upload (sync)", + circuit=circuit_main_sync, + ) + contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract_sync.pdf") + contract_bytes.save() + + fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract_bytes.id) + assert fetched.contract_ref.value == "CONTRACT-CREATE-BYTES-SYNC-001" + assert fetched.file_name.value == "contract_sync.pdf" + assert fetched.file_size.value == len(PDF_MAGIC_BYTES) + assert fetched.checksum.value == hashlib.sha1(PDF_MAGIC_BYTES, usedforsecurity=False).hexdigest() + assert fetched.storage_id.value + + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) / "upload_test_sync.txt" + tmp_path.write_bytes(TEXT_CONTENT) + + contract_path = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-CREATE-PATH-SYNC-001", + description="Test contract from path (sync)", + circuit=circuit_main_sync, + ) + contract_path.upload_from_path(path=tmp_path) + contract_path.save() + + fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract_path.id) + assert fetched.file_name.value == tmp_path.name + assert fetched.file_size.value == len(TEXT_CONTENT) + assert fetched.checksum.value == hashlib.sha1(TEXT_CONTENT, usedforsecurity=False).hexdigest() + assert fetched.storage_id.value + + def test_update_file_object_with_new_file_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test updating a FileObject node with a new file (sync).""" + contract = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-UPDATE-SYNC-001", + description="Initial contract sync", + circuit=circuit_main_sync, + ) + contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial_sync.pdf") + contract.save() + + contract_to_update = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + contract_to_update.description.value = "Updated contract sync" + contract_to_update.upload_from_bytes(content=PNG_MAGIC_BYTES, name="updated_sync.png") + contract_to_update.save() + + updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + assert updated.description.value == "Updated contract sync" + assert updated.file_name.value == "updated_sync.png" + assert updated.storage_id.value != contract.storage_id.value + assert updated.checksum.value != contract.checksum.value + + def test_upsert_file_object_update_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test upserting an existing FileObject node updates it rather than creating a duplicate (sync).""" + contract = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-UPSERT-SYNC-001", + description="Original sync", + circuit=circuit_main_sync, + ) + contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original_sync.pdf") + contract.save() + + contract_upsert = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-UPSERT-SYNC-001", + description="Upserted update sync", + circuit=circuit_main_sync, + ) + contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted_sync.png") + contract_upsert.save(allow_upsert=True) + assert contract_upsert.id == contract.id + + updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + assert updated.description.value == "Upserted update sync" + assert updated.file_name.value == "upserted_sync.png" + assert updated.storage_id.value != contract.storage_id.value + + def test_download_file_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test downloading files to memory and to disk (sync).""" + contract = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-DOWNLOAD-SYNC-001", + description="Download test sync", + circuit=circuit_main_sync, + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name="download_sync.txt") + contract.save() + + fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + downloaded_content = fetched.download_file() + assert downloaded_content == TEXT_CONTENT + + with tempfile.TemporaryDirectory() as tmpdir: + dest_path = Path(tmpdir) / "downloaded_sync.txt" + bytes_written = fetched.download_file(dest=dest_path) + assert bytes_written == len(TEXT_CONTENT) + assert dest_path.read_bytes() == TEXT_CONTENT + + def test_update_without_file_change_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test updating FileObject attributes without replacing the file (sync).""" + contract = client_sync.create( + kind=TESTING_FILE_CONTRACT, + contract_ref="CONTRACT-META-SYNC-001", + description="Original description sync", + circuit=circuit_main_sync, + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged_sync.txt") + contract.save() + + contract_to_update = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False) + contract_to_update.description.value = "Updated description sync" + contract_to_update.save() + + updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id) + assert updated.description.value == "Updated description sync" + assert updated.storage_id.value == contract_to_update.storage_id.value + assert updated.checksum.value == contract_to_update.checksum.value + + def test_query_contracts_through_circuit_relationship_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + ) -> None: + """Test that FileObject contracts can be queried through their circuit relationship (sync).""" + for i in range(2): + contract = client_sync.create( + kind=TESTING_FILE_CONTRACT, contract_ref=f"CONTRACT-REL-SYNC-{i:03d}", circuit=circuit_main_sync + ) + contract.upload_from_bytes(content=TEXT_CONTENT, name=f"contract_sync_{i}.txt") + contract.save() + + circuit = client_sync.get(kind=TESTING_CIRCUIT, id=circuit_main_sync.id, include=["contracts"]) + contracts: RelationshipManagerSync = circuit.contracts + contract_refs = {contract.peer.contract_ref.value for contract in contracts} + + assert len(contract_refs) >= 2 + assert "CONTRACT-REL-SYNC-000" in contract_refs + assert "CONTRACT-REL-SYNC-001" in contract_refs From 7fc52505878dfc87295bc759d68c4e844e43dc3f Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 6 Feb 2026 16:16:57 +0100 Subject: [PATCH 2/4] This is not really needed in tests --- infrahub_sdk/testing/schemas/file_object.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/infrahub_sdk/testing/schemas/file_object.py b/infrahub_sdk/testing/schemas/file_object.py index b0452035..81caae8a 100644 --- a/infrahub_sdk/testing/schemas/file_object.py +++ b/infrahub_sdk/testing/schemas/file_object.py @@ -1,7 +1,6 @@ import pytest from infrahub_sdk import InfrahubClient, InfrahubClientSync -from infrahub_sdk.exceptions import GraphQLError from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync from infrahub_sdk.schema.main import AttributeKind, NodeSchema, RelationshipKind, SchemaRoot from infrahub_sdk.schema.main import AttributeSchema as Attr @@ -75,17 +74,13 @@ def schema_file_object_base(self, schema_file_contract: NodeSchema, schema_circu @pytest.fixture(scope="class") async def load_file_object_schema(self, client: InfrahubClient, schema_file_object_base: SchemaRoot) -> None: - resp = await client.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) - if resp.errors: - raise GraphQLError(errors=[resp.errors]) + await client.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) @pytest.fixture(scope="class") def load_file_object_schema_sync( self, client_sync: InfrahubClientSync, schema_file_object_base: SchemaRoot ) -> None: - resp = client_sync.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) - if resp.errors: - raise GraphQLError(errors=[resp.errors]) + client_sync.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) @pytest.fixture(scope="class") async def circuit_main( From af41cb50dfd851862cce8a2b49b36cd2f609f69d Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Tue, 10 Feb 2026 15:01:56 +0100 Subject: [PATCH 3/4] Move fixtures to tests --- infrahub_sdk/testing/schemas/file_object.py | 21 --------------------- tests/integration/test_file_object.py | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/infrahub_sdk/testing/schemas/file_object.py b/infrahub_sdk/testing/schemas/file_object.py index 81caae8a..a73403ec 100644 --- a/infrahub_sdk/testing/schemas/file_object.py +++ b/infrahub_sdk/testing/schemas/file_object.py @@ -1,7 +1,6 @@ import pytest from infrahub_sdk import InfrahubClient, InfrahubClientSync -from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync from infrahub_sdk.schema.main import AttributeKind, NodeSchema, RelationshipKind, SchemaRoot from infrahub_sdk.schema.main import AttributeSchema as Attr from infrahub_sdk.schema.main import RelationshipSchema as Rel @@ -81,23 +80,3 @@ def load_file_object_schema_sync( self, client_sync: InfrahubClientSync, schema_file_object_base: SchemaRoot ) -> None: client_sync.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True) - - @pytest.fixture(scope="class") - async def circuit_main( - self, - client: InfrahubClient, - load_file_object_schema: None, # noqa: ARG002 - ) -> InfrahubNode: - obj = await client.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-001", bandwidth=1000) - await obj.save() - return obj - - @pytest.fixture(scope="class") - def circuit_main_sync( - self, - client_sync: InfrahubClientSync, - load_file_object_schema_sync: None, # noqa: ARG002 - ) -> InfrahubNodeSync: - obj = client_sync.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-SYNC-001", bandwidth=2000) - obj.save() - return obj diff --git a/tests/integration/test_file_object.py b/tests/integration/test_file_object.py index c756eca4..9154dd3e 100644 --- a/tests/integration/test_file_object.py +++ b/tests/integration/test_file_object.py @@ -27,6 +27,12 @@ class TestFileObjectAsync(TestInfrahubDockerClient, SchemaFileObject): """Async integration tests for FileObject functionality.""" + @pytest.fixture(scope="class") + async def circuit_main(self, client: InfrahubClient, load_file_object_schema: None) -> InfrahubNode: + obj = await client.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-001", bandwidth=1000) + await obj.save() + return obj + async def test_create_file_object_with_upload( self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode ) -> None: @@ -184,6 +190,14 @@ async def test_query_contracts_through_circuit_relationship( class TestFileObjectSync(TestInfrahubDockerClient, SchemaFileObject): """Sync integration tests for FileObject functionality.""" + @pytest.fixture(scope="class") + def circuit_main_sync( + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None + ) -> InfrahubNodeSync: + obj = client_sync.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-SYNC-001", bandwidth=2000) + obj.save() + return obj + def test_create_file_object_with_upload_sync( self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync ) -> None: From 5d9bad8c80e809844f08d2c12e1869d926ceaae9 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Tue, 10 Feb 2026 15:20:05 +0100 Subject: [PATCH 4/4] Remove things tested in backend integration tests --- infrahub_sdk/testing/schemas/file_object.py | 43 +------ tests/integration/test_file_object.py | 132 +++----------------- 2 files changed, 22 insertions(+), 153 deletions(-) diff --git a/infrahub_sdk/testing/schemas/file_object.py b/infrahub_sdk/testing/schemas/file_object.py index a73403ec..dc79b214 100644 --- a/infrahub_sdk/testing/schemas/file_object.py +++ b/infrahub_sdk/testing/schemas/file_object.py @@ -1,13 +1,11 @@ import pytest from infrahub_sdk import InfrahubClient, InfrahubClientSync -from infrahub_sdk.schema.main import AttributeKind, NodeSchema, RelationshipKind, SchemaRoot +from infrahub_sdk.schema.main import AttributeKind, NodeSchema, SchemaRoot from infrahub_sdk.schema.main import AttributeSchema as Attr -from infrahub_sdk.schema.main import RelationshipSchema as Rel NAMESPACE = "Testing" TESTING_FILE_CONTRACT = f"{NAMESPACE}FileContract" -TESTING_CIRCUIT = f"{NAMESPACE}Circuit" PDF_MAGIC_BYTES = b"%PDF-1.4 fake pdf content for testing" PNG_MAGIC_BYTES = b"\x89PNG\r\n\x1a\n fake png content for testing" @@ -30,46 +28,11 @@ def schema_file_contract(self) -> NodeSchema: Attr(name="description", kind=AttributeKind.TEXT, optional=True), Attr(name="active", kind=AttributeKind.BOOLEAN, default_value=True, optional=True), ], - relationships=[ - Rel( - name="circuit", - kind=RelationshipKind.ATTRIBUTE, - optional=True, - peer=TESTING_CIRCUIT, - cardinality="one", - identifier="circuit__contracts", - ), - ], - ) - - @pytest.fixture(scope="class") - def schema_circuit(self) -> NodeSchema: - return NodeSchema( - name="Circuit", - namespace=NAMESPACE, - include_in_menu=True, - display_label="circuit_id__value", - human_friendly_id=["circuit_id__value"], - order_by=["circuit_id__value"], - attributes=[ - Attr(name="circuit_id", kind=AttributeKind.TEXT, unique=True), - Attr(name="bandwidth", kind=AttributeKind.NUMBER, optional=True), - ], - relationships=[ - Rel( - name="contracts", - kind=RelationshipKind.GENERIC, - optional=True, - peer=TESTING_FILE_CONTRACT, - cardinality="many", - identifier="circuit__contracts", - ), - ], ) @pytest.fixture(scope="class") - def schema_file_object_base(self, schema_file_contract: NodeSchema, schema_circuit: NodeSchema) -> SchemaRoot: - return SchemaRoot(version="1.0", nodes=[schema_file_contract, schema_circuit]) + def schema_file_object_base(self, schema_file_contract: NodeSchema) -> SchemaRoot: + return SchemaRoot(version="1.0", nodes=[schema_file_contract]) @pytest.fixture(scope="class") async def load_file_object_schema(self, client: InfrahubClient, schema_file_object_base: SchemaRoot) -> None: diff --git a/tests/integration/test_file_object.py b/tests/integration/test_file_object.py index 9154dd3e..49dd7421 100644 --- a/tests/integration/test_file_object.py +++ b/tests/integration/test_file_object.py @@ -11,7 +11,6 @@ from infrahub_sdk.testing.schemas.file_object import ( PDF_MAGIC_BYTES, PNG_MAGIC_BYTES, - TESTING_CIRCUIT, TESTING_FILE_CONTRACT, TEXT_CONTENT, SchemaFileObject, @@ -19,29 +18,18 @@ if TYPE_CHECKING: from infrahub_sdk import InfrahubClient, InfrahubClientSync - from infrahub_sdk.node import InfrahubNode, InfrahubNodeSync - from infrahub_sdk.node.relationship import RelationshipManager, RelationshipManagerSync @pytest.mark.xfail(reason="Requires Infrahub 1.8+") class TestFileObjectAsync(TestInfrahubDockerClient, SchemaFileObject): """Async integration tests for FileObject functionality.""" - @pytest.fixture(scope="class") - async def circuit_main(self, client: InfrahubClient, load_file_object_schema: None) -> InfrahubNode: - obj = await client.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-001", bandwidth=1000) - await obj.save() - return obj - - async def test_create_file_object_with_upload( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode - ) -> None: + async def test_create_file_object_with_upload(self, client: InfrahubClient, load_file_object_schema: None) -> None: """Test creating FileObject nodes with both upload_from_bytes and upload_from_path.""" contract_bytes = await client.create( kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-CREATE-BYTES-001", description="Test contract with bytes upload", - circuit=circuit_main, ) contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract.pdf") await contract_bytes.save() @@ -61,7 +49,6 @@ async def test_create_file_object_with_upload( kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-CREATE-PATH-001", description="Test contract from path", - circuit=circuit_main, ) contract_path.upload_from_path(path=tmp_path) await contract_path.save() @@ -73,14 +60,11 @@ async def test_create_file_object_with_upload( assert fetched.storage_id.value async def test_update_file_object_with_new_file( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode + self, client: InfrahubClient, load_file_object_schema: None ) -> None: """Test updating a FileObject node with a new file.""" contract = await client.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-UPDATE-001", - description="Initial contract", - circuit=circuit_main, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPDATE-001", description="Initial contract" ) contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial.pdf") await contract.save() @@ -96,12 +80,10 @@ async def test_update_file_object_with_new_file( assert updated.storage_id.value != contract.storage_id.value assert updated.checksum.value != contract.checksum.value - async def test_upsert_file_object_update( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode - ) -> None: + async def test_upsert_file_object_update(self, client: InfrahubClient, load_file_object_schema: None) -> None: """Test upserting an existing FileObject node updates it rather than creating a duplicate.""" contract = await client.create( - kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-001", description="Original", circuit=circuit_main + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-001", description="Original" ) contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original.pdf") await contract.save() @@ -110,7 +92,6 @@ async def test_upsert_file_object_update( kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-001", description="Upserted update", - circuit=circuit_main, ) contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted.png") await contract_upsert.save(allow_upsert=True) @@ -121,15 +102,10 @@ async def test_upsert_file_object_update( assert updated.file_name.value == "upserted.png" assert updated.storage_id.value != contract.storage_id.value - async def test_download_file( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode - ) -> None: + async def test_download_file(self, client: InfrahubClient, load_file_object_schema: None) -> None: """Test downloading files to memory and to disk.""" contract = await client.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-DOWNLOAD-001", - description="Download test", - circuit=circuit_main, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-DOWNLOAD-001", description="Download test" ) contract.upload_from_bytes(content=TEXT_CONTENT, name="download_test.txt") await contract.save() @@ -144,15 +120,10 @@ async def test_download_file( assert bytes_written == len(TEXT_CONTENT) assert dest_path.read_bytes() == TEXT_CONTENT - async def test_update_without_file_change( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode - ) -> None: + async def test_update_without_file_change(self, client: InfrahubClient, load_file_object_schema: None) -> None: """Test updating FileObject attributes without replacing the file.""" contract = await client.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-META-001", - description="Original description", - circuit=circuit_main, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-META-001", description="Original description" ) contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged.txt") await contract.save() @@ -166,47 +137,19 @@ async def test_update_without_file_change( assert updated.storage_id.value == contract_to_update.storage_id.value assert updated.checksum.value == contract_to_update.checksum.value - async def test_query_contracts_through_circuit_relationship( - self, client: InfrahubClient, load_file_object_schema: None, circuit_main: InfrahubNode - ) -> None: - """Test that FileObject contracts can be queried through their circuit relationship.""" - for i in range(2): - contract = await client.create( - kind=TESTING_FILE_CONTRACT, contract_ref=f"CONTRACT-REL-{i:03d}", circuit=circuit_main - ) - contract.upload_from_bytes(content=TEXT_CONTENT, name=f"contract_{i}.txt") - await contract.save() - - circuit = await client.get(kind=TESTING_CIRCUIT, id=circuit_main.id, include=["contracts"]) - contracts: RelationshipManager = circuit.contracts - contract_refs = {contract.peer.contract_ref.value for contract in contracts} - - assert len(contract_refs) >= 2 - assert "CONTRACT-REL-000" in contract_refs - assert "CONTRACT-REL-001" in contract_refs - @pytest.mark.xfail(reason="Requires Infrahub 1.8+") class TestFileObjectSync(TestInfrahubDockerClient, SchemaFileObject): """Sync integration tests for FileObject functionality.""" - @pytest.fixture(scope="class") - def circuit_main_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None - ) -> InfrahubNodeSync: - obj = client_sync.create(kind=TESTING_CIRCUIT, circuit_id="CIRCUIT-SYNC-001", bandwidth=2000) - obj.save() - return obj - def test_create_file_object_with_upload_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None ) -> None: """Test creating FileObject nodes with both upload_from_bytes and upload_from_path (sync).""" contract_bytes = client_sync.create( kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-CREATE-BYTES-SYNC-001", description="Test contract with bytes upload (sync)", - circuit=circuit_main_sync, ) contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract_sync.pdf") contract_bytes.save() @@ -226,7 +169,6 @@ def test_create_file_object_with_upload_sync( kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-CREATE-PATH-SYNC-001", description="Test contract from path (sync)", - circuit=circuit_main_sync, ) contract_path.upload_from_path(path=tmp_path) contract_path.save() @@ -238,14 +180,11 @@ def test_create_file_object_with_upload_sync( assert fetched.storage_id.value def test_update_file_object_with_new_file_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None ) -> None: """Test updating a FileObject node with a new file (sync).""" contract = client_sync.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-UPDATE-SYNC-001", - description="Initial contract sync", - circuit=circuit_main_sync, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPDATE-SYNC-001", description="Initial contract sync" ) contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial_sync.pdf") contract.save() @@ -262,23 +201,17 @@ def test_update_file_object_with_new_file_sync( assert updated.checksum.value != contract.checksum.value def test_upsert_file_object_update_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None ) -> None: """Test upserting an existing FileObject node updates it rather than creating a duplicate (sync).""" contract = client_sync.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-UPSERT-SYNC-001", - description="Original sync", - circuit=circuit_main_sync, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-SYNC-001", description="Original sync" ) contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original_sync.pdf") contract.save() contract_upsert = client_sync.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-UPSERT-SYNC-001", - description="Upserted update sync", - circuit=circuit_main_sync, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-SYNC-001", description="Upserted update sync" ) contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted_sync.png") contract_upsert.save(allow_upsert=True) @@ -289,15 +222,10 @@ def test_upsert_file_object_update_sync( assert updated.file_name.value == "upserted_sync.png" assert updated.storage_id.value != contract.storage_id.value - def test_download_file_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync - ) -> None: + def test_download_file_sync(self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None) -> None: """Test downloading files to memory and to disk (sync).""" contract = client_sync.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-DOWNLOAD-SYNC-001", - description="Download test sync", - circuit=circuit_main_sync, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-DOWNLOAD-SYNC-001", description="Download test sync" ) contract.upload_from_bytes(content=TEXT_CONTENT, name="download_sync.txt") contract.save() @@ -313,14 +241,11 @@ def test_download_file_sync( assert dest_path.read_bytes() == TEXT_CONTENT def test_update_without_file_change_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync + self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None ) -> None: """Test updating FileObject attributes without replacing the file (sync).""" contract = client_sync.create( - kind=TESTING_FILE_CONTRACT, - contract_ref="CONTRACT-META-SYNC-001", - description="Original description sync", - circuit=circuit_main_sync, + kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-META-SYNC-001", description="Original description sync" ) contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged_sync.txt") contract.save() @@ -333,22 +258,3 @@ def test_update_without_file_change_sync( assert updated.description.value == "Updated description sync" assert updated.storage_id.value == contract_to_update.storage_id.value assert updated.checksum.value == contract_to_update.checksum.value - - def test_query_contracts_through_circuit_relationship_sync( - self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None, circuit_main_sync: InfrahubNodeSync - ) -> None: - """Test that FileObject contracts can be queried through their circuit relationship (sync).""" - for i in range(2): - contract = client_sync.create( - kind=TESTING_FILE_CONTRACT, contract_ref=f"CONTRACT-REL-SYNC-{i:03d}", circuit=circuit_main_sync - ) - contract.upload_from_bytes(content=TEXT_CONTENT, name=f"contract_sync_{i}.txt") - contract.save() - - circuit = client_sync.get(kind=TESTING_CIRCUIT, id=circuit_main_sync.id, include=["contracts"]) - contracts: RelationshipManagerSync = circuit.contracts - contract_refs = {contract.peer.contract_ref.value for contract in contracts} - - assert len(contract_refs) >= 2 - assert "CONTRACT-REL-SYNC-000" in contract_refs - assert "CONTRACT-REL-SYNC-001" in contract_refs