Skip to content

Commit b11f378

Browse files
authored
Merge pull request #292 from martynia/janusz_pilotDB
feat: Implement PilotAgents schema
2 parents 81b47d3 + 09475e7 commit b11f378

10 files changed

Lines changed: 176 additions & 29 deletions

File tree

diracx-db/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ testing = [
3131
AuthDB = "diracx.db.sql:AuthDB"
3232
JobDB = "diracx.db.sql:JobDB"
3333
JobLoggingDB = "diracx.db.sql:JobLoggingDB"
34+
PilotAgentsDB = "diracx.db.sql:PilotAgentsDB"
3435
SandboxMetadataDB = "diracx.db.sql:SandboxMetadataDB"
3536
TaskQueueDB = "diracx.db.sql:TaskQueueDB"
3637

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
from __future__ import annotations
22

3-
__all__ = ("AuthDB", "JobDB", "JobLoggingDB", "SandboxMetadataDB", "TaskQueueDB")
3+
__all__ = (
4+
"AuthDB",
5+
"JobDB",
6+
"JobLoggingDB",
7+
"PilotAgentsDB",
8+
"SandboxMetadataDB",
9+
"TaskQueueDB",
10+
)
411

512
from .auth.db import AuthDB
613
from .job.db import JobDB
714
from .job_logging.db import JobLoggingDB
15+
from .pilot_agents.db import PilotAgentsDB
816
from .sandbox_metadata.db import SandboxMetadataDB
917
from .task_queue.db import TaskQueueDB

diracx-db/src/diracx/db/sql/job/schema.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import sqlalchemy.types as types
21
from sqlalchemy import (
32
DateTime,
43
Enum,
@@ -10,37 +9,11 @@
109
)
1110
from sqlalchemy.orm import declarative_base
1211

13-
from ..utils import Column, NullColumn
12+
from ..utils import Column, EnumBackedBool, NullColumn
1413

1514
JobDBBase = declarative_base()
1615

1716

18-
class EnumBackedBool(types.TypeDecorator):
19-
"""Maps a ``EnumBackedBool()`` column to True/False in Python."""
20-
21-
impl = types.Enum
22-
cache_ok: bool = True
23-
24-
def __init__(self) -> None:
25-
super().__init__("True", "False")
26-
27-
def process_bind_param(self, value, dialect) -> str:
28-
if value is True:
29-
return "True"
30-
elif value is False:
31-
return "False"
32-
else:
33-
raise NotImplementedError(value, dialect)
34-
35-
def process_result_value(self, value, dialect) -> bool:
36-
if value == "True":
37-
return True
38-
elif value == "False":
39-
return False
40-
else:
41-
raise NotImplementedError(f"Unknown {value=}")
42-
43-
4417
class Jobs(JobDBBase):
4518
__tablename__ = "Jobs"
4619

diracx-db/src/diracx/db/sql/pilot_agents/__init__.py

Whitespace-only changes.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from __future__ import annotations
2+
3+
from datetime import datetime, timezone
4+
5+
from sqlalchemy import insert
6+
7+
from ..utils import BaseSQLDB
8+
from .schema import PilotAgents, PilotAgentsDBBase
9+
10+
11+
class PilotAgentsDB(BaseSQLDB):
12+
"""PilotAgentsDB class is a front-end to the PilotAgents Database."""
13+
14+
metadata = PilotAgentsDBBase.metadata
15+
16+
async def add_pilot_references(
17+
self,
18+
pilot_ref: list[str],
19+
vo: str,
20+
grid_type: str = "DIRAC",
21+
pilot_stamps: dict | None = None,
22+
) -> None:
23+
24+
if pilot_stamps is None:
25+
pilot_stamps = {}
26+
27+
now = datetime.now(tz=timezone.utc)
28+
29+
# Prepare the list of dictionaries for bulk insertion
30+
values = [
31+
{
32+
"PilotJobReference": ref,
33+
"VO": vo,
34+
"GridType": grid_type,
35+
"SubmissionTime": now,
36+
"LastUpdateTime": now,
37+
"Status": "Submitted",
38+
"PilotStamp": pilot_stamps.get(ref, ""),
39+
}
40+
for ref in pilot_ref
41+
]
42+
43+
# Insert multiple rows in a single execute call
44+
stmt = insert(PilotAgents).values(values)
45+
await self.conn.execute(stmt)
46+
return
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from sqlalchemy import (
2+
DateTime,
3+
Double,
4+
Index,
5+
Integer,
6+
String,
7+
Text,
8+
)
9+
from sqlalchemy.orm import declarative_base
10+
11+
from ..utils import Column, EnumBackedBool, NullColumn
12+
13+
PilotAgentsDBBase = declarative_base()
14+
15+
16+
class PilotAgents(PilotAgentsDBBase):
17+
__tablename__ = "PilotAgents"
18+
19+
PilotID = Column("PilotID", Integer, autoincrement=True, primary_key=True)
20+
InitialJobID = Column("InitialJobID", Integer, default=0)
21+
CurrentJobID = Column("CurrentJobID", Integer, default=0)
22+
PilotJobReference = Column("PilotJobReference", String(255), default="Unknown")
23+
PilotStamp = Column("PilotStamp", String(32), default="")
24+
DestinationSite = Column("DestinationSite", String(128), default="NotAssigned")
25+
Queue = Column("Queue", String(128), default="Unknown")
26+
GridSite = Column("GridSite", String(128), default="Unknown")
27+
VO = Column("VO", String(128))
28+
GridType = Column("GridType", String(32), default="LCG")
29+
BenchMark = Column("BenchMark", Double, default=0.0)
30+
SubmissionTime = NullColumn("SubmissionTime", DateTime)
31+
LastUpdateTime = NullColumn("LastUpdateTime", DateTime)
32+
Status = Column("Status", String(32), default="Unknown")
33+
StatusReason = Column("StatusReason", String(255), default="Unknown")
34+
AccountingSent = Column("AccountingSent", EnumBackedBool(), default=False)
35+
36+
__table_args__ = (
37+
Index("PilotJobReference", "PilotJobReference"),
38+
Index("Status", "Status"),
39+
Index("Statuskey", "GridSite", "DestinationSite", "Status"),
40+
)
41+
42+
43+
class JobToPilotMapping(PilotAgentsDBBase):
44+
__tablename__ = "JobToPilotMapping"
45+
46+
PilotID = Column("PilotID", Integer, primary_key=True)
47+
JobID = Column("JobID", Integer, primary_key=True)
48+
StartTime = Column("StartTime", DateTime)
49+
50+
__table_args__ = (Index("JobID", "JobID"), Index("PilotID", "PilotID"))
51+
52+
53+
class PilotOutput(PilotAgentsDBBase):
54+
__tablename__ = "PilotOutput"
55+
56+
PilotID = Column("PilotID", Integer, primary_key=True)
57+
StdOutput = Column("StdOutput", Text)
58+
StdError = Column("StdError", Text)

diracx-db/src/diracx/db/sql/utils/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from functools import partial
1414
from typing import TYPE_CHECKING, Self, cast
1515

16+
import sqlalchemy.types as types
1617
from pydantic import TypeAdapter
1718
from sqlalchemy import Column as RawColumn
1819
from sqlalchemy import DateTime, Enum, MetaData, select
@@ -128,6 +129,32 @@ def EnumColumn(enum_type, **kwargs):
128129
return Column(Enum(enum_type, native_enum=False, length=16), **kwargs)
129130

130131

132+
class EnumBackedBool(types.TypeDecorator):
133+
"""Maps a ``EnumBackedBool()`` column to True/False in Python."""
134+
135+
impl = types.Enum
136+
cache_ok: bool = True
137+
138+
def __init__(self) -> None:
139+
super().__init__("True", "False")
140+
141+
def process_bind_param(self, value, dialect) -> str:
142+
if value is True:
143+
return "True"
144+
elif value is False:
145+
return "False"
146+
else:
147+
raise NotImplementedError(value, dialect)
148+
149+
def process_result_value(self, value, dialect) -> bool:
150+
if value == "True":
151+
return True
152+
elif value == "False":
153+
return False
154+
else:
155+
raise NotImplementedError(f"Unknown {value=}")
156+
157+
131158
class SQLDBError(Exception):
132159
pass
133160

diracx-db/tests/pilot_agents/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import annotations
2+
3+
import pytest
4+
5+
from diracx.db.sql.pilot_agents.db import PilotAgentsDB
6+
7+
8+
@pytest.fixture
9+
async def pilot_agents_db(tmp_path) -> PilotAgentsDB:
10+
agents_db = PilotAgentsDB("sqlite+aiosqlite:///:memory:")
11+
async with agents_db.engine_context():
12+
async with agents_db.engine.begin() as conn:
13+
await conn.run_sync(agents_db.metadata.create_all)
14+
yield agents_db
15+
16+
17+
async def test_insert_and_select(pilot_agents_db: PilotAgentsDB):
18+
19+
async with pilot_agents_db as pilot_agents_db:
20+
# Add a pilot reference
21+
refs = [f"ref_{i}" for i in range(10)]
22+
stamps = [f"stamp_{i}" for i in range(10)]
23+
stamp_dict = dict(zip(refs, stamps))
24+
25+
await pilot_agents_db.add_pilot_references(
26+
refs, "test_vo", grid_type="DIRAC", pilot_stamps=stamp_dict
27+
)
28+
29+
await pilot_agents_db.add_pilot_references(
30+
refs, "test_vo", grid_type="DIRAC", pilot_stamps=None
31+
)

diracx-routers/src/diracx/routers/dependencies.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"JobLoggingDB",
88
"SandboxMetadataDB",
99
"TaskQueueDB",
10+
"PilotAgentsDB",
1011
"add_settings_annotation",
1112
"AvailableSecurityProperties",
1213
)
@@ -23,6 +24,7 @@
2324
from diracx.db.sql import AuthDB as _AuthDB
2425
from diracx.db.sql import JobDB as _JobDB
2526
from diracx.db.sql import JobLoggingDB as _JobLoggingDB
27+
from diracx.db.sql import PilotAgentsDB as _PilotAgentsDB
2628
from diracx.db.sql import SandboxMetadataDB as _SandboxMetadataDB
2729
from diracx.db.sql import TaskQueueDB as _TaskQueueDB
2830

@@ -38,6 +40,7 @@ def add_settings_annotation(cls: T) -> T:
3840
AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)]
3941
JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)]
4042
JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)]
43+
PilotAgentsDB = Annotated[_PilotAgentsDB, Depends(_PilotAgentsDB.transaction)]
4144
SandboxMetadataDB = Annotated[
4245
_SandboxMetadataDB, Depends(_SandboxMetadataDB.transaction)
4346
]

0 commit comments

Comments
 (0)