Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 153 additions & 58 deletions app/controllers/solution.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,79 @@
"""
Solution controller - Solution uploading.
Solution controller - Solution uploading and viewing.
"""

import os
from html import escape as html_escape
from flask import Blueprint, render_template, request, redirect, flash, session, abort
from flask import Blueprint, render_template, request, redirect, flash, session, abort, current_app
from werkzeug.utils import secure_filename
import bleach
from app.models.crackme import crackme_by_hexid
from app.models.solution import solution_create, solutions_by_user_and_crackme
from app.models.solution import solution_create, solution_exists, solution_by_hexid
from app.models.notification import notification_add
from app.models.errors import ErrNoResult
from app.services.recaptcha import verify as verify_recaptcha
from app.services.limiter import limit
from app.services.view import FLASH_ERROR, FLASH_SUCCESS
from app.services.view import FLASH_ERROR, is_valid_hexid
from app.services.archive import is_archive_password_protected, is_pe_file
from app.services.discord import notify_new_solution
from app.services.crypto import get_obfuscation_key_base64, get_obfuscation_salt
from app.controllers.decorators import login_required

solution_bp = Blueprint('solution', __name__)

# Upload folder for solutions
UPLOAD_FOLDER = 'tmp/solution'
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
MAX_INFO_LENGTH = 200
MAX_CONTENT_LENGTH = 50000
MIN_CONTENT_LENGTH = 200
MARKDOWN_EXTENSIONS = frozenset({'.md', '.txt', '.markdown'})


@solution_bp.route('/upload/solution/<hexidcrackme>', methods=['GET'])
@login_required
def upload_solution_get(hexidcrackme):
"""Display the solution upload form."""
def _get_crackme_or_abort(hexid):
"""Fetch crackme by hexid, abort with 404/500 on error."""
if not is_valid_hexid(hexid):
abort(404)
try:
crackme = crackme_by_hexid(hexidcrackme)
return crackme_by_hexid(hexid)
except ErrNoResult:
abort(404)
except Exception as e:
print(f"Error getting crackme: {e}")
abort(500)


def _get_solution_or_abort(hexid):
"""Fetch solution by hexid, abort with 404/500 on error."""
if not is_valid_hexid(hexid):
abort(404)
try:
return solution_by_hexid(hexid)
except ErrNoResult:
abort(404)
except Exception as e:
print(f"Error getting solution: {e}")
abort(500)


def _send_notifications_and_render_success(username, crackme):
"""Send notifications and render the success page after solution creation."""
try:
notification_add(username, f"Your solution for '{html_escape(crackme['name'])}' is waiting approval!")
notify_new_solution(username, crackme['name'])
except Exception as e:
print(f"Notification error: {e}")

return render_template('submission/success.html',
submission_type='Writeup',
name=crackme['name'],
username=username)


@solution_bp.route('/upload/solution/<hexidcrackme>', methods=['GET'])
@login_required
def upload_solution_get(hexidcrackme):
"""Display the solution upload form."""
crackme = _get_crackme_or_abort(hexidcrackme)
return render_template('solution/create.html',
hexidcrackme=hexidcrackme,
username=crackme.get('author', ''),
Expand All @@ -47,95 +84,153 @@ def upload_solution_get(hexidcrackme):
@login_required
@limit("20 per day", key_func=lambda: session.get('name'))
def upload_solution_post(hexidcrackme):
"""Handle solution upload."""
"""Handle solution file upload."""
crackme = _get_crackme_or_abort(hexidcrackme)
username = session.get('name')

info = bleach.clean(request.form.get('info', ''))
redirect_url = f'/upload/solution/{hexidcrackme}'

# Check if user already submitted a solution
try:
solutions_by_user_and_crackme(username, hexidcrackme)
if solution_exists(username, crackme['_id']):
flash("You've already submitted a solution to this crackme", FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
except ErrNoResult:
pass # No existing solution, continue
return redirect(redirect_url)

# Validate reCAPTCHA
if not verify_recaptcha(request):
flash('reCAPTCHA invalid!', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
return redirect(redirect_url)

# Check for file
if 'file' not in request.files:
# Validate file presence
if 'file' not in request.files or request.files['file'].filename == '':
flash('Field missing: file', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
return redirect(redirect_url)

file = request.files['file']
if file.filename == '':
flash('Field missing: file', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')

# Check file size from header
# Check file size (header and actual)
if file.content_length and file.content_length > MAX_FILE_SIZE:
flash('This file is too large!', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
return redirect(redirect_url)

# Read file data
try:
data = file.read()
if len(data) > MAX_FILE_SIZE:
flash('This file is too large!', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
except Exception as e:
print(f"Error reading file: {e}")
abort(500)

if len(data) > MAX_FILE_SIZE:
flash('This file is too large!', FLASH_ERROR)
return redirect(redirect_url)

# Check for password-protected archives
if is_archive_password_protected(data):
flash('Password-protected archives are not allowed. Do NOT add a password yourself - the server handles this automatically.', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
return redirect(redirect_url)

# Check for PE files (patched binaries are not allowed)
# Check for PE files (patched binaries not allowed)
if is_pe_file(file.filename, data):
flash('Executable files (PE binaries) are not allowed as solutions. '
'Please submit a writeup that analyzes the algorithm instead of a patched binary.', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
flash('Executable files (PE binaries) are not allowed as solutions. Please submit a writeup that analyzes the algorithm instead of a patched binary.', FLASH_ERROR)
return redirect(redirect_url)

info = bleach.clean(request.form.get('info', ''))
if len(info) > MAX_INFO_LENGTH:
flash(f'Info field exceeds maximum length of {MAX_INFO_LENGTH} characters.', FLASH_ERROR)
return redirect(redirect_url)

# Secure filename (fallback to "unnamed" if filename has only unsafe characters)
original_filename = secure_filename(file.filename) or "unnamed"
ext = os.path.splitext(original_filename)[1].lower()
has_markdown = ext in MARKDOWN_EXTENSIONS

# Create solution
try:
solution = solution_create(info, username, hexidcrackme, original_filename)
solution = solution_create(info, username, crackme, original_filename, has_markdown)
except Exception as e:
print(f"Error creating solution: {e}")
abort(500)

# Create path using hexid only
safe_path = os.path.join(UPLOAD_FOLDER, solution['hexid'])

# Ensure upload directory exists
# Save uploaded file
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Save file
try:
with open(safe_path, 'wb') as f:
with open(os.path.join(UPLOAD_FOLDER, solution['hexid']), 'wb') as f:
f.write(data)
except Exception as e:
print(f"File write error: {e}")
flash('An error occurred on the server. Please try again later.', FLASH_ERROR)
return redirect(f'/upload/solution/{hexidcrackme}')
return redirect(redirect_url)

return _send_notifications_and_render_success(username, crackme)


@solution_bp.route('/solution/<hexid>', methods=['GET'])
@login_required
def view_solution(hexid):
"""Display a solution's writeup page."""
solution = _get_solution_or_abort(hexid)
salt = get_obfuscation_salt(current_app.config)

return render_template('solution/read.html',
solution=solution,
obfuscation_key=get_obfuscation_key_base64(hexid, salt))


@solution_bp.route('/upload/solution/<hexidcrackme>/editor', methods=['GET'])
@login_required
def editor_solution_get(hexidcrackme):
"""Display the web-based markdown editor for writing solutions."""
crackme = _get_crackme_or_abort(hexidcrackme)
return render_template('solution/editor.html',
hexidcrackme=hexidcrackme,
username=crackme.get('author', ''),
crackmename=crackme.get('name', ''),
min_content_length=MIN_CONTENT_LENGTH,
max_content_length=MAX_CONTENT_LENGTH)


@solution_bp.route('/upload/solution/<hexidcrackme>/editor', methods=['POST'])
@login_required
@limit("20 per day", key_func=lambda: session.get('name'))
def editor_solution_post(hexidcrackme):
"""Handle solution submission from the web editor."""
crackme = _get_crackme_or_abort(hexidcrackme)
username = session.get('name')
redirect_url = f'/upload/solution/{hexidcrackme}/editor'

# Check if user already submitted a solution
if solution_exists(username, crackme['_id']):
flash("You've already submitted a solution to this crackme", FLASH_ERROR)
return redirect(redirect_url)

if not verify_recaptcha(request):
flash('reCAPTCHA invalid!', FLASH_ERROR)
return redirect(redirect_url)

content = request.form.get('content', '').strip()

# Validate content length
if len(content) < MIN_CONTENT_LENGTH:
flash(f'Your writeup is too short. Please write at least {MIN_CONTENT_LENGTH} characters.', FLASH_ERROR)
return redirect(redirect_url)

if len(content) > MAX_CONTENT_LENGTH:
flash(f'Your writeup exceeds the maximum length of {MAX_CONTENT_LENGTH:,} characters.', FLASH_ERROR)
return redirect(redirect_url)

info = bleach.clean(request.form.get('info', ''))
if len(info) > MAX_INFO_LENGTH:
flash(f'Info field exceeds maximum length of {MAX_INFO_LENGTH} characters.', FLASH_ERROR)
return redirect(redirect_url)

# Send notification
try:
crackme = crackme_by_hexid(hexidcrackme)
notification_add(username, f"Your solution for '{html_escape(crackme['name'])}' is waiting approval!")
# Send Discord notification
notify_new_solution(username, crackme['name'])
solution = solution_create(info, username, crackme, 'writeup.md', has_markdown=True)
except Exception as e:
print(f"Notification error: {e}")
print(f"Error creating solution: {e}")
abort(500)

return render_template('submission/success.html',
submission_type='Writeup',
name=crackme['name'],
username=username)
# Save markdown content
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
try:
with open(os.path.join(UPLOAD_FOLDER, solution['hexid']), 'w', encoding='utf-8') as f:
f.write(content)
except Exception as e:
print(f"File write error: {e}")
flash('An error occurred on the server. Please try again later.', FLASH_ERROR)
return redirect(redirect_url)

return _send_notifications_and_render_success(username, crackme)
45 changes: 19 additions & 26 deletions app/models/solution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Solution model for database operations.
"""

from datetime import datetime
from datetime import datetime, timezone
from bson import ObjectId
from pymongo import DESCENDING
from app.services.database import get_collection, check_connection
Expand Down Expand Up @@ -72,28 +72,24 @@ def solutions_by_user(username):
return solutions


def solutions_by_user_and_crackme(username, crackme_hexid):
"""Get solution by user and crackme."""
def solution_exists(username, crackme_id):
"""Check if a user has already submitted a solution for a crackme.

Args:
username: The username to check
crackme_id: The crackme's ObjectId (not hexid)

Returns:
True if a solution exists, False otherwise
"""
if not check_connection():
raise ErrUnavailable("Database is unavailable")

# First get the crackme
from app.models.crackme import crackme_by_hexid
try:
crackme = crackme_by_hexid(crackme_hexid)
except ErrNoResult:
raise ErrNoResult("Solution not found")

collection = get_collection('solution')
result = collection.find_one({
'crackmeid': crackme['_id'],
'author': username
})

if result is None:
raise ErrNoResult("Solution not found")

return result
return collection.find_one(
{'crackmeid': crackme_id, 'author': username},
{'_id': 1}
) is not None


def solutions_by_crackme(crackme_object_id):
Expand Down Expand Up @@ -129,15 +125,11 @@ def get_solution_authors(crackme_hexid):
return set(solution['author'] for solution in solutions)


def solution_create(info, username, crackme_hexid, original_filename):
def solution_create(info, username, crackme, original_filename=None, has_markdown=False):
"""Create a new solution."""
if not check_connection():
raise ErrUnavailable("Database is unavailable")

# Get the crackme
from app.models.crackme import crackme_by_hexid
crackme = crackme_by_hexid(crackme_hexid)

collection = get_collection('solution')
obj_id = ObjectId()

Expand All @@ -148,11 +140,12 @@ def solution_create(info, username, crackme_hexid, original_filename):
'crackmeid': crackme['_id'],
'crackmehexid': crackme['hexid'],
'crackmename': crackme['name'],
'created_at': datetime.utcnow(),
'created_at': datetime.now(timezone.utc),
'author': username,
'visible': False,
'deleted': False,
'original_filename': original_filename
'original_filename': original_filename,
'has_markdown': has_markdown
}

collection.insert_one(solution)
Expand Down
2 changes: 1 addition & 1 deletion app/services/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import struct

# PE file extensions (case-insensitive)
PE_EXTENSIONS = {'.exe', '.dll', '.sys', '.scr', '.ocx', '.com', '.drv', '.cpl', '.efi'}
PE_EXTENSIONS = frozenset({'.exe', '.dll', '.sys', '.scr', '.ocx', '.com', '.drv', '.cpl', '.efi'})


def is_pe_file(filename: str, file_data: bytes) -> bool:
Expand Down
Loading