1+ from sqlalchemy .exc import IntegrityError
12from sqlalchemy .ext .asyncio import AsyncSession
23
34from app .models .pv import PV
@@ -33,6 +34,14 @@ def _to_dto(self, pv: PV) -> PVElementDTO:
3334 lastModifiedDate = pv .updated_at ,
3435 )
3536
37+ @staticmethod
38+ def _normalize_address (address : str | None ) -> str | None :
39+ """Normalize optional PV addresses from API payloads."""
40+ if address is None :
41+ return None
42+ normalized = address .strip ()
43+ return normalized or None
44+
3645 async def search_paged (
3746 self ,
3847 search : str | None = None ,
@@ -61,12 +70,17 @@ async def get_by_id(self, pv_id: str) -> PVElementDTO | None:
6170
6271 async def create (self , data : NewPVElementDTO ) -> PVElementDTO :
6372 """Create a new PV."""
73+ setpoint_address = self ._normalize_address (data .setpointAddress )
74+ readback_address = self ._normalize_address (data .readbackAddress )
75+ config_address = self ._normalize_address (data .configAddress )
76+ if not any ([setpoint_address , readback_address , config_address ]):
77+ raise ValueError ("At least one address (setpoint, readback, or config) is required" )
78+
6479 # Check for duplicate addresses
65- for address in [data .setpointAddress , data .readbackAddress , data .configAddress ]:
66- if address :
67- existing = await self .pv_repo .find_by_address (address )
68- if existing :
69- raise ValueError (f"PV with address '{ address } ' already exists" )
80+ if setpoint_address :
81+ existing = await self .pv_repo .find_by_setpoint (setpoint_address )
82+ if existing :
83+ raise ValueError (f"PV with setpoint address '{ setpoint_address } ' already exists" )
7084
7185 # Get tags
7286 tags = []
@@ -76,9 +90,9 @@ async def create(self, data: NewPVElementDTO) -> PVElementDTO:
7690 raise ValueError ("One or more tag IDs are invalid" )
7791
7892 pv = PV (
79- setpoint_address = data . setpointAddress ,
80- readback_address = data . readbackAddress ,
81- config_address = data . configAddress ,
93+ setpoint_address = setpoint_address ,
94+ readback_address = readback_address ,
95+ config_address = config_address ,
8296 device = data .device ,
8397 description = data .description ,
8498 abs_tolerance = data .absTolerance ,
@@ -87,14 +101,41 @@ async def create(self, data: NewPVElementDTO) -> PVElementDTO:
87101 tags = tags ,
88102 )
89103
90- pv = await self .pv_repo .create (pv )
104+ try :
105+ pv = await self .pv_repo .create (pv )
106+ except IntegrityError as e :
107+ raise ValueError (f"PV addresses violate uniqueness constraints: { e } " ) from e
91108 return self ._to_dto (pv )
92109
93110 async def create_many (self , data_list : list [NewPVElementDTO ]) -> list [PVElementDTO ]:
94111 """Bulk create PVs."""
112+ normalized_records : list [tuple [str | None , str | None , str | None , NewPVElementDTO ]] = []
113+ for data in data_list :
114+ setpoint_address = self ._normalize_address (data .setpointAddress )
115+ readback_address = self ._normalize_address (data .readbackAddress )
116+ config_address = self ._normalize_address (data .configAddress )
117+ if not any ([setpoint_address , readback_address , config_address ]):
118+ raise ValueError ("At least one address (setpoint, readback, or config) is required" )
119+ normalized_records .append ((setpoint_address , readback_address , config_address , data ))
120+
121+ setpoints = [r [0 ] for r in normalized_records if r [0 ]]
122+ seen = set ()
123+ duplicate_setpoints = set ()
124+ for s in setpoints :
125+ if s in seen :
126+ duplicate_setpoints .add (s )
127+ else :
128+ seen .add (s )
129+ if duplicate_setpoints :
130+ raise ValueError (f"Duplicate setpoint addresses in import: { sorted (duplicate_setpoints )[:10 ]} " )
131+
132+ existing_setpoints = await self .pv_repo .get_existing_setpoints (setpoints )
133+ if existing_setpoints :
134+ raise ValueError (f"Setpoint addresses already exist: { sorted (existing_setpoints )[:10 ]} " )
135+
95136 # Collect all tag IDs
96137 all_tag_ids = set ()
97- for data in data_list :
138+ for _ , _ , _ , data in normalized_records :
98139 all_tag_ids .update (data .tags )
99140
100141 # Fetch all tags at once
@@ -105,12 +146,12 @@ async def create_many(self, data_list: list[NewPVElementDTO]) -> list[PVElementD
105146
106147 # Create PV objects
107148 pvs = []
108- for data in data_list :
149+ for setpoint_address , readback_address , config_address , data in normalized_records :
109150 pv_tags = [tags_by_id [tid ] for tid in data .tags if tid in tags_by_id ]
110151 pv = PV (
111- setpoint_address = data . setpointAddress ,
112- readback_address = data . readbackAddress ,
113- config_address = data . configAddress ,
152+ setpoint_address = setpoint_address ,
153+ readback_address = readback_address ,
154+ config_address = config_address ,
114155 device = data .device ,
115156 description = data .description ,
116157 abs_tolerance = data .absTolerance ,
@@ -121,7 +162,10 @@ async def create_many(self, data_list: list[NewPVElementDTO]) -> list[PVElementD
121162 pvs .append (pv )
122163
123164 # Bulk insert
124- pvs = await self .pv_repo .bulk_create (pvs )
165+ try :
166+ pvs = await self .pv_repo .bulk_create (pvs )
167+ except IntegrityError as e :
168+ raise ValueError (f"PV addresses violate uniqueness constraints: { e } " ) from e
125169 return [self ._to_dto (pv ) for pv in pvs ]
126170
127171 async def update (self , pv_id : str , data : UpdatePVElementDTO ) -> PVElementDTO | None :
0 commit comments