Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions infrahub_sdk/testing/schemas/file_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pytest

from infrahub_sdk import InfrahubClient, InfrahubClientSync
from infrahub_sdk.schema.main import AttributeKind, NodeSchema, SchemaRoot
from infrahub_sdk.schema.main import AttributeSchema as Attr

NAMESPACE = "Testing"
TESTING_FILE_CONTRACT = f"{NAMESPACE}FileContract"

PDF_MAGIC_BYTES = b"%PDF-1.4 fake pdf content for testing"
PNG_MAGIC_BYTES = b"\x89PNG\r\n\x1a\n fake png content for testing"
TEXT_CONTENT = b"This is a simple text file content for testing purposes."


class SchemaFileObject:
@pytest.fixture(scope="class")
def schema_file_contract(self) -> NodeSchema:
return NodeSchema(
name="FileContract",
namespace=NAMESPACE,
include_in_menu=True,
inherit_from=["CoreFileObject"],
display_label="file_name__value",
human_friendly_id=["contract_ref__value"],
order_by=["contract_ref__value"],
attributes=[
Attr(name="contract_ref", kind=AttributeKind.TEXT, unique=True),
Attr(name="description", kind=AttributeKind.TEXT, optional=True),
Attr(name="active", kind=AttributeKind.BOOLEAN, default_value=True, optional=True),
],
)

@pytest.fixture(scope="class")
def schema_file_object_base(self, schema_file_contract: NodeSchema) -> SchemaRoot:
return SchemaRoot(version="1.0", nodes=[schema_file_contract])

@pytest.fixture(scope="class")
async def load_file_object_schema(self, client: InfrahubClient, schema_file_object_base: SchemaRoot) -> None:
await client.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True)

@pytest.fixture(scope="class")
def load_file_object_schema_sync(
self, client_sync: InfrahubClientSync, schema_file_object_base: SchemaRoot
) -> None:
client_sync.schema.load(schemas=[schema_file_object_base.to_schema_dict()], wait_until_converged=True)
260 changes: 260 additions & 0 deletions tests/integration/test_file_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
from __future__ import annotations

import hashlib
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING

import pytest

from infrahub_sdk.testing.docker import TestInfrahubDockerClient
from infrahub_sdk.testing.schemas.file_object import (
PDF_MAGIC_BYTES,
PNG_MAGIC_BYTES,
TESTING_FILE_CONTRACT,
TEXT_CONTENT,
SchemaFileObject,
)

if TYPE_CHECKING:
from infrahub_sdk import InfrahubClient, InfrahubClientSync


@pytest.mark.xfail(reason="Requires Infrahub 1.8+")
class TestFileObjectAsync(TestInfrahubDockerClient, SchemaFileObject):
"""Async integration tests for FileObject functionality."""

async def test_create_file_object_with_upload(self, client: InfrahubClient, load_file_object_schema: None) -> None:
"""Test creating FileObject nodes with both upload_from_bytes and upload_from_path."""
contract_bytes = await client.create(
kind=TESTING_FILE_CONTRACT,
contract_ref="CONTRACT-CREATE-BYTES-001",
description="Test contract with bytes upload",
)
contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract.pdf")
await contract_bytes.save()

fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract_bytes.id)
assert fetched.contract_ref.value == "CONTRACT-CREATE-BYTES-001"
assert fetched.file_name.value == "contract.pdf"
assert fetched.file_size.value == len(PDF_MAGIC_BYTES)
assert fetched.checksum.value == hashlib.sha1(PDF_MAGIC_BYTES, usedforsecurity=False).hexdigest()
assert fetched.storage_id.value

with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir) / "upload_test.txt"
tmp_path.write_bytes(TEXT_CONTENT)

contract_path = await client.create(
kind=TESTING_FILE_CONTRACT,
contract_ref="CONTRACT-CREATE-PATH-001",
description="Test contract from path",
)
contract_path.upload_from_path(path=tmp_path)
await contract_path.save()

fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract_path.id)
assert fetched.file_name.value == tmp_path.name
assert fetched.file_size.value == len(TEXT_CONTENT)
assert fetched.checksum.value == hashlib.sha1(TEXT_CONTENT, usedforsecurity=False).hexdigest()
assert fetched.storage_id.value

async def test_update_file_object_with_new_file(
self, client: InfrahubClient, load_file_object_schema: None
) -> None:
"""Test updating a FileObject node with a new file."""
contract = await client.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPDATE-001", description="Initial contract"
)
contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial.pdf")
await contract.save()

contract_to_update = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
contract_to_update.description.value = "Updated contract"
contract_to_update.upload_from_bytes(content=PNG_MAGIC_BYTES, name="updated.png")
await contract_to_update.save()

updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
assert updated.description.value == "Updated contract"
assert updated.file_name.value == "updated.png"
assert updated.storage_id.value != contract.storage_id.value
assert updated.checksum.value != contract.checksum.value

async def test_upsert_file_object_update(self, client: InfrahubClient, load_file_object_schema: None) -> None:
"""Test upserting an existing FileObject node updates it rather than creating a duplicate."""
contract = await client.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-001", description="Original"
)
contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original.pdf")
await contract.save()

contract_upsert = await client.create(
kind=TESTING_FILE_CONTRACT,
contract_ref="CONTRACT-UPSERT-001",
description="Upserted update",
)
contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted.png")
await contract_upsert.save(allow_upsert=True)
assert contract_upsert.id == contract.id

updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
assert updated.description.value == "Upserted update"
assert updated.file_name.value == "upserted.png"
assert updated.storage_id.value != contract.storage_id.value

async def test_download_file(self, client: InfrahubClient, load_file_object_schema: None) -> None:
"""Test downloading files to memory and to disk."""
contract = await client.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-DOWNLOAD-001", description="Download test"
)
contract.upload_from_bytes(content=TEXT_CONTENT, name="download_test.txt")
await contract.save()

fetched = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
downloaded_content = await fetched.download_file()
assert downloaded_content == TEXT_CONTENT

with tempfile.TemporaryDirectory() as tmpdir:
dest_path = Path(tmpdir) / "downloaded.txt"
bytes_written = await fetched.download_file(dest=dest_path)
assert bytes_written == len(TEXT_CONTENT)
assert dest_path.read_bytes() == TEXT_CONTENT

async def test_update_without_file_change(self, client: InfrahubClient, load_file_object_schema: None) -> None:
"""Test updating FileObject attributes without replacing the file."""
contract = await client.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-META-001", description="Original description"
)
contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged.txt")
await contract.save()

contract_to_update = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
contract_to_update.description.value = "Updated description"
await contract_to_update.save()

updated = await client.get(kind=TESTING_FILE_CONTRACT, id=contract.id)
assert updated.description.value == "Updated description"
assert updated.storage_id.value == contract_to_update.storage_id.value
assert updated.checksum.value == contract_to_update.checksum.value


@pytest.mark.xfail(reason="Requires Infrahub 1.8+")
class TestFileObjectSync(TestInfrahubDockerClient, SchemaFileObject):
"""Sync integration tests for FileObject functionality."""

def test_create_file_object_with_upload_sync(
self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None
) -> None:
"""Test creating FileObject nodes with both upload_from_bytes and upload_from_path (sync)."""
contract_bytes = client_sync.create(
kind=TESTING_FILE_CONTRACT,
contract_ref="CONTRACT-CREATE-BYTES-SYNC-001",
description="Test contract with bytes upload (sync)",
)
contract_bytes.upload_from_bytes(content=PDF_MAGIC_BYTES, name="contract_sync.pdf")
contract_bytes.save()

fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract_bytes.id)
assert fetched.contract_ref.value == "CONTRACT-CREATE-BYTES-SYNC-001"
assert fetched.file_name.value == "contract_sync.pdf"
assert fetched.file_size.value == len(PDF_MAGIC_BYTES)
assert fetched.checksum.value == hashlib.sha1(PDF_MAGIC_BYTES, usedforsecurity=False).hexdigest()
assert fetched.storage_id.value

with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir) / "upload_test_sync.txt"
tmp_path.write_bytes(TEXT_CONTENT)

contract_path = client_sync.create(
kind=TESTING_FILE_CONTRACT,
contract_ref="CONTRACT-CREATE-PATH-SYNC-001",
description="Test contract from path (sync)",
)
contract_path.upload_from_path(path=tmp_path)
contract_path.save()

fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract_path.id)
assert fetched.file_name.value == tmp_path.name
assert fetched.file_size.value == len(TEXT_CONTENT)
assert fetched.checksum.value == hashlib.sha1(TEXT_CONTENT, usedforsecurity=False).hexdigest()
assert fetched.storage_id.value

def test_update_file_object_with_new_file_sync(
self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None
) -> None:
"""Test updating a FileObject node with a new file (sync)."""
contract = client_sync.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPDATE-SYNC-001", description="Initial contract sync"
)
contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="initial_sync.pdf")
contract.save()

contract_to_update = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
contract_to_update.description.value = "Updated contract sync"
contract_to_update.upload_from_bytes(content=PNG_MAGIC_BYTES, name="updated_sync.png")
contract_to_update.save()

updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
assert updated.description.value == "Updated contract sync"
assert updated.file_name.value == "updated_sync.png"
assert updated.storage_id.value != contract.storage_id.value
assert updated.checksum.value != contract.checksum.value

def test_upsert_file_object_update_sync(
self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None
) -> None:
"""Test upserting an existing FileObject node updates it rather than creating a duplicate (sync)."""
contract = client_sync.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-SYNC-001", description="Original sync"
)
contract.upload_from_bytes(content=PDF_MAGIC_BYTES, name="original_sync.pdf")
contract.save()

contract_upsert = client_sync.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-UPSERT-SYNC-001", description="Upserted update sync"
)
contract_upsert.upload_from_bytes(content=PNG_MAGIC_BYTES, name="upserted_sync.png")
contract_upsert.save(allow_upsert=True)
assert contract_upsert.id == contract.id

updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
assert updated.description.value == "Upserted update sync"
assert updated.file_name.value == "upserted_sync.png"
assert updated.storage_id.value != contract.storage_id.value

def test_download_file_sync(self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None) -> None:
"""Test downloading files to memory and to disk (sync)."""
contract = client_sync.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-DOWNLOAD-SYNC-001", description="Download test sync"
)
contract.upload_from_bytes(content=TEXT_CONTENT, name="download_sync.txt")
contract.save()

fetched = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
downloaded_content = fetched.download_file()
assert downloaded_content == TEXT_CONTENT

with tempfile.TemporaryDirectory() as tmpdir:
dest_path = Path(tmpdir) / "downloaded_sync.txt"
bytes_written = fetched.download_file(dest=dest_path)
assert bytes_written == len(TEXT_CONTENT)
assert dest_path.read_bytes() == TEXT_CONTENT

def test_update_without_file_change_sync(
self, client_sync: InfrahubClientSync, load_file_object_schema_sync: None
) -> None:
"""Test updating FileObject attributes without replacing the file (sync)."""
contract = client_sync.create(
kind=TESTING_FILE_CONTRACT, contract_ref="CONTRACT-META-SYNC-001", description="Original description sync"
)
contract.upload_from_bytes(content=TEXT_CONTENT, name="unchanged_sync.txt")
contract.save()

contract_to_update = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id, populate_store=False)
contract_to_update.description.value = "Updated description sync"
contract_to_update.save()

updated = client_sync.get(kind=TESTING_FILE_CONTRACT, id=contract.id)
assert updated.description.value == "Updated description sync"
assert updated.storage_id.value == contract_to_update.storage_id.value
assert updated.checksum.value == contract_to_update.checksum.value