Comprehensive technical architecture documentation for the autodl_enhanced Telegram download bot.
- System Overview
- Component Architecture
- Data Flow
- Async Model
- Database Design
- Retry and Backoff Strategy
- Deduplication System
- Error Handling
- Performance Optimization
- Security Architecture
autodl_enhanced is built on a modular asynchronous architecture with clear separation of concerns:
┌─────────────────────────────────────────────────────────────────────────┐
│ TELEGRAM USER LAYER │
│ (Users send URLs and commands) │
└────────────────────────────────┬────────────────────────────────────────┘
│
┌────────────▼───────────────┐
│ MESSAGE HANDLERS LAYER │
│ (command & message parse) │
└────────────┬───────────────┘
│
┌───────────────────────┼───────────────────────┐
│ │ │
┌────▼──────┐ ┌─────▼──────┐ ┌────────▼────┐
│ COMMAND │ │ MESSAGE │ │ PLAYLIST │
│ HANDLER │ │ HANDLER │ │ EXTRACTION │
└────┬──────┘ └─────┬──────┘ └────────┬────┘
│ │ │
└─────────────────────┼────────────────────────┘
│
┌──────────▼──────────┐
│ VALIDATION LAYER │
│ (URL regex checks) │
└──────────┬──────────┘
│
┌──────────▼──────────┐
│ DEDUPLICATION LAYER │
│ (URL hash, ID) │
└──────────┬──────────┘
│
┌──────────▼──────────────────┐
│ QUEUE MANAGER LAYER │
│ (SQLite persistence) │
│ - Add tasks │
│ - Fetch pending tasks │
│ - Manage retry scheduling │
└──────────┬──────────────────┘
│
┌────────────────┴────────────────┐
│ │
┌────▼──────────┐ ┌──────────▼────┐
│ DOWNLOAD MGR │ │ SYSTEM MONITOR│
│ (worker pool) │ │ (disk, CPU) │
└────┬──────────┘ └──────────────┘
│
┌─────────┴──────────┐
│ │
┌───▼──────┐ ┌───▼──────┐
│ YT-DLP │ │ ARIA2C │
│(native) │ │(optional)│
└───┬──────┘ └───┬──────┘
└────────┬──────────┘
│
┌────────▼────────────┐
│ DOWNLOADED MEDIA │
│ /path/to/downloads │
└─────────────────────┘
Responsibility: Bot initialization, handler registration, event loop management
Key Functions:
- `__init__()`: Initialize Telegram Application, register handlers
- `start_application()`: Start bot polling and async managers
- `stop_application()`: Graceful shutdown of services
Handler Registration:
# Message handlers
application.add_handler(MessageHandler(filters.TEXT, message_handler.handle_text_message))
application.add_handler(MessageHandler(filters.Document.FileExtension("txt"), message_handler.handle_text_file))
# Command handlers
application.add_handler(CommandHandler("start", command_handler.cmd_start))
application.add_handler(CommandHandler("queue", command_handler.cmd_queue))
application.add_handler(CommandHandler("status", command_handler.cmd_status))
application.add_handler(CommandHandler("pause", command_handler.cmd_pause))
application.add_handler(CommandHandler("resume", command_handler.cmd_resume))
application.add_handler(CommandHandler("retry", command_handler.cmd_retry))
application.add_handler(CommandHandler("clear", command_handler.cmd_clear))Async Managers:
- Starts `QueueManager.run()` in a background task
- Starts `DownloadManager.run()` in a background task
- Runs both concurrently with Telegram polling (see the sketch below)
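A minimal sketch of one plausible wiring, assuming python-telegram-bot v20+ (its `post_init` hook runs inside the event loop before polling starts); the `queue_manager`, `download_manager`, and `config` names mirror the components described in this document:

```python
import asyncio
from telegram.ext import Application

async def _post_init(application: Application) -> None:
    # Launch both managers as background tasks on the same event loop
    # that Telegram polling uses; they run until the bot shuts down.
    application.bot_data["bg_tasks"] = [
        asyncio.create_task(queue_manager.run()),
        asyncio.create_task(download_manager.run()),
    ]

application = (
    Application.builder()
    .token(config.telegram_bot_token)
    .post_init(_post_init)
    .build()
)
```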
Responsibility: Environment variable loading, configuration validation, default values
Key Components:
@dataclass
class Config:
# Telegram
telegram_bot_token: str
telegram_admin_ids: List[int]
# Downloads
download_dir: Path
max_concurrent: int
max_retries: int
retry_sleep: float
socket_timeout: int
max_video_quality: str
preferred_format: str
skip_hls: bool
skip_dash: bool
use_aria2c: bool
cookies_file: Optional[Path]
# System
min_disk_space_gb: float
max_playlist_videos: int
log_level: str
# Database
    db_path: Path

Loading Logic (sketched below):
- Load from `.env` file using `python-dotenv`
- Validate required fields (`TELEGRAM_BOT_TOKEN`)
- Apply type conversions (strings to int, Path, etc.)
- Apply sensible defaults for optional fields
- Return Config dataclass instance
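A minimal sketch of this loading path, assuming `python-dotenv`. Environment variable names not shown elsewhere in this document (e.g. `DOWNLOAD_DIR`, `MAX_RETRIES`, `DB_PATH`) and the defaults are illustrative:

```python
import os
from pathlib import Path

from dotenv import load_dotenv  # python-dotenv

def load_config() -> "Config":
    load_dotenv()  # merge .env into os.environ
    token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
    if not token:
        raise ValueError("TELEGRAM_BOT_TOKEN is required")

    def env(key: str, default: str) -> str:
        return os.environ.get(key, default)

    cookies = env("COOKIES_FILE", "")
    return Config(
        telegram_bot_token=token,
        telegram_admin_ids=[int(i) for i in env("TELEGRAM_ADMIN_IDS", "").split(",") if i.strip()],
        download_dir=Path(env("DOWNLOAD_DIR", "downloads")),
        max_concurrent=int(env("MAX_CONCURRENT", "8")),
        max_retries=int(env("MAX_RETRIES", "5")),
        retry_sleep=float(env("RETRY_SLEEP", "1.0")),
        socket_timeout=int(env("SOCKET_TIMEOUT", "30")),
        max_video_quality=env("MAX_VIDEO_QUALITY", "1080p"),
        preferred_format=env("PREFERRED_FORMAT", "mp4"),
        skip_hls=env("SKIP_HLS", "true").lower() == "true",
        skip_dash=env("SKIP_DASH", "true").lower() == "true",
        use_aria2c=env("USE_ARIA2C", "false").lower() == "true",
        cookies_file=Path(cookies) if cookies else None,
        min_disk_space_gb=float(env("MIN_DISK_SPACE_GB", "2.0")),
        max_playlist_videos=int(env("MAX_PLAYLIST_VIDEOS", "300")),
        log_level=env("LOG_LEVEL", "INFO").upper(),
        db_path=Path(env("DB_PATH", "data/queue/autodl.db")),
    )
```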
Responsibility: Persistent task queue management, SQLite operations, retry scheduling
Architecture:
@dataclass
class DownloadTask:
id: int
url: str
status: str # pending, processing, completed, failed
attempts: int
added_at: float # Unix timestamp
updated_at: float
next_attempt_at: Optional[float]
file_path: Optional[str]
error_message: Optional[str]
url_hash: Optional[str]
    video_id: Optional[str]

Core Operations:
| Method | Purpose | Usage |
|---|---|---|
| `add_task(url, video_id, url_hash)` | Add URL to queue | After deduplication validation |
| `fetch_pending_tasks(limit)` | Get tasks ready to download | Download manager worker loop |
| `mark_processing(task_id)` | Mark task as downloading | When download starts |
| `mark_completed(task_id, file_path)` | Mark task successful | On download completion |
| `mark_failed(task_id, error_msg)` | Mark task failed | On download error |
| `get_status_summary()` | Queue statistics | /status command |
| `check_duplicate(url, video_id)` | Check if already queued | Deduplication |
| `reschedule_failed_tasks()` | Retry failed tasks | Periodic background task |
| `clear_completed_tasks()` | Remove done/failed tasks | /clear command |
Database Indices:
CREATE INDEX idx_url_hash ON tasks(url_hash);
CREATE INDEX idx_video_id ON tasks(video_id);
CREATE INDEX idx_status ON tasks(status);

Async Operations:
- Uses `aiosqlite` for non-blocking SQLite access
- `asyncio.Lock()` for thread-safe database operations
- All DB operations are `async def` (example below)
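A minimal sketch of `add_task` under these conventions; the `self._db` attribute (an open `aiosqlite` connection) and the shared `asyncio.Lock` in `self._lock` are assumptions, and the column order follows the schema in the Database Design section:

```python
import time

async def add_task(self, url, video_id=None, url_hash=None) -> int:
    """Insert a new pending task and return its row id."""
    async with self._lock:
        now = time.time()
        cursor = await self._db.execute(
            "INSERT INTO tasks (url, status, attempts, added_at, updated_at, url_hash, video_id) "
            "VALUES (?, 'pending', 0, ?, ?, ?, ?)",
            (url, now, now, url_hash, video_id),
        )
        await self._db.commit()
        return cursor.lastrowid
```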
Responsibility: Download orchestration, worker pool management, progress tracking, retry logic
Architecture:
Worker Pool Pattern:
┌─────────────────────────────────────────────────────────────┐
│ Download Manager │
│ max_concurrent=8 (configurable) │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker 4 │ ... │
│ │ (async) │ │ (async) │ │ (async) │ │ (async) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
Key Methods:
| Method | Purpose |
|---|---|
| `run()` | Main event loop, spawns workers |
| `_worker()` | Individual worker coroutine (fetch, download, handle errors) |
| `_download(task)` | yt-dlp execution wrapper |
| `pause_downloads()` | Set pause flag, pause workers gracefully |
| `resume_downloads()` | Clear pause flag, resume workers |
| `get_active_downloads()` | Get in-progress task list |
Worker Lifecycle:
async def _worker(self):
while True:
# Check pause flag
if self._pause_event.is_set():
await asyncio.sleep(1)
continue
# Fetch task from queue
task = await self.queue_manager.fetch_pending_task()
if not task:
await asyncio.sleep(2)
continue
# Mark as processing
await self.queue_manager.mark_processing(task.id)
# Download
success, file_path, error = await self._download(task)
# Update queue based on result
if success:
await self.queue_manager.mark_completed(task.id, file_path)
else:
            # Exponential backoff retry; the next-attempt time is
            # computed inside reschedule_task (see Retry and Backoff Strategy)
            await self.queue_manager.reschedule_task(task.id, error)

Download Execution:
async def _download(self, task: DownloadTask) -> tuple:
"""
Execute yt-dlp download asynchronously
Returns: (success: bool, file_path: Optional[str], error: Optional[str])
"""
    try:
        # Disk space check (threshold from config; uses stdlib shutil)
        free_gb = shutil.disk_usage(self.config.download_dir).free / (1024 ** 3)
        if free_gb < self.config.min_disk_space_gb:
            return (False, None, "Insufficient disk space")
# yt-dlp arguments
ydl_opts = {
'format': self._build_format_string(),
'outtmpl': os.path.join(self.config.download_dir, '%(title)s.%(ext)s'),
'socket_timeout': self.config.socket_timeout,
'max_downloads': 1,
'quiet': False,
'no_warnings': False,
}
# Optional aria2c
if self.config.use_aria2c:
ydl_opts['external_downloader'] = 'aria2c'
        # Run yt-dlp in an executor so the blocking download
        # does not stall the event loop
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = await asyncio.get_event_loop().run_in_executor(
                None,
                lambda: ydl.extract_info(task.url, download=True)
            )
            file_path = ydl.prepare_filename(info)
            return (True, file_path, None)
    except Exception as e:
        return (False, None, str(e))

Quality Selection:
def _build_format_string(self) -> str:
"""
Build yt-dlp format string based on configuration
Example output: "best[height<=1080]/best"
"""
quality_map = {
'4K': '2160',
'1440p': '1440',
'1080p': '1080',
'720p': '720',
'480p': '480',
}
max_height = quality_map.get(self.config.max_video_quality, '1080')
return f"best[height<={max_height}]/best"Responsibility: Parse incoming text and file messages, extract URLs
Key Functions:
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle direct URL messages"""
    url = (update.message.text or "").strip()
    if is_valid_url(url):
        await queue_manager.add_task(url)
        await update.message.reply_text(f"Queued: {url}")
async def handle_text_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle .txt file uploads with URLs"""
file = await context.bot.get_file(update.message.document.file_id)
content = await file.download_as_bytearray()
urls = extract_urls_from_text(content.decode('utf-8'))
for url in urls:
await queue_manager.add_task(url)
await update.message.reply_text(f"Queued {len(urls)} URLs")URL Extraction:
- Uses regex patterns to find URLs in text
- Validates http/https schemes only
- Splits newlines for file-based URLs
Responsibility: Handle Telegram bot commands with admin authentication
Commands Implemented:
| Command | Purpose | Auth |
|---|---|---|
| `/start` | Welcome message, list commands | Admin |
| `/queue` | Show pending/processing tasks | Admin |
| `/status` | System metrics and queue summary | Admin |
| `/pause` | Pause all downloads | Admin |
| `/resume` | Resume paused downloads | Admin |
| `/retry` | Re-attempt failed tasks | Admin |
| `/clear` | Remove completed/failed tasks | Admin |
Admin Check Pattern:
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
if update.effective_user.id not in config.telegram_admin_ids:
await update.message.reply_text("⛔ Unauthorized")
return
    # ... execute command

Status Command Output:
📊 System Status
CPU: 42.5% | RAM: 58.3% | Disk: 71.2%
📥 Queue Summary
Pending: 8 | Processing: 3 | Completed: 124 | Failed: 2
⏳ Active Downloads
1. media_file_1.mp4 - 45% (5m 30s remaining)
2. media_file_2.mp4 - 78% (2m 15s remaining)
Responsibility: URL validation and extraction
Key Functions:
def is_valid_url(url: str) -> bool:
"""Validate URL format"""
pattern = r'^https?://[^\s]+'
return bool(re.match(pattern, url))
def extract_urls_from_text(text: str) -> List[str]:
"""Extract all URLs from multi-line text"""
pattern = r'https?://[^\s]+'
return re.findall(pattern, text)
def is_playlist_url(url: str) -> bool:
"""Detect if URL is a playlist"""
    return 'playlist' in url.lower() or 'series' in url.lower()

Responsibility: Multi-level duplicate detection
Strategy:
async def check_duplicate(url: str) -> bool:
"""Check if URL already in queue (any status)"""
# Level 1: Normalize URL and compute hash
normalized = normalize_url(url)
url_hash = hashlib.sha256(normalized.encode()).hexdigest()
# Level 2: Extract video ID
video_id = extract_video_id(url)
# Level 3: Query database
existing = await queue_manager.check_duplicate(url_hash, video_id)
return existing is not None
def normalize_url(url: str) -> str:
"""Remove tracking parameters and normalize"""
# Remove utm_*, fbclid, gclid, etc.
parsed = urllib.parse.urlparse(url)
params = urllib.parse.parse_qs(parsed.query)
# Keep only essential parameters
clean_params = {k: v for k, v in params.items()
if not is_tracking_param(k)}
clean_query = urllib.parse.urlencode(clean_params, doseq=True)
return urllib.parse.urlunparse((
parsed.scheme, parsed.netloc, parsed.path,
parsed.params, clean_query, parsed.fragment
))
def extract_video_id(url: str) -> Optional[str]:
"""Extract platform-specific video ID"""
# YouTube: v= parameter or shorts/
# Pornhub: viewkey parameter
# Twitter: /status/ID
    # etc.

Supported Platforms for Video ID Extraction (a sketch follows the list):
- YouTube (v=, youtube.com/shorts/)
- Pornhub (viewkey)
- Xvideos, Xhamster, Redtube (numeric ID)
- Twitter/X (status ID)
- Reddit (post ID)
- Spankbang, OnlyFans
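A minimal sketch of `extract_video_id` covering a few of these platforms; the regexes and host checks are illustrative, not an exhaustive extractor:

```python
import re
import urllib.parse
from typing import Optional

def extract_video_id(url: str) -> Optional[str]:
    """Best-effort platform ID extraction (illustrative subset)."""
    parsed = urllib.parse.urlparse(url)
    query = urllib.parse.parse_qs(parsed.query)
    host = parsed.netloc.lower()
    if "youtube" in host or "youtu.be" in host:
        if "v" in query:                        # youtube.com/watch?v=ID
            return query["v"][0]
        m = re.search(r"/shorts/([\w-]+)", parsed.path)
        if m:                                   # youtube.com/shorts/ID
            return m.group(1)
    if "pornhub" in host and "viewkey" in query:
        return query["viewkey"][0]              # view_video.php?viewkey=ID
    m = re.search(r"/status/(\d+)", parsed.path)
    if m and ("twitter" in host or host == "x.com"):
        return m.group(1)                       # twitter.com/user/status/ID
    return None  # remaining platforms fall back to the URL hash
```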
Responsibility: Unified logging setup
Configuration:
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

def setup_logger(log_level: str, log_file: Path) -> logging.Logger:
    logger = logging.getLogger(__name__)
    logger.setLevel(getattr(logging, log_level.upper()))
# File handler with rotation
file_handler = RotatingFileHandler(
log_file,
maxBytes=10_000_000, # 10 MB
backupCount=5
)
# Console handler
console_handler = logging.StreamHandler()
# Format
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] [%(name)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
    return logger

Log Rotation:
- Max 10 MB per file
- Keep 5 backup files
- Automatic rollover on size limit
Responsibility: Monitor disk space, trigger pause on low space
Logic:
async def check_disk_space(download_dir: Path, min_space_gb: float) -> bool:
"""Check if sufficient disk space available"""
stat = os.statvfs(download_dir)
free_space_gb = (stat.f_bavail * stat.f_frsize) / (1024**3)
if free_space_gb < min_space_gb:
logger.warning(f"Low disk space: {free_space_gb:.1f}GB < {min_space_gb}GB")
return False
    return True

Integration (a pause/resume sketch follows this list):
- Checked before each download start
- Pauses downloads if threshold breached
- Resumes when space available again
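A sketch of that integration as a periodic task; the `is_paused()` helper and the 60-second polling interval are assumptions, not documented API:

```python
import asyncio

async def monitor_disk(self) -> None:
    """Pause downloads on low disk space, resume once space recovers."""
    while True:
        ok = await check_disk_space(self.config.download_dir,
                                    self.config.min_disk_space_gb)
        if not ok:
            await self.download_manager.pause_downloads()    # threshold breached
        elif self.download_manager.is_paused():              # hypothetical helper
            await self.download_manager.resume_downloads()   # space recovered
        await asyncio.sleep(60)  # illustrative polling interval
```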
Responsibility: System resource monitoring
Metrics:
def get_system_metrics() -> Dict[str, float]:
"""Get CPU, memory, and disk usage percentages"""
return {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
    }

Display in /status command:
CPU: 42.5% | RAM: 58.3% | Disk: 71.2%
1. User sends URL
└─> message_handler.handle_text_message()
2. Validate URL
└─> validators.is_valid_url(url)
3. Check for duplicates
└─> deduplication.extract_video_id(url)
└─> deduplication.normalize_url(url)
└─> queue_manager.check_duplicate(url_hash, video_id)
4. Add to queue
└─> queue_manager.add_task(url, video_id, url_hash)
└─> INSERT INTO tasks VALUES (...)
5. Download manager picks up task
└─> download_manager._worker()
└─> queue_manager.fetch_pending_task()
└─> queue_manager.mark_processing(task.id)
6. Download execution
└─> download_manager._download(task)
└─> disk_monitor.check_disk_space()
└─> yt_dlp.YoutubeDL(ydl_opts).extract_info(url)
└─> Run in executor (non-blocking)
7. Handle result
IF success:
└─> queue_manager.mark_completed(task.id, file_path)
└─> UPDATE tasks SET status='completed'
IF failure:
└─> queue_manager.reschedule_task(task.id, next_attempt, error)
└─> UPDATE tasks SET status='pending', next_attempt_at=...
└─> (retry with exponential backoff)
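A minimal sketch condensing steps 1-4 above into a single enqueue path; the `enqueue_url` name is hypothetical, while the called functions come from the components documented earlier:

```python
import hashlib

async def enqueue_url(url: str) -> str:
    """Validate, deduplicate, and queue a single URL."""
    url = url.strip()
    if not is_valid_url(url):
        return "Invalid URL"
    url_hash = hashlib.sha256(normalize_url(url).encode()).hexdigest()
    video_id = extract_video_id(url)
    if await queue_manager.check_duplicate(url_hash, video_id):
        return "Duplicate: already queued"
    await queue_manager.add_task(url, video_id, url_hash)
    return "Queued"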
User sends: /watch?list=PLxxxxx
│
└─> message_handler detects playlist
│
└─> handlers/message_handler.extract_playlist_urls()
│
└─> yt-dlp extracts playlist entries
│ (up to MAX_PLAYLIST_VIDEOS=300)
│
└─> For each video URL:
├─> Deduplication check
└─> Add to queue
│
└─> User notified: "Queued 47 videos from playlist"
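A sketch of the playlist expansion step, assuming yt-dlp flat extraction (`extract_flat` lists entries without downloading); the option values shown are illustrative:

```python
import yt_dlp

def extract_playlist_urls(url: str, max_videos: int = 300) -> list:
    """Expand a playlist URL into individual video URLs."""
    ydl_opts = {
        'extract_flat': 'in_playlist',  # list entries, do not download
        'playlistend': max_videos,      # honors MAX_PLAYLIST_VIDEOS
        'quiet': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    entries = (info or {}).get('entries') or []
    return [e['url'] for e in entries if e and e.get('url')]
```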
Main Event Loop (asyncio)
├─ Telegram Application polling
│ └─ Handlers dispatch (command, message)
│
├─ QueueManager.run()
│ └─ Periodic status updates
│ └─ Retry scheduling
│
└─ DownloadManager.run()
├─ Worker pool (8 concurrent)
│ ├─ Worker 1: fetch_task → download → update_queue
│ ├─ Worker 2: fetch_task → download → update_queue
│ ├─ ... (up to MAX_CONCURRENT)
│
├─ Pause/Resume control
└─ System monitoring
Pattern 1: Worker Pool
# Multiple workers running in parallel
async def _worker(self):
while True:
task = await self.queue_manager.fetch_pending_task()
# Only one worker gets each task (atomic fetch)
        await self._download(task)

Pattern 2: Atomic Task Fetch
async def fetch_pending_task(self) -> Optional[DownloadTask]:
    async with self._lock:  # asyncio.Lock() serializes the claim
        # Claim exactly one pending task and read it back in a single
        # statement (RETURNING requires SQLite 3.35+)
        cursor = await self._db.execute(
            "UPDATE tasks SET status='processing' "
            "WHERE id = (SELECT id FROM tasks WHERE status='pending' "
            "ORDER BY added_at LIMIT 1) RETURNING *"
        )
        row = await cursor.fetchone()
        return DownloadTask(*row) if row else None

Pattern 3: Pause/Resume with Events
self._pause_event = asyncio.Event()  # set = paused (matches pause_downloads/resume_downloads above)

# Pause
self._pause_event.set()

# Resume
self._pause_event.clear()

# In worker
if self._pause_event.is_set():
    await asyncio.sleep(1)
    continue

CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
status TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
added_at REAL NOT NULL,
updated_at REAL NOT NULL,
next_attempt_at REAL,
file_path TEXT,
error_message TEXT,
url_hash TEXT,
video_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_url_hash ON tasks(url_hash);
CREATE INDEX IF NOT EXISTS idx_video_id ON tasks(video_id);
CREATE INDEX IF NOT EXISTS idx_status ON tasks(status);

pending
│
├─ (download starts) → processing
│ ├─ (success) → completed ✓
│ └─ (error) → pending (reschedule)
│
└─ (reached max attempts) → failed ✗
Query: Fetch Pending Task
SELECT * FROM tasks
WHERE status='pending'
  AND (next_attempt_at IS NULL OR next_attempt_at <= strftime('%s', 'now'))
ORDER BY added_at ASC
LIMIT 1;

Query: Check Duplicate
SELECT * FROM tasks
WHERE (url_hash=? OR video_id=?)
AND status IN ('pending', 'processing', 'completed');

Query: Get Status Summary
SELECT status, COUNT(*) AS count FROM tasks GROUP BY status;

Query: Reschedule Failed
UPDATE tasks
SET status='pending', next_attempt_at=?, attempts=attempts+1, updated_at=?
WHERE status='failed' AND attempts < ?;  -- max_retries passed as a parameter

next_retry_time = current_time + (2^attempts * retry_sleep_base)
Example (retry_sleep_base=1s):
Attempt 1: immediate
Attempt 2: 2^1 * 1 = 2 seconds
Attempt 3: 2^2 * 1 = 4 seconds
Attempt 4: 2^3 * 1 = 8 seconds
Attempt 5: 2^4 * 1 = 16 seconds
After 5: marked as FAILED
async def reschedule_task(self, task_id: int, error_msg: str):
"""Reschedule failed task with exponential backoff"""
async with self._lock:
task = await self.get_task(task_id)
if task.attempts >= self.config.max_retries:
# Max retries exceeded
await cursor.execute(
"UPDATE tasks SET status='failed', error_message=? WHERE id=?",
(error_msg, task_id)
)
else:
# Schedule next retry
next_delay = 2 ** task.attempts * self.config.retry_sleep
next_attempt = time.time() + next_delay
await cursor.execute(
"UPDATE tasks SET status='pending', next_attempt_at=?, "
"attempts=?, error_message=?, updated_at=? WHERE id=?",
(next_attempt, task.attempts + 1, error_msg, time.time(), task_id)
            )

- Reduce Server Load: Don't hammer failing URLs immediately
- Handle Transient Errors: Give servers time to recover (rate limits, temporary outages)
- Efficient Resource Usage: Don't waste bandwidth retrying quickly
- Avoid IP Bans: Staggered retries look more like organic traffic
Level 1: URL Hashing
normalized = normalize_url(url) # Remove utm_*, fbclid, etc.
url_hash = hashlib.sha256(normalized.encode()).hexdigest()
# Query: WHERE url_hash = ?

Level 2: Video ID Extraction
video_id = extract_video_id(url)
# YouTube: extract 'v=' parameter
# Pornhub: extract 'viewkey=' parameter
# Twitter: extract status ID from URL path
# etc.
# Query: WHERE video_id = ?

Level 3: Database Lookup
SELECT * FROM tasks
WHERE (url_hash = ? OR video_id = ?)
AND status IN ('pending', 'processing', 'completed');

| Platform | ID Extraction | Hash Fallback |
|---|---|---|
| YouTube | v= parameter, /shorts/ path | URL hash |
| Pornhub | viewkey parameter | URL hash |
| Xvideos | numeric path parameter | URL hash |
| Twitter/X | status ID from path | URL hash |
| Reddit | post ID from path | URL hash |
| Twitch | channel/video path | URL hash |
| Generic | N/A | URL hash |
- URL Hash: Detects exact duplicates and parameter reordering
- Video ID: Detects same content shared via different URLs
- Database Lookup: One indexed query checks both keys across pending, processing, and completed tasks (worked example below)
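A small worked example of levels 1 and 2 on two URLs for the same video (hypothetical ID, differing parameter order and tracking junk); it relies on `normalize_url` stripping `utm_*` parameters as described above:

```python
import hashlib

a = "https://www.youtube.com/watch?v=abc123&utm_source=share"
b = "https://www.youtube.com/watch?utm_campaign=x&v=abc123"

# Level 1: both normalize to the same canonical URL, so the hashes match
assert normalize_url(a) == normalize_url(b)
h = hashlib.sha256(normalize_url(a).encode()).hexdigest()

# Level 2: both yield the same platform ID
assert extract_video_id(a) == extract_video_id(b) == "abc123"
```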
Network Errors:
- Connection timeout
- Connection refused
- DNS resolution failed
- Action: Retry with backoff
Media Availability Errors:
- Video removed/deleted
- Video private/restricted
- Geoblocked
- Action: Mark as failed (don't retry)
Format/Extraction Errors:
- No downloadable formats
- Unsupported platform
- Action: Mark as failed
System Errors:
- Disk full
- Permission denied
- Out of memory
- Action: Pause downloads, resume when resolved
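A minimal sketch of mapping raw error text onto the categories above; the substring checks are illustrative, not an exhaustive classifier:

```python
def classify_error(exc: Exception) -> str:
    """Map an exception onto the action categories above."""
    msg = str(exc).lower()
    if any(s in msg for s in ("no space left", "permission denied", "out of memory")):
        return "system"      # pause downloads, resume when resolved
    if any(s in msg for s in ("not available", "removed", "private", "geoblock")):
        return "permanent"   # media availability: mark failed, don't retry
    if any(s in msg for s in ("no video formats", "unsupported url")):
        return "permanent"   # format/extraction error
    return "transient"       # network/unknown: retry with backoff
```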
async def _download(self, task: DownloadTask):
try:
# Disk space check
if not await disk_monitor.check_disk_space(...):
# Pause all downloads
await self.pause_downloads()
return (False, None, "Insufficient disk space")
# yt-dlp execution
        with yt_dlp.YoutubeDL(opts) as ydl:
            info = ydl.extract_info(task.url, download=True)  # blocking; run in an executor in practice
file_path = ydl.prepare_filename(info)
return (True, file_path, None)
except yt_dlp.utils.DownloadError as e:
# Permanent error (video removed, etc)
if "not available" in str(e) or "removed" in str(e):
return (False, None, f"Permanent: {str(e)}")
else:
# Transient error, retry
return (False, None, str(e))
except Exception as e:
# Unexpected error, retry
logger.error(f"Download failed for {task.url}: {e}")
        return (False, None, str(e))

- Single download fails: Task rescheduled, other downloads continue
- Disk space low: All downloads paused, system remains responsive
- Database unavailable: Bot can't queue, but existing tasks continue
- Telegram API down: Downloads continue, status updates delayed
Worker Pool: Configurable concurrency (default 8)
MAX_CONCURRENT=8  # Adjust based on system capacity

Optimal Range:
- Low-end systems (1-2 GB RAM): 2-4 workers
- Mid-range systems (4-8 GB RAM): 4-8 workers
- High-end systems (16+ GB RAM): 8-16 workers
Aria2c Integration:
if config.use_aria2c:
ydl_opts['external_downloader'] = 'aria2c'
    ydl_opts['external_downloader_args'] = ['-x', '16', '-k', '1M']  # 16 connections, 1 MiB chunks

Socket Timeout:
SOCKET_TIMEOUT=30  # Seconds before a connection times out

Indexing:
CREATE INDEX idx_url_hash ON tasks(url_hash); -- Fast dedup lookup
CREATE INDEX idx_video_id ON tasks(video_id); -- Platform ID lookup
CREATE INDEX idx_status ON tasks(status);   -- Fast status filtering

Async Operations:
- All DB operations use `aiosqlite` (non-blocking)
- No synchronous database calls in async functions
# Build an efficient format string
format_selector = "best[height<=1080]/best"
# This tells yt-dlp:
# 1. Try the best video/audio combo at <=1080p
# 2. Fall back to the best available

Skip Unnecessary Codecs:
SKIP_HLS=true   # Skip HLS adaptive streams
SKIP_DASH=true  # Skip DASH adaptive streams

Extracting these manifests costs time and bandwidth even when the formats are never selected; skipping them speeds up format resolution. A sketch of wiring the flags into the format string follows.
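A sketch of one way the flags could feed the selector, assuming yt-dlp's protocol filters in format strings (`!*=` means "does not contain"); whether the real module does it exactly this way is not confirmed:

```python
def _build_format_string(self) -> str:
    """Quality cap plus optional protocol exclusions."""
    fmt = f"best[height<={max_height}]"  # max_height from the quality_map lookup above
    if self.config.skip_hls:
        fmt += "[protocol!*=m3u8]"   # exclude HLS streams
    if self.config.skip_dash:
        fmt += "[protocol!*=dash]"   # exclude DASH (http_dash_segments)
    return f"{fmt}/best"
```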
Efficient Download Template:
'outtmpl': os.path.join(download_dir, '%(title)s.%(ext)s')

Atomic Move (if needed):
- yt-dlp handles atomic downloads within target directory
Telegram Admin Check:
def admin_only(handler):
    """Reject non-admin users before running the wrapped handler"""
    async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE):
        if update.effective_user.id not in config.telegram_admin_ids:
            await update.message.reply_text("⛔ Unauthorized")
            return
        await handler(update, context)
    return wrapper  # applied as @admin_only on each command handler

Configuration:
TELEGRAM_ADMIN_IDS=1234567890,9876543210  # Comma-separated user IDs (examples)

URL Validation:
# Only http/https schemes allowed
pattern = r'^https?://[^\s]+'
# Normalize input before validation; yt-dlp is invoked through its Python API
# (no shell), so URLs are never interpolated into a command line
url = url.strip()

File Upload Restrictions:
# Only .txt files allowed
MessageHandler(filters.Document.FileExtension("txt"), ...)Bot Token:
- Stored in `.env` (should never be committed)
- Recommend `chmod 600 .env`
- Better: supply it as an environment variable at runtime
Cookies File:
/path/to/project/data/cookies/cookies.txt
- Contains website authentication credentials
- Should have restricted permissions
- Path configurable via the `COOKIES_FILE` environment variable
Error Messages:
- Exceptions not exposed to users
- Generic "Download failed" message shown
- Details logged for debugging
No SQL Injection:
- All queries use parameterized statements
await cursor.execute("... WHERE id=?", (task_id,))
Database File Permissions:
/path/to/project/data/queue/autodl.db
- Should be readable/writable by bot user only
- Recommend `chmod 600 autodl.db`
TLS/HTTPS:
- All Telegram API communication via HTTPS
- yt-dlp verifies TLS certificates by default
Socket Timeout:
SOCKET_TIMEOUT=30  # Prevent hanging connections

Service File (/etc/systemd/system/autodl-bot.service):
[Unit]
Description=autodl_enhanced Telegram Download Bot
After=network.target
[Service]
Type=simple
User=autodl
WorkingDirectory=/opt/autodl_enhanced
ExecStart=/opt/autodl_enhanced/venv/bin/python -m src.autodl_bot
Restart=on-failure
RestartSec=10
[Install]
WantedBy=multi-user.target

Service Management:
sudo systemctl start autodl-bot # Start service
sudo systemctl stop autodl-bot # Stop service
sudo systemctl status autodl-bot # Check status
sudo systemctl enable autodl-bot # Auto-start on boot
sudo journalctl -u autodl-bot -f     # View live logs

Virtual Environment:
/path/to/project/venv/

Benefits:
- Isolated Python dependencies
- No conflicts with system Python
- Easy version management
autodl_enhanced implements a clean, production-ready architecture with:
- Modular Design: Clear separation of concerns (handlers, managers, utilities)
- Async Throughout: Leverages Python asyncio for efficient concurrency
- Persistent Queue: SQLite-based with atomic operations and proper indexing
- Intelligent Retry: Exponential backoff prevents server overload
- Smart Deduplication: Multi-level detection across 15+ platforms
- Resource Monitoring: Disk space, CPU, memory tracking
- Security: Admin-only access, input validation, sensitive data protection
- Production Ready: Systemd integration, rotating logs, error handling
- Extensible: Clean interfaces for adding new features
This architecture scales from low-end systems (2 workers) to high-end servers (16+ workers) without code changes.