Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions app/api/v1/export_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import io
import csv
from datetime import datetime

from fastapi import Depends, Response, APIRouter
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import get_db
from app.services.pv_service import PVService
from app.services.tag_service import TagService

router = APIRouter(prefix="/export", tags=["Export"])


@router.get("/export_csv")
async def export_database_csv(db: AsyncSession = Depends(get_db)):
"""
Export PVs and Tags to CSV format.
Returns two CSV files: one for PVs and one for Tags.
"""
pv_service = PVService(db)
tag_service = TagService(db)

# ======= Export PVs =======
pvs_result = await pv_service.search_paged(page_size=10000)

# Generate PVs CSV
pv_output = io.StringIO()
pv_writer = csv.writer(pv_output)

# write PV headers for the CSV
pv_headers = [
"ID",
"Setpoint Address",
"Readback Address",
"Config Address",
"Device",
"Description",
"Abs Tolerance",
"Rel Tolerance",
"Read Only",
]
pv_writer.writerow(pv_headers)

# Write PV data into rows for the CSV
for pv in pvs_result.results:
row = [
pv.id,
pv.setpointAddress,
pv.readbackAddress,
pv.configAddress,
pv.device,
pv.description,
pv.absTolerance,
pv.relTolerance,
pv.readOnly,
]
pv_writer.writerow(row)

# Prepare the PV rows to put in the CSV file
pv_content = pv_output.getvalue()

# ======= Export Tags =======
tag_groups_summary = await tag_service.get_all_groups_summary()

# Generate Tags CSV
tag_output = io.StringIO()
tag_writer = csv.writer(tag_output)

# Write Tag headers for the CSV
tag_writer.writerow(["Group ID", "Group Name", "Group Description", "Tag ID", "Tag Name", "Tag Description"])

# Write Tag data into rows for the CSV
# Get full group data with tags for each group
for group_summary in tag_groups_summary:
group = await tag_service.get_group_by_id(group_summary.id)
if group and group.tags:
for tag in group.tags:
tag_writer.writerow([group.id, group.name, group.description, tag.id, tag.name, tag.description])

# Prepare the Tag rows to put in the CSV file
tag_content = tag_output.getvalue()

# Create a combined response with both CSV files
combined_content = f"=== PVs Export ===\n{pv_content}\n\n=== Tags Export ===\n{tag_content}"

# Return as a downloadable CSV file
return Response(
content=combined_content,
media_type="csv",
headers={
"Content-Disposition": f"attachment; filename=database_export_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv"
},
)
2 changes: 2 additions & 0 deletions app/api/v1/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.api.v1.health import router as health_router
from app.api.v1.snapshots import router as snapshots_router
from app.api.v1.websocket import router as websocket_router
from app.api.v1.export_csv import router as export_router

router = APIRouter(prefix="/v1")

Expand All @@ -15,3 +16,4 @@
router.include_router(jobs_router)
router.include_router(websocket_router)
router.include_router(health_router)
router.include_router(export_router)
83 changes: 83 additions & 0 deletions tests/test_api/test_export_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Tests for CSV export API endpoints.
"""

import pytest
from httpx import AsyncClient


class TestExportCSV:
"""Tests for the database export CSV endpoint."""

@pytest.mark.asyncio
async def test_export_csv_success(self, client: AsyncClient, sample_pvs: list, sample_tag: tuple):
"""Test successful CSV export with PVs and Tags."""
response = await client.get("/v1/export/export_csv")

assert response.status_code == 200
assert response.headers["content-type"] == "csv"
assert "Content-Disposition" in response.headers
assert "attachment; filename=database_export_" in response.headers["Content-Disposition"]

content = response.text
assert "=== PVs Export ===" in content
assert "=== Tags Export ===" in content

# Check all PV headers are present
assert "ID" in content
assert "Setpoint Address" in content
assert "Readback Address" in content
assert "Config Address" in content
assert "Device" in content
assert "Description" in content
assert "Abs Tolerance" in content
assert "Rel Tolerance" in content
assert "Read Only" in content

# Check all Tag headers are present
assert "Group ID" in content
assert "Group Name" in content
assert "Group Description" in content
assert "Tag ID" in content
assert "Tag Name" in content
assert "Tag Description" in content

# Check that PV data is included
for pv in sample_pvs:
assert pv["setpointAddress"] in content
assert pv["readbackAddress"] in content
assert pv["device"] in content

tag = sample_tag[1]

# Check that Tag data is included
assert tag["name"] in content
assert tag["description"] in content

@pytest.mark.asyncio
async def test_export_csv_empty_database(self, client: AsyncClient):
"""Test CSV export with no data in database."""
response = await client.get("/v1/export/export_csv")

assert response.status_code == 200
content = response.text

# Should still have headers and section markers
assert "=== PVs Export ===" in content
assert "=== Tags Export ===" in content
assert "ID" in content # PV headers
assert "Group ID" in content # Tag headers

@pytest.mark.asyncio
async def test_export_csv_downloadable_filename(self, client: AsyncClient):
"""Test that export CSV returns a proper downloadable filename."""
response = await client.get("/v1/export/export_csv")

assert response.status_code == 200
content_disposition = response.headers.get("Content-Disposition", "")

assert "attachment" in content_disposition
assert "filename=" in content_disposition
assert "database_export_" in content_disposition
# Check filename has date format (YYYY-MM-DD_HH-MM-SS)
assert ".csv" in content_disposition
Loading
Loading