-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathservice.py
More file actions
130 lines (100 loc) · 3.79 KB
/
service.py
File metadata and controls
130 lines (100 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from enum import Enum
from fastapi import HTTPException
from sqlalchemy.orm import Session
from app.modules.events.models import Event
from app.modules.fields.models import Field
from app.modules.tags.models import Tag
from app.shared.models import EventField, EventTag
from app.shared.service import assert_db_empty
from .schemas import ExportBundle, ExportEvent, ExportField, ExportTag, ImportBundle
class ImportSource(str, Enum):
    """Formats that can be named as the source of an import.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (``ImportSource.json == "json"``).
    """

    # JavaScript Object Notation payload.
    json = "json"
    # Comma-separated values payload.
    csv = "csv"
class ExportTarget(str, Enum):
    """Formats that can be named as the target of an export.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value (``ExportTarget.markdown == "markdown"``).
    """

    # JavaScript Object Notation output.
    json = "json"
    # Comma-separated values output.
    csv = "csv"
    # Markdown text output.
    markdown = "markdown"
def export_bundle(db: Session) -> ExportBundle:
    """Read every tag, field and event from the database into an bundle.

    Events refer to their tags by tag id and to their fields by field name.

    Args:
        db: SQLAlchemy session used to run the three ``query(...).all()`` calls.

    Returns:
        An ``ExportBundle`` with one export record per stored tag, field
        and event.
    """
    tag_rows = db.query(Tag).all()
    field_rows = db.query(Field).all()
    event_rows = db.query(Event).all()

    exported_tags = []
    for row in tag_rows:
        exported_tags.append(ExportTag(id=row.id, description=row.description))

    exported_fields = []
    for row in field_rows:
        exported_fields.append(
            ExportField(
                name=row.name,
                description=row.description,
                field_type=row.field_type,
            )
        )

    exported_events = []
    for row in event_rows:
        exported_events.append(
            ExportEvent(
                name=row.name,
                description=row.description,
                links=row.links,
                # Related rows are flattened to their identifying keys.
                tags=[linked_tag.id for linked_tag in row.tags],
                fields=[linked_field.name for linked_field in row.fields],
            )
        )

    return ExportBundle(
        tags=exported_tags,
        fields=exported_fields,
        events=exported_events,
    )
def export_to(target: ExportTarget, db: Session) -> ExportBundle | str:
    """Export the database contents in the requested target format.

    Only JSON export is implemented here; the CSV and Markdown targets are
    recognised but answered with "not implemented".

    Args:
        target: Requested export format.
        db: SQLAlchemy session handed on to ``export_bundle``.

    Returns:
        The ``ExportBundle`` built by ``export_bundle`` when ``target`` is
        JSON.

    Raises:
        HTTPException: 501 for the CSV and Markdown targets, 400 for any
            value that matches no known target.
    """
    if target == ExportTarget.json:
        return export_bundle(db)
    if target == ExportTarget.csv:
        raise HTTPException(status_code=501, detail="CSV export is not implemented yet")
    if target == ExportTarget.markdown:
        raise HTTPException(
            status_code=501, detail="Markdown export is not implemented yet"
        )
    # The value being rejected is an export *target*; "source" is the
    # vocabulary of the import path.
    raise HTTPException(status_code=400, detail=f"Unknown export target: {target}")
def import_bundle(bundle: ImportBundle, db: Session) -> None:
    """Load the fields, tags and events of ``bundle`` into the database.

    ``assert_db_empty`` is called first (it is defined elsewhere; presumably
    it rejects a database that already holds data). Every field name that an
    event refers to is then checked against the bundle's field definitions
    *before* anything is added to the session, so a malformed bundle is
    rejected without leaving partially imported rows pending.

    Tags are created for every id listed in ``bundle.tags`` and for every id
    that an event refers to; ids that only appear on events get a ``None``
    description.

    Args:
        bundle: Parsed import payload with ``fields``, ``tags`` and ``events``.
        db: SQLAlchemy session the rows are added to; committed on success.

    Raises:
        HTTPException: 400 when an event refers to a field name that the
            bundle does not define.
    """
    assert_db_empty(db)

    # Create fields and build name → model map
    new_fields = []
    field_map = {}
    for field_data in bundle.fields:
        field = Field(
            name=field_data.name,
            description=field_data.description,
            field_type=field_data.field_type,
        )
        new_fields.append(field)
        field_map[field.name] = field

    # Validate all references up front, in the same order the events are
    # processed below, so the first offending name is the one reported.
    for event_data in bundle.events:
        for field_name in event_data.fields:
            if field_name not in field_map:
                raise HTTPException(
                    status_code=400, detail=f"Unknown field name: {field_name}"
                )

    # A dict (rather than a set) keeps tag creation order deterministic:
    # declared tags first, then event-only tags in order of first use.
    tag_descriptions = {tag.id: tag.description for tag in bundle.tags}
    for event_data in bundle.events:
        for tag_id in event_data.tags:
            tag_descriptions.setdefault(tag_id, None)

    db.add_all(new_fields)
    for tag_id, description in tag_descriptions.items():
        db.add(Tag(id=tag_id, description=description))
    # One flush for all fields and tags; field ids are read further down.
    db.flush()

    events = [
        Event(
            name=event_data.name,
            description=event_data.description,
            links=event_data.links or [],
        )
        for event_data in bundle.events
    ]
    db.add_all(events)
    # Flush once so every event id is available for the association rows.
    db.flush()

    for event, event_data in zip(events, bundle.events):
        for tag_id in event_data.tags:
            db.add(EventTag(event_id=event.id, tag_id=tag_id))
        for field_name in event_data.fields:
            db.add(EventField(event_id=event.id, field_id=field_map[field_name].id))

    db.commit()
def import_from(source: ImportSource, data: object, db: Session) -> None:
    """Import ``data`` into the database, interpreting it as ``source``.

    Only JSON import is implemented here; the CSV source is recognised but
    answered with "not implemented".

    Args:
        source: Format that ``data`` is expressed in.
        data: Raw payload. For JSON it must be a ``dict`` whose items are
            the keyword arguments of ``ImportBundle``.
        db: SQLAlchemy session handed on to ``import_bundle``.

    Raises:
        HTTPException: 400 when a JSON payload is not a ``dict`` or cannot
            be turned into an ``ImportBundle``, or when ``source`` matches no
            known source; 501 for the CSV source. Errors raised by
            ``import_bundle`` propagate unchanged.
    """
    if source == ImportSource.json:
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Expected JSON object")
        try:
            bundle = ImportBundle(**data)
        except (TypeError, ValueError) as exc:
            # Assuming ImportBundle is a pydantic model (TODO confirm): its
            # ValidationError derives from ValueError. Surface a bad payload
            # as a client error instead of an unhandled exception.
            raise HTTPException(
                status_code=400, detail=f"Invalid import bundle: {exc}"
            ) from exc
        import_bundle(bundle, db)
    elif source == ImportSource.csv:
        raise HTTPException(status_code=501, detail="CSV import is not implemented yet")
    else:
        raise HTTPException(status_code=400, detail=f"Unknown import source: {source}")