-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathconvert.py
More file actions
401 lines (346 loc) · 14.7 KB
/
convert.py
File metadata and controls
401 lines (346 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
"""
Convert endpoint - Main API for media conversion.
Uses FastAPI 0.124+ features including:
- Annotated dependencies with Doc
- Enhanced OpenAPI responses
- Typed dependency injection
"""
from typing import Dict, Any, Annotated
from uuid import uuid4
from fastapi import APIRouter, HTTPException, BackgroundTasks, Request, status
from annotated_doc import Doc
import structlog
from api.config import settings
from api.dependencies import DatabaseSession, RequiredAPIKey
from api.models.job import Job, JobStatus, ConvertRequest, JobCreateResponse, JobResponse, ErrorResponse
from api.utils.validators import validate_input_path, validate_output_path, validate_operations
# Module-level structured logger shared by the handlers in this file.
logger = structlog.get_logger()
# Router on which the endpoint decorators below register their routes.
router = APIRouter()
# Import services from main - they are initialized during app startup
# Lazy import to avoid circular dependency
def get_storage_service():
    """Return the ``storage_service`` object exposed by ``api.main``.

    The import is performed at call time rather than at module import time,
    so this module can be loaded without ``api.main`` being imported first
    (the file's own comment cites a circular dependency as the reason).
    """
    # Deferred import: resolved on every call, not when this module loads.
    from api.main import storage_service
    return storage_service
def get_queue_service():
    """Return the ``queue_service`` object exposed by ``api.main``.

    The import is performed at call time rather than at module import time,
    so this module can be loaded without ``api.main`` being imported first
    (the file's own comment cites a circular dependency as the reason).
    """
    # Deferred import: resolved on every call, not when this module loads.
    from api.main import queue_service
    return queue_service
def _reject_unsafe_webhook_url(webhook_url: str) -> None:
    """Raise ``HTTPException(400)`` when *webhook_url* targets an internal host.

    Guards against SSRF by rejecting localhost aliases, IP literals inside
    private/reserved ranges, and hostnames ending in ``.local`` / ``.internal``.
    URLs without a hostname are left for later validation.

    Raises:
        HTTPException: 400 when the target is not allowed.
        ValueError: propagated from ``urlparse`` for malformed URLs; the
            caller maps it to a 400 response.
    """
    from urllib.parse import urlparse
    import ipaddress

    hostname = urlparse(webhook_url).hostname
    if not hostname:
        return

    # Block localhost variants
    if hostname in ("localhost", "127.0.0.1", "0.0.0.0", "::1"):
        raise HTTPException(status_code=400, detail="Invalid webhook URL: localhost not allowed")

    try:
        ip = ipaddress.ip_address(hostname)
    except ValueError:
        ip = None

    if ip is None:
        # Not an IP literal. The name could still resolve to a private
        # address; as a cheap safeguard block common internal suffixes.
        if hostname.lower().endswith((".local", ".internal")):
            raise HTTPException(status_code=400, detail="Invalid webhook URL: internal hostname not allowed")
        return

    # An IPv4-mapped IPv6 literal (e.g. ::ffff:10.0.0.1) would otherwise slip
    # past the IPv4 ranges below, so check the embedded IPv4 address instead.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped

    private_networks = (
        ipaddress.ip_network("10.0.0.0/8"),       # Class A private
        ipaddress.ip_network("172.16.0.0/12"),    # Class B private (172.16-31.x.x)
        ipaddress.ip_network("192.168.0.0/16"),   # Class C private
        ipaddress.ip_network("127.0.0.0/8"),      # Loopback
        ipaddress.ip_network("169.254.0.0/16"),   # Link-local
        ipaddress.ip_network("100.64.0.0/10"),    # Carrier-grade NAT
        ipaddress.ip_network("::1/128"),          # IPv6 loopback
        ipaddress.ip_network("fc00::/7"),         # IPv6 unique local
        ipaddress.ip_network("fe80::/10"),        # IPv6 link-local
    )
    # Membership across IP versions is simply False, so mixing is safe.
    if any(ip in network for network in private_networks):
        raise HTTPException(status_code=400, detail="Invalid webhook URL: private network not allowed")


@router.post(
    "/convert",
    response_model=JobCreateResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create media conversion job",
    response_description="Job created successfully",
    responses={
        201: {"description": "Job created and queued for processing"},
        400: {"model": ErrorResponse, "description": "Invalid request parameters"},
        401: {"model": ErrorResponse, "description": "Authentication required"},
        429: {"model": ErrorResponse, "description": "Rate limit or concurrent job limit exceeded"},
        503: {"model": ErrorResponse, "description": "Service temporarily unavailable"},
    },
)
async def convert_media(
    request: Annotated[ConvertRequest, Doc("Media conversion job specification")],
    background_tasks: BackgroundTasks,
    db: DatabaseSession,
    api_key: RequiredAPIKey,
) -> JobCreateResponse:
    """
    Create a new media conversion job.

    This endpoint accepts various input formats and converts them based on the
    specified output parameters and operations.
    """
    # Flow: validate request -> enforce per-key concurrency limit -> insert the
    # job row (flush only) -> enqueue -> commit -> build the response.
    # Error mapping: HTTPException passes through untouched, ValueError -> 400,
    # anything else -> 500.
    try:
        # Validate request size and complexity early
        if len(request.operations) > settings.MAX_OPERATIONS_PER_JOB:
            raise HTTPException(
                status_code=400,
                detail=f"Too many operations (max {settings.MAX_OPERATIONS_PER_JOB})"
            )

        # Check webhook URL for SSRF if provided
        if request.webhook_url:
            _reject_unsafe_webhook_url(request.webhook_url)

        # Input/output may be given either as a plain path string or as a
        # mapping carrying the path under "path".
        input_path = request.input if isinstance(request.input, str) else request.input.get("path")
        output_path = request.output if isinstance(request.output, str) else request.output.get("path")

        # Validate paths
        storage_service = get_storage_service()
        input_backend, input_validated = await validate_input_path(input_path, storage_service)
        output_backend, output_validated = await validate_output_path(output_path, storage_service)

        # Validate operations
        operations_validated = validate_operations(request.operations)

        # Check concurrent job limit for this API key
        from sqlalchemy import select, func

        active_jobs_stmt = select(func.count(Job.id)).where(
            Job.api_key == api_key,
            Job.status.in_([JobStatus.QUEUED, JobStatus.PROCESSING])
        )
        result = await db.execute(active_jobs_stmt)
        active_job_count = result.scalar() or 0

        # Get API key model to check limits
        from api.services.api_key import APIKeyService
        api_key_model = await APIKeyService.get_api_key_by_key(db, api_key)
        max_concurrent = api_key_model.max_concurrent_jobs if api_key_model else 5  # Default limit

        if active_job_count >= max_concurrent:
            raise HTTPException(
                status_code=429,
                detail=f"Concurrent job limit exceeded ({active_job_count}/{max_concurrent})"
            )

        # Create the job record; the UUID is generated here and the database
        # enforces its uniqueness.
        job = Job(
            id=uuid4(),
            status=JobStatus.QUEUED,
            priority=request.priority,
            input_path=input_validated,
            output_path=output_validated,
            options=request.options,
            operations=operations_validated,
            api_key=api_key,
            webhook_url=request.webhook_url,
            webhook_events=request.webhook_events,
        )

        # Flush (not commit) so the row is pending while we try to enqueue.
        db.add(job)
        await db.flush()

        job_id_str = str(job.id)

        # Queue the job before committing so a queue failure can roll back
        # the pending row instead of leaving an orphaned job behind.
        try:
            queue_service = get_queue_service()
            await queue_service.enqueue_job(
                job_id=job_id_str,
                priority=request.priority,
            )
        except Exception as e:
            await db.rollback()
            logger.error("Failed to queue job", job_id=job_id_str, error=str(e))
            raise HTTPException(status_code=503, detail="Failed to queue job") from e

        # Now commit the transaction
        await db.commit()
        await db.refresh(job)

        # Log job creation
        logger.info(
            "Job created",
            job_id=str(job.id),
            input_path=input_path,
            output_path=output_path,
            operations=len(operations_validated),
        )

        # Prepare response
        job_response = JobResponse(
            id=job.id,
            status=job.status,
            priority=job.priority,
            progress=0.0,
            stage="queued",
            created_at=job.created_at,
            links={
                "self": f"/api/v1/jobs/{job.id}",
                "events": f"/api/v1/jobs/{job.id}/events",
                "logs": f"/api/v1/jobs/{job.id}/logs",
                "cancel": f"/api/v1/jobs/{job.id}",
            },
        )

        # Estimate cost/time (simplified for now)
        estimated_cost = {
            "processing_time": estimate_processing_time(request),
            "credits": 0,  # For self-hosted, no credits
        }

        return JobCreateResponse(
            job=job_response,
            estimated_cost=estimated_cost,
            warnings=[],
        )
    except HTTPException:
        # Deliberate HTTP errors raised above (400/429/503) must reach the
        # client unchanged; without this clause the generic handler below
        # would rewrite every one of them into a 500.
        raise
    except ValueError as e:
        logger.error("Validation error", error=str(e))
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.error("Failed to create job", error=str(e))
        raise HTTPException(status_code=500, detail="Failed to create job") from e
@router.post(
    "/analyze",
    response_model=JobCreateResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Analyze media quality metrics",
    responses={
        201: {"description": "Analysis job created"},
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Authentication required"},
    },
)
async def analyze_media(
    request: Annotated[Dict[str, Any], Doc("Media analysis request")],
    fastapi_request: Request,
    db: DatabaseSession,
    api_key: RequiredAPIKey,
) -> JobCreateResponse:
    """
    Analyze media file for quality metrics.

    This endpoint runs VMAF, PSNR, and SSIM analysis on the input media.
    """
    # Apply endpoint-specific rate limiting
    from api.utils.rate_limit import endpoint_rate_limiter
    endpoint_rate_limiter.check_rate_limit(fastapi_request, "analyze", api_key)

    # The body is a free-form dict, so a missing "input" would otherwise
    # surface as a KeyError (HTTP 500) instead of the documented 400.
    if "input" not in request:
        raise HTTPException(status_code=400, detail="Missing required field: input")

    # Convert to a regular conversion job with the analysis flag set.
    try:
        convert_request = ConvertRequest(
            input=request["input"],
            output=request.get("output", request["input"]),  # Output same as input for analysis
            operations=[],
            options={
                "analyze_only": True,
                "metrics": request.get("metrics", ["vmaf", "psnr", "ssim"]),
                "reference": request.get("reference"),
            },
        )
    except ValueError as e:
        # NOTE(review): model validation failures are expected to be
        # ValueError subclasses (pydantic) - confirm for the pinned version.
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Delegate to the main handler; a fresh BackgroundTasks is passed because
    # this endpoint does not receive one of its own.
    return await convert_media(convert_request, BackgroundTasks(), db, api_key)
@router.post(
    "/stream",
    response_model=JobCreateResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create adaptive streaming output",
    responses={
        201: {"description": "Streaming job created"},
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Authentication required"},
    },
)
async def create_stream(
    request: Annotated[Dict[str, Any], Doc("Streaming format request (HLS/DASH)")],
    db: DatabaseSession,
    api_key: RequiredAPIKey,
) -> JobCreateResponse:
    """
    Create adaptive streaming formats (HLS/DASH).

    This endpoint generates streaming-ready output from input media.
    """
    # The body is a free-form dict, so missing keys would otherwise surface
    # as a KeyError (HTTP 500) instead of the documented 400.
    missing = [field for field in ("input", "output") if field not in request]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    # Convert to a conversion job with a single streaming operation.
    stream_type = request.get("type", "hls")
    variants = request.get("variants", [
        {"resolution": "1080p", "bitrate": "5M"},
        {"resolution": "720p", "bitrate": "2.5M"},
        {"resolution": "480p", "bitrate": "1M"},
    ])

    try:
        convert_request = ConvertRequest(
            input=request["input"],
            output=request["output"],
            operations=[
                {
                    "type": "stream",
                    "format": stream_type,
                    "variants": variants,
                    "segment_duration": request.get("segment_duration", 6),
                }
            ],
            options=request.get("options", {}),
        )
    except ValueError as e:
        # NOTE(review): model validation failures are expected to be
        # ValueError subclasses (pydantic) - confirm for the pinned version.
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Delegate to the main handler; a fresh BackgroundTasks is passed because
    # this endpoint does not receive one of its own.
    return await convert_media(convert_request, BackgroundTasks(), db, api_key)
@router.post(
    "/estimate",
    summary="Estimate job processing time and resources",
    response_description="Estimation results",
    responses={
        200: {"description": "Estimation successful"},
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Authentication required"},
    },
)
async def estimate_job(
    request: Annotated[ConvertRequest, Doc("Job to estimate")],
    api_key: RequiredAPIKey,
) -> Dict[str, Any]:
    """
    Estimate processing time and resources for a conversion job.

    This endpoint helps predict job duration without actually creating the job.
    """
    # Any failure while computing the estimate is logged and reported as 500.
    try:
        duration_seconds = estimate_processing_time(request)
        output_size_bytes = estimate_output_size(request)

        # CPU and memory figures are fixed; only the GPU flag depends on the
        # request's options.
        needs_gpu = request.options.get("hardware_acceleration") == "gpu"

        return {
            "estimated": {
                "duration_seconds": duration_seconds,
                "output_size_bytes": output_size_bytes,
            },
            "resources": {
                "cpu_cores": 4,
                "memory_gb": 8,
                "gpu_required": needs_gpu,
            },
            "factors": {
                "complexity": calculate_complexity(request),
                "operations": len(request.operations),
            },
        }
    except Exception as e:
        logger.error("Estimation failed", error=str(e))
        raise HTTPException(status_code=500, detail="Failed to estimate job")
def estimate_processing_time(request: ConvertRequest) -> int:
    """Estimate processing time in seconds.

    Starts from a 60 second baseline and walks ``request.operations`` in
    order: a ``"stream"`` operation triples the running total, an
    ``"analyze"`` operation doubles it, and anything else (including an
    operation without a ``"type"``) adds 30 seconds. When ``request.output``
    is a dict, its ``video.quality`` value scales the result: ``"high"``
    doubles it and ``"ultra"`` quadruples it.

    Malformed operations or a non-dict ``video`` entry are tolerated rather
    than raising, because this helper is also called on requests that have
    not been through operation validation.
    """
    estimated = 60  # Base time for simple conversion

    for op in request.operations:
        # .get() instead of ["type"]: an operation lacking "type" is costed
        # as a generic operation instead of raising KeyError.
        op_type = op.get("type") if isinstance(op, dict) else None
        if op_type == "stream":
            estimated *= 3  # Streaming takes longer
        elif op_type == "analyze":
            estimated *= 2  # Analysis is slower
        else:
            estimated += 30  # Other operations

    # Adjust for quality settings
    if isinstance(request.output, dict):
        video = request.output.get("video")
        # "video" may be present but null or not a mapping; fall back to the
        # default quality in that case.
        quality = video.get("quality", "medium") if isinstance(video, dict) else "medium"
        if quality == "high":
            estimated *= 2
        elif quality == "ultra":
            estimated *= 4

    return estimated
def estimate_output_size(request: ConvertRequest) -> int:
    """Return a placeholder estimate of the output size, in bytes.

    The request is not inspected; a fixed 100 MB figure is returned. A real
    estimate would be derived from bitrate, duration and similar inputs.
    """
    bytes_per_megabyte = 1024 * 1024
    default_megabytes = 100
    return default_megabytes * bytes_per_megabyte
def calculate_complexity(request: ConvertRequest) -> str:
    """Classify a job by how many operations it contains.

    Returns ``"simple"`` for no operations, ``"moderate"`` for one or two,
    and ``"complex"`` for three or more.
    """
    op_total = len(request.operations)
    if op_total > 2:
        return "complex"
    if op_total > 0:
        return "moderate"
    return "simple"