-
Notifications
You must be signed in to change notification settings - Fork 0
core: add core_genesis SQLite table and repository #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
16 changes: 16 additions & 0 deletions
16
src/modulr_core/persistence/migrations/007_core_genesis.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| -- Singleton genesis / bootstrap trust record for this Core instance. | ||
| -- Stores whether first-boot wizard finished and the proven Ed25519 signing pubkey | ||
| -- used to anchor apex policy (e.g. modulr.* namespace ownership). Enforcement of | ||
| -- subdomain rules uses this row in later stages; this migration only persists state. | ||
| PRAGMA foreign_keys = ON; | ||
|
|
||
| CREATE TABLE IF NOT EXISTS core_genesis ( | ||
| singleton INTEGER PRIMARY KEY CHECK (singleton = 1), | ||
| genesis_complete INTEGER NOT NULL DEFAULT 0 CHECK (genesis_complete IN (0, 1)), | ||
| bootstrap_signing_pubkey_hex TEXT, | ||
| modulr_apex_domain TEXT, | ||
| updated_at INTEGER NOT NULL | ||
| ); | ||
|
|
||
| INSERT OR IGNORE INTO core_genesis (singleton, genesis_complete, updated_at) | ||
| VALUES (1, 0, 0); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| """Singleton ``core_genesis`` row: wizard completion + bootstrap signing pubkey.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sqlite3 | ||
| from dataclasses import dataclass | ||
|
|
||
| from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey | ||
|
|
||
| from modulr_core.validation.hex_codec import InvalidHexEncoding, decode_hex_fixed | ||
|
|
||
| _MODULR_APEX_DOMAIN_MAX_LEN = 253 | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class CoreGenesisSnapshot: | ||
| """Read model for :class:`CoreGenesisRepository`.""" | ||
|
|
||
| genesis_complete: bool | ||
| bootstrap_signing_pubkey_hex: str | None | ||
| modulr_apex_domain: str | None | ||
| updated_at: int | ||
|
|
||
|
|
||
| def _validate_bootstrap_pubkey_hex(pub_hex: str) -> None: | ||
| try: | ||
| raw = decode_hex_fixed(pub_hex, byte_length=32) | ||
| except InvalidHexEncoding as e: | ||
| raise ValueError(str(e)) from e | ||
| try: | ||
| Ed25519PublicKey.from_public_bytes(raw) | ||
| except ValueError as e: | ||
| raise ValueError(f"invalid Ed25519 public key: {e}") from e | ||
|
|
||
|
|
||
| def _validate_apex_domain(domain: str) -> str: | ||
| d = domain.strip() | ||
| if not d: | ||
| raise ValueError("modulr_apex_domain must be non-empty when set") | ||
| if len(d) > _MODULR_APEX_DOMAIN_MAX_LEN: | ||
| mx = _MODULR_APEX_DOMAIN_MAX_LEN | ||
| raise ValueError(f"modulr_apex_domain must be at most {mx} characters") | ||
| return d | ||
|
|
||
|
|
||
| class CoreGenesisRepository: | ||
| """Read/update the single ``core_genesis`` row (migration ``007`` seeds it).""" | ||
|
|
||
| def __init__(self, conn: sqlite3.Connection) -> None: | ||
| self._conn = conn | ||
|
|
||
| def get(self) -> CoreGenesisSnapshot: | ||
| cur = self._conn.execute( | ||
| """ | ||
| SELECT genesis_complete, bootstrap_signing_pubkey_hex, | ||
| modulr_apex_domain, updated_at | ||
| FROM core_genesis | ||
| WHERE singleton = 1 | ||
| """, | ||
| ) | ||
| row = cur.fetchone() | ||
| if row is None: | ||
| raise RuntimeError("core_genesis singleton missing; run apply_migrations") | ||
| complete = int(row["genesis_complete"]) == 1 | ||
| pk = row["bootstrap_signing_pubkey_hex"] | ||
| pk_s = str(pk) if pk is not None else None | ||
| apex = row["modulr_apex_domain"] | ||
| apex_s = str(apex).strip() if apex is not None and str(apex).strip() else None | ||
| return CoreGenesisSnapshot( | ||
| genesis_complete=complete, | ||
| bootstrap_signing_pubkey_hex=pk_s, | ||
| modulr_apex_domain=apex_s, | ||
| updated_at=int(row["updated_at"]), | ||
| ) | ||
|
|
||
| def set_genesis_complete(self, *, complete: bool, updated_at: int) -> None: | ||
| self._conn.execute( | ||
| """ | ||
| UPDATE core_genesis | ||
| SET genesis_complete = ?, updated_at = ? | ||
| WHERE singleton = 1 | ||
| """, | ||
| (1 if complete else 0, updated_at), | ||
| ) | ||
|
|
||
| def set_bootstrap_signing_pubkey_hex( | ||
| self, | ||
| *, | ||
| pubkey_hex: str | None, | ||
| updated_at: int, | ||
| ) -> None: | ||
| if pubkey_hex is not None: | ||
| _validate_bootstrap_pubkey_hex(pubkey_hex) | ||
| self._conn.execute( | ||
| """ | ||
| UPDATE core_genesis | ||
| SET bootstrap_signing_pubkey_hex = ?, updated_at = ? | ||
| WHERE singleton = 1 | ||
| """, | ||
| (pubkey_hex, updated_at), | ||
| ) | ||
|
|
||
| def set_modulr_apex_domain( | ||
| self, | ||
| *, | ||
| apex_domain: str | None, | ||
| updated_at: int, | ||
| ) -> None: | ||
| if apex_domain is not None: | ||
| apex_domain = _validate_apex_domain(apex_domain) | ||
| self._conn.execute( | ||
| """ | ||
| UPDATE core_genesis | ||
| SET modulr_apex_domain = ?, updated_at = ? | ||
| WHERE singleton = 1 | ||
| """, | ||
| (apex_domain, updated_at), | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| """Singleton ``core_genesis`` persistence (migration 007).""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import sqlite3 | ||
|
|
||
| import pytest | ||
| from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey | ||
| from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat | ||
|
|
||
| from modulr_core.persistence import apply_migrations, connect_memory | ||
| from modulr_core.repositories.core_genesis import CoreGenesisRepository | ||
|
|
||
|
|
||
| def _valid_pubkey_hex() -> str: | ||
| pk = Ed25519PrivateKey.generate().public_key() | ||
| return pk.public_bytes(encoding=Encoding.Raw, format=PublicFormat.Raw).hex() | ||
|
|
||
|
|
||
| def _conn() -> sqlite3.Connection: | ||
| c = connect_memory(check_same_thread=False) | ||
| apply_migrations(c) | ||
| return c | ||
|
|
||
|
|
||
| def test_core_genesis_default_after_migration() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| s = repo.get() | ||
| assert s.genesis_complete is False | ||
| assert s.bootstrap_signing_pubkey_hex is None | ||
| assert s.modulr_apex_domain is None | ||
| assert s.updated_at == 0 | ||
|
|
||
|
|
||
| def test_core_genesis_set_pubkey_and_complete() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| k = _valid_pubkey_hex() | ||
| repo.set_bootstrap_signing_pubkey_hex(pubkey_hex=k, updated_at=100) | ||
| repo.set_modulr_apex_domain(apex_domain="modulr.network", updated_at=101) | ||
| repo.set_genesis_complete(complete=True, updated_at=102) | ||
| conn.commit() | ||
|
|
||
| s = repo.get() | ||
| assert s.genesis_complete is True | ||
| assert s.bootstrap_signing_pubkey_hex == k | ||
| assert s.modulr_apex_domain == "modulr.network" | ||
| assert s.updated_at == 102 | ||
|
|
||
|
|
||
| def test_core_genesis_rejects_invalid_pubkey_hex() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| with pytest.raises(ValueError, match="expected 64 hex"): | ||
| repo.set_bootstrap_signing_pubkey_hex(pubkey_hex="not-hex", updated_at=1) | ||
|
|
||
|
|
||
| def test_core_genesis_rejects_uppercase_pubkey_hex() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| bad = _valid_pubkey_hex().upper() | ||
| with pytest.raises(ValueError, match="lowercase"): | ||
| repo.set_bootstrap_signing_pubkey_hex(pubkey_hex=bad, updated_at=1) | ||
|
|
||
|
|
||
| def test_core_genesis_clear_pubkey() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| k = _valid_pubkey_hex() | ||
| repo.set_bootstrap_signing_pubkey_hex(pubkey_hex=k, updated_at=50) | ||
| repo.set_bootstrap_signing_pubkey_hex(pubkey_hex=None, updated_at=51) | ||
| conn.commit() | ||
| assert repo.get().bootstrap_signing_pubkey_hex is None | ||
|
|
||
|
|
||
| def test_core_genesis_apex_domain_validation() -> None: | ||
| conn = _conn() | ||
| repo = CoreGenesisRepository(conn) | ||
| with pytest.raises(ValueError, match="non-empty"): | ||
| repo.set_modulr_apex_domain(apex_domain=" ", updated_at=1) | ||
| with pytest.raises(ValueError, match="at most"): | ||
| repo.set_modulr_apex_domain(apex_domain="x" * 254, updated_at=1) | ||
|
|
||
|
|
||
| def test_schema_migrations_includes_007() -> None: | ||
| conn = _conn() | ||
| cur = conn.execute( | ||
| "SELECT 1 FROM schema_migrations WHERE version = 7", | ||
| ) | ||
| assert cur.fetchone() is not None |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set_modulr_apex_domaincurrently accepts any non-empty trimmed string up to 253 chars, so values like"not a domain"or single-label names can be persisted asmodulr_apex_domain. Because this field is described as the apex trust domain, storing non-domain values creates invalid persistent state that later policy checks can mis-handle; validate against the existing org-domain rules (dotted DNS-style) before writing.Useful? React with 👍 / 👎.