Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions robosystems_client/clients/ledger_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@
from ..graphql.queries.ledger import (
GET_ACCOUNT_ROLLUPS_QUERY,
GET_ACCOUNT_TREE_QUERY,
GET_AGENT_QUERY,
GET_CLOSING_BOOK_STRUCTURES_QUERY,
GET_ENTITY_QUERY,
GET_EVENT_BLOCK_QUERY,
GET_FISCAL_CALENDAR_QUERY,
GET_MAPPED_TRIAL_BALANCE_QUERY,
GET_MAPPING_COVERAGE_QUERY,
Expand All @@ -162,8 +164,10 @@
GET_TRANSACTION_QUERY,
GET_TRIAL_BALANCE_QUERY,
LIST_ACCOUNTS_QUERY,
LIST_AGENTS_QUERY,
LIST_ELEMENTS_QUERY,
LIST_ENTITIES_QUERY,
LIST_EVENT_BLOCKS_QUERY,
LIST_INFORMATION_BLOCKS_QUERY,
LIST_MAPPINGS_QUERY,
LIST_STRUCTURES_QUERY,
Expand All @@ -173,10 +177,14 @@
parse_account_rollups,
parse_account_tree,
parse_accounts,
parse_agent,
parse_agents,
parse_closing_book_structures,
parse_elements,
parse_entities,
parse_entity,
parse_event_block,
parse_event_blocks,
parse_fiscal_calendar,
parse_information_block,
parse_information_blocks,
Expand Down Expand Up @@ -510,6 +518,70 @@ def get_transaction(
)
return parse_transaction(data)

# ── Event blocks (inbox surface) ───────────────────────────────────

def list_event_blocks(
self,
graph_id: str,
event_type: str | None = None,
event_category: str | None = None,
status: str | None = None,
agent_id: str | None = None,
source: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[dict[str, Any]]:
"""List captured event blocks (inbox surface)."""
data = self._query(
graph_id,
LIST_EVENT_BLOCKS_QUERY,
{
"eventType": event_type,
"eventCategory": event_category,
"status": status,
"agentId": agent_id,
"source": source,
"limit": limit,
"offset": offset,
},
)
return parse_event_blocks(data)

def get_event_block(self, graph_id: str, event_id: str) -> dict[str, Any] | None:
"""Get event block detail by id."""
data = self._query(graph_id, GET_EVENT_BLOCK_QUERY, {"id": event_id})
return parse_event_block(data)

# ── Agents (REA counterparties) ────────────────────────────────────

def list_agents(
self,
graph_id: str,
agent_type: str | None = None,
source: str | None = None,
is_active: bool | None = True,
limit: int = 50,
offset: int = 0,
) -> list[dict[str, Any]]:
"""List agents (customers, vendors, employees)."""
data = self._query(
graph_id,
LIST_AGENTS_QUERY,
{
"agentType": agent_type,
"source": source,
"isActive": is_active,
"limit": limit,
"offset": offset,
},
)
return parse_agents(data)

def get_agent(self, graph_id: str, agent_id: str) -> dict[str, Any] | None:
"""Get agent detail by id."""
data = self._query(graph_id, GET_AGENT_QUERY, {"id": agent_id})
return parse_agent(data)

# ── Trial balance ──────────────────────────────────────────────────

def get_trial_balance(
Expand Down
100 changes: 100 additions & 0 deletions robosystems_client/graphql/queries/ledger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,106 @@ def parse_transaction(data: dict[str, Any]) -> dict[str, Any] | None:
return keys_to_snake(tx) if tx is not None else None


# ── Event blocks (inbox) ───────────────────────────────────────────────

LIST_EVENT_BLOCKS_QUERY = """
query ListLedgerEventBlocks(
$eventType: String
$eventCategory: String
$status: String
$agentId: String
$source: String
$limit: Int! = 50
$offset: Int! = 0
) {
eventBlocks(
eventType: $eventType
eventCategory: $eventCategory
status: $status
agentId: $agentId
source: $source
limit: $limit
offset: $offset
) {
id eventType eventCategory eventClass status occurredAt effectiveAt
source externalId externalUrl amount currency description metadata
dimensionIds agentId resourceType resourceElementId
replacedByEventId replacesEventId obligatedByEventId dischargesEventId
createdAt createdBy
}
}
""".strip()


def parse_event_blocks(data: dict[str, Any]) -> list[dict[str, Any]]:
events = data.get("eventBlocks") or []
return [keys_to_snake(e) for e in events]


GET_EVENT_BLOCK_QUERY = """
query GetLedgerEventBlock($id: String!) {
eventBlock(id: $id) {
id eventType eventCategory eventClass status occurredAt effectiveAt
source externalId externalUrl amount currency description metadata
dimensionIds agentId resourceType resourceElementId
replacedByEventId replacesEventId obligatedByEventId dischargesEventId
createdAt createdBy
}
}
""".strip()


def parse_event_block(data: dict[str, Any]) -> dict[str, Any] | None:
e = data.get("eventBlock")
return keys_to_snake(e) if e is not None else None


# ── Agents (REA counterparties) ────────────────────────────────────────

LIST_AGENTS_QUERY = """
query ListLedgerAgents(
$agentType: String
$source: String
$isActive: Boolean = true
$limit: Int! = 50
$offset: Int! = 0
) {
agents(
agentType: $agentType
source: $source
isActive: $isActive
limit: $limit
offset: $offset
) {
id agentType name legalName taxId registrationNumber duns lei
email phone address source externalId
isActive is1099Recipient createdAt updatedAt createdBy
}
}
""".strip()


def parse_agents(data: dict[str, Any]) -> list[dict[str, Any]]:
agents = data.get("agents") or []
return [keys_to_snake(a) for a in agents]


GET_AGENT_QUERY = """
query GetLedgerAgent($id: String!) {
agent(id: $id) {
id agentType name legalName taxId registrationNumber duns lei
email phone address source externalId
isActive is1099Recipient createdAt updatedAt createdBy
}
}
""".strip()


def parse_agent(data: dict[str, Any]) -> dict[str, Any] | None:
a = data.get("agent")
return keys_to_snake(a) if a is not None else None


# ── Taxonomy ───────────────────────────────────────────────────────────

GET_REPORTING_TAXONOMY_QUERY = """
Expand Down
153 changes: 153 additions & 0 deletions tests/test_ledger_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,159 @@ def test_get_transaction(self, mock_execute, mock_config, graph_id):
variables = mock_execute.call_args[0][2]
assert variables["transactionId"] == "tx_1"

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_list_event_blocks(self, mock_execute, mock_config, graph_id):
mock_execute.return_value = {
"eventBlocks": [
{
"id": "evt_1",
"eventType": "invoice_issued",
"eventCategory": "sales",
"eventClass": "economic",
"status": "captured",
"occurredAt": "2026-03-15T00:00:00Z",
"effectiveAt": None,
"source": "quickbooks",
"externalId": "Invoice_9",
"externalUrl": None,
"amount": 10800,
"currency": "USD",
"description": None,
"metadata": {"qb_txn_type": "Invoice"},
"dimensionIds": [],
"agentId": "agt_1",
"resourceType": None,
"resourceElementId": None,
"replacedByEventId": None,
"replacesEventId": None,
"obligatedByEventId": None,
"dischargesEventId": None,
"createdAt": "2026-03-15T12:00:00Z",
"createdBy": "user_1",
}
]
}
client = LedgerClient(mock_config)
result = client.list_event_blocks(
graph_id, event_type="invoice_issued", status="captured"
)
assert len(result) == 1
assert result[0]["id"] == "evt_1"
assert result[0]["agent_id"] == "agt_1"
variables = mock_execute.call_args[0][2]
assert variables["eventType"] == "invoice_issued"
assert variables["status"] == "captured"

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_get_event_block(self, mock_execute, mock_config, graph_id):
mock_execute.return_value = {
"eventBlock": {
"id": "evt_1",
"eventType": "invoice_issued",
"eventCategory": "sales",
"eventClass": "economic",
"status": "captured",
"occurredAt": "2026-03-15T00:00:00Z",
"effectiveAt": None,
"source": "quickbooks",
"externalId": "Invoice_9",
"externalUrl": None,
"amount": 10800,
"currency": "USD",
"description": None,
"metadata": {},
"dimensionIds": [],
"agentId": "agt_1",
"resourceType": None,
"resourceElementId": None,
"replacedByEventId": None,
"replacesEventId": None,
"obligatedByEventId": None,
"dischargesEventId": None,
"createdAt": "2026-03-15T12:00:00Z",
"createdBy": "user_1",
}
}
client = LedgerClient(mock_config)
result = client.get_event_block(graph_id, "evt_1")
assert result is not None
assert result["id"] == "evt_1"
variables = mock_execute.call_args[0][2]
assert variables["id"] == "evt_1"

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_get_event_block_returns_none_when_missing(
self, mock_execute, mock_config, graph_id
):
mock_execute.return_value = {"eventBlock": None}
client = LedgerClient(mock_config)
assert client.get_event_block(graph_id, "evt_missing") is None

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_list_agents(self, mock_execute, mock_config, graph_id):
mock_execute.return_value = {
"agents": [
{
"id": "agt_1",
"agentType": "customer",
"name": "Amy's Bird Sanctuary",
"legalName": None,
"taxId": None,
"registrationNumber": None,
"duns": None,
"lei": None,
"email": "birds@intuit.com",
"phone": "(650) 555-3311",
"address": None,
"source": "quickbooks",
"externalId": "1",
"isActive": True,
"is1099Recipient": False,
"createdAt": "2026-03-15T12:00:00Z",
"updatedAt": "2026-03-15T12:00:00Z",
"createdBy": "user_1",
}
]
}
client = LedgerClient(mock_config)
result = client.list_agents(graph_id, agent_type="customer")
assert len(result) == 1
assert result[0]["agent_type"] == "customer"
variables = mock_execute.call_args[0][2]
assert variables["agentType"] == "customer"
assert variables["isActive"] is True

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_get_agent(self, mock_execute, mock_config, graph_id):
mock_execute.return_value = {
"agent": {
"id": "agt_1",
"agentType": "customer",
"name": "Amy's Bird Sanctuary",
"legalName": None,
"taxId": None,
"registrationNumber": None,
"duns": None,
"lei": None,
"email": None,
"phone": None,
"address": None,
"source": "quickbooks",
"externalId": "1",
"isActive": True,
"is1099Recipient": False,
"createdAt": "2026-03-15T12:00:00Z",
"updatedAt": "2026-03-15T12:00:00Z",
"createdBy": "user_1",
}
}
client = LedgerClient(mock_config)
result = client.get_agent(graph_id, "agt_1")
assert result is not None
assert result["id"] == "agt_1"
variables = mock_execute.call_args[0][2]
assert variables["id"] == "agt_1"

@patch("robosystems_client.graphql.client.GraphQLClient.execute")
def test_get_mapped_trial_balance(self, mock_execute, mock_config, graph_id):
mock_execute.return_value = {
Expand Down
Loading