From e2ee8e1825b1a8aa3d4aae6af742b0eca69d38dd Mon Sep 17 00:00:00 2001 From: "Friedman, Colin [USA]" Date: Tue, 24 Mar 2026 21:27:42 -0400 Subject: [PATCH] Add vulnerable Python files for CodeQL security testing --- vulnerable_command_injection.py | 40 ++++++++++++++++++++ vulnerable_deserialization.py | 56 +++++++++++++++++++++++++++ vulnerable_hardcoded_secrets.py | 47 +++++++++++++++++++++++ vulnerable_missing_auth.py | 58 ++++++++++++++++++++++++++++ vulnerable_path_traversal.py | 54 ++++++++++++++++++++++++++ vulnerable_sql_injection.py | 52 +++++++++++++++++++++++++ vulnerable_ssrf.py | 58 ++++++++++++++++++++++++++++ vulnerable_weak_crypto.py | 49 ++++++++++++++++++++++++ vulnerable_xss.py | 67 +++++++++++++++++++++++++++++++++ vulnerable_xxe.py | 55 +++++++++++++++++++++++++++ 10 files changed, 536 insertions(+) create mode 100644 vulnerable_command_injection.py create mode 100644 vulnerable_deserialization.py create mode 100644 vulnerable_hardcoded_secrets.py create mode 100644 vulnerable_missing_auth.py create mode 100644 vulnerable_path_traversal.py create mode 100644 vulnerable_sql_injection.py create mode 100644 vulnerable_ssrf.py create mode 100644 vulnerable_weak_crypto.py create mode 100644 vulnerable_xss.py create mode 100644 vulnerable_xxe.py diff --git a/vulnerable_command_injection.py b/vulnerable_command_injection.py new file mode 100644 index 0000000..28e310b --- /dev/null +++ b/vulnerable_command_injection.py @@ -0,0 +1,40 @@ +import os +import subprocess +from flask import Flask, request + +app = Flask(__name__) + +@app.route('/ping') +def ping(): + host = request.args.get('host', 'localhost') + + command = "ping -c 4 " + host + result = os.system(command) + + return f"Ping result: {result}" + +@app.route('/execute') +def execute_command(): + filename = request.args.get('file', '') + + os.system(f"cat {filename}") + + return "Command executed" + +@app.route('/backup') +def backup(): + backup_path = request.form.get('path', '/default/backup') + + subprocess.call("tar -czf backup.tar.gz " + backup_path, shell=True) + + return "Backup completed" + +def process_file(user_input): + cmd = f"grep 'pattern' {user_input}" + subprocess.run(cmd, shell=True, capture_output=True) + +def convert_file(input_file): + os.popen(f"convert {input_file} output.pdf").read() + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_deserialization.py b/vulnerable_deserialization.py new file mode 100644 index 0000000..4d443d1 --- /dev/null +++ b/vulnerable_deserialization.py @@ -0,0 +1,56 @@ +import pickle +import yaml +import marshal +from flask import Flask, request + +app = Flask(__name__) + +@app.route('/load', methods=['POST']) +def load_data(): + data = request.data + + obj = pickle.loads(data) + + return str(obj) + +@app.route('/session', methods=['POST']) +def restore_session(): + session_data = request.form.get('session') + + session = pickle.loads(session_data.encode()) + + return f"Session restored: {session}" + +def load_config(config_data): + config = yaml.load(config_data) + return config + +def deserialize_object(serialized): + return pickle.loads(serialized) + +def load_user_preferences(pref_string): + prefs = marshal.loads(pref_string) + return prefs + +@app.route('/import', methods=['POST']) +def import_data(): + import_file = request.files['file'] + content = import_file.read() + + data = pickle.loads(content) + + return f"Imported: {data}" + +def process_yaml(yaml_content): + parsed = yaml.load(yaml_content, Loader=yaml.Loader) + return parsed + +class DataProcessor: + def load_from_bytes(self, byte_data): + return pickle.loads(byte_data) + + def restore_state(self, state_data): + self.__dict__ = pickle.loads(state_data) + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_hardcoded_secrets.py b/vulnerable_hardcoded_secrets.py new file mode 100644 index 0000000..b0326a6 --- /dev/null +++ b/vulnerable_hardcoded_secrets.py @@ -0,0 +1,47 @@ +import requests +import boto3 +from sqlalchemy import create_engine + +API_KEY = "sk-1234567890abcdefghijklmnopqrstuvwxyz" +SECRET_TOKEN = "ghp_1234567890abcdefghijklmnopqrstuvwxyz" +DATABASE_PASSWORD = "SuperSecret123!" + +AWS_ACCESS_KEY = "AKIAIOSFODNN7EXAMPLE" +AWS_SECRET_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + +def connect_to_database(): + connection_string = "postgresql://admin:P@ssw0rd123@localhost:5432/mydb" + engine = create_engine(connection_string) + return engine + +def send_api_request(): + headers = { + 'Authorization': 'Bearer sk-prod-abc123xyz789secretkey', + 'X-API-Key': 'AIzaSyD1234567890abcdefghijklmnopqrs' + } + + response = requests.get('https://api.example.com/data', headers=headers) + return response.json() + +def connect_aws(): + client = boto3.client( + 's3', + aws_access_key_id='AKIAI44QH8DHBEXAMPLE', + aws_secret_access_key='je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY' + ) + return client + +class DatabaseConfig: + def __init__(self): + self.host = "database.example.com" + self.user = "admin" + self.password = "admin123" + self.port = 5432 + +SMTP_PASSWORD = "email_password_123" +JWT_SECRET = "my-secret-jwt-key-12345" + +def get_stripe_key(): + return "sk_test_fakekeyfordemopurposes123456789" + +SLACK_WEBHOOK = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXX" diff --git a/vulnerable_missing_auth.py b/vulnerable_missing_auth.py new file mode 100644 index 0000000..bafbb28 --- /dev/null +++ b/vulnerable_missing_auth.py @@ -0,0 +1,58 @@ +from flask import Flask, request, jsonify + +app = Flask(__name__) + +@app.route('/admin/users') +def get_all_users(): + users = [ + {'id': 1, 'username': 'admin', 'email': 'admin@example.com'}, + {'id': 2, 'username': 'user1', 'email': 'user1@example.com'} + ] + return jsonify(users) + +@app.route('/admin/delete_user/', methods=['DELETE']) +def delete_user(user_id): + return jsonify({'message': f'User {user_id} deleted'}) + +@app.route('/api/sensitive_data') +def get_sensitive_data(): + data = { + 'ssn': '123-45-6789', + 'credit_card': '4532-1234-5678-9010', + 'salary': 150000 + } + return jsonify(data) + +@app.route('/config') +def get_config(): + config = { + 'database_url': 'postgresql://admin:password@localhost/db', + 'api_keys': ['key1', 'key2', 'key3'], + 'secret_key': 'my-secret-key' + } + return jsonify(config) + +@app.route('/admin/settings', methods=['POST']) +def update_settings(): + new_settings = request.json + return jsonify({'message': 'Settings updated', 'settings': new_settings}) + +@app.route('/user//financial') +def get_financial_info(user_id): + financial_data = { + 'account_balance': 50000, + 'transactions': ['payment1', 'payment2'] + } + return jsonify(financial_data) + +@app.route('/debug/logs') +def get_debug_logs(): + logs = [ + 'User admin logged in from 192.168.1.1', + 'Password reset for user@example.com', + 'API key abc123 used' + ] + return jsonify(logs) + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_path_traversal.py b/vulnerable_path_traversal.py new file mode 100644 index 0000000..8f83550 --- /dev/null +++ b/vulnerable_path_traversal.py @@ -0,0 +1,54 @@ +import os +from flask import Flask, request, send_file + +app = Flask(__name__) + +@app.route('/download') +def download_file(): + filename = request.args.get('file') + + file_path = os.path.join('/var/www/uploads/', filename) + return send_file(file_path) + +@app.route('/read') +def read_file(): + file_name = request.args.get('filename', 'default.txt') + + with open(file_name, 'r') as f: + content = f.read() + + return content + +@app.route('/view') +def view_document(): + doc = request.args.get('doc') + path = f"/documents/{doc}" + + with open(path) as file: + return file.read() + +def load_template(template_name): + template_path = "../templates/" + template_name + + with open(template_path, 'r') as f: + return f.read() + +def get_user_file(user_id, filename): + base_path = "/home/users/" + full_path = base_path + user_id + "/" + filename + + if os.path.exists(full_path): + with open(full_path, 'rb') as f: + return f.read() + + return None + +@app.route('/image') +def serve_image(): + img_name = request.args.get('name') + img_path = "./static/images/" + img_name + + return send_file(img_path) + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_sql_injection.py b/vulnerable_sql_injection.py new file mode 100644 index 0000000..4fde29f --- /dev/null +++ b/vulnerable_sql_injection.py @@ -0,0 +1,52 @@ +import sqlite3 +from flask import Flask, request + +app = Flask(__name__) + +@app.route('/login', methods=['POST']) +def login(): + username = request.form['username'] + password = request.form['password'] + + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + query = "SELECT * FROM users WHERE username='" + username + "' AND password='" + password + "'" + cursor.execute(query) + + user = cursor.fetchone() + conn.close() + + if user: + return "Login successful" + else: + return "Login failed" + +@app.route('/search') +def search(): + search_term = request.args.get('q', '') + + conn = sqlite3.connect('products.db') + cursor = conn.cursor() + + query = f"SELECT * FROM products WHERE name LIKE '%{search_term}%'" + cursor.execute(query) + + results = cursor.fetchall() + conn.close() + + return str(results) + +def get_user_info(user_id): + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + cursor.execute("SELECT * FROM users WHERE id = " + str(user_id)) + + user_data = cursor.fetchone() + conn.close() + + return user_data + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_ssrf.py b/vulnerable_ssrf.py new file mode 100644 index 0000000..f031b5e --- /dev/null +++ b/vulnerable_ssrf.py @@ -0,0 +1,58 @@ +import requests +from flask import Flask, request +import urllib.request + +app = Flask(__name__) + +@app.route('/fetch') +def fetch_url(): + url = request.args.get('url') + + response = requests.get(url) + + return response.text + +@app.route('/proxy') +def proxy_request(): + target_url = request.args.get('target') + + data = urllib.request.urlopen(target_url).read() + + return data + +@app.route('/webhook', methods=['POST']) +def webhook(): + callback_url = request.json.get('callback_url') + + response = requests.post(callback_url, json={'status': 'success'}) + + return f"Webhook sent: {response.status_code}" + +@app.route('/image') +def load_image(): + image_url = request.args.get('url') + + img_data = requests.get(image_url).content + + return img_data + +def fetch_remote_resource(resource_url): + with urllib.request.urlopen(resource_url) as response: + return response.read() + +@app.route('/metadata') +def fetch_metadata(): + metadata_url = request.args.get('metadata_url') + + metadata = requests.get(metadata_url, timeout=5).json() + + return metadata + +def download_file(file_url): + response = requests.get(file_url, stream=True) + with open('downloaded_file', 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_weak_crypto.py b/vulnerable_weak_crypto.py new file mode 100644 index 0000000..5b8aca6 --- /dev/null +++ b/vulnerable_weak_crypto.py @@ -0,0 +1,49 @@ +import hashlib +import random +from Crypto.Cipher import DES, ARC2, Blowfish +from Crypto.Hash import MD5, SHA1 + +def hash_password_weak(password): + return hashlib.md5(password.encode()).hexdigest() + +def hash_with_sha1(data): + return hashlib.sha1(data.encode()).hexdigest() + +def encrypt_data_des(data, key): + cipher = DES.new(key, DES.MODE_ECB) + encrypted = cipher.encrypt(data) + return encrypted + +def encrypt_with_arc2(plaintext, key): + cipher = ARC2.new(key, ARC2.MODE_ECB) + return cipher.encrypt(plaintext) + +def generate_token(): + token = "" + for i in range(10): + token += str(random.randint(0, 9)) + return token + +def create_session_id(): + import time + return hashlib.md5(str(time.time()).encode()).hexdigest() + +def weak_random_key(): + random.seed(12345) + return random.randint(1000, 9999) + +class PasswordHasher: + def hash(self, password): + return MD5.new(password.encode()).hexdigest() + +def verify_password(input_password, stored_hash): + input_hash = hashlib.md5(input_password.encode()).hexdigest() + return input_hash == stored_hash + +def encrypt_sensitive_data(data): + key = b'weakkey1' + cipher = DES.new(key, DES.MODE_ECB) + return cipher.encrypt(data.encode().ljust(8)) + +def generate_otp(): + return str(random.randrange(100000, 999999)) diff --git a/vulnerable_xss.py b/vulnerable_xss.py new file mode 100644 index 0000000..59b8652 --- /dev/null +++ b/vulnerable_xss.py @@ -0,0 +1,67 @@ +from flask import Flask, request, render_template_string, make_response + +app = Flask(__name__) + +@app.route('/hello') +def hello(): + name = request.args.get('name', 'Guest') + + return f"

Hello, {name}!

" + +@app.route('/comment', methods=['POST']) +def post_comment(): + comment = request.form.get('comment', '') + + html = f""" + + +

Your comment:

+

{comment}

+ + + """ + + return html + +@app.route('/search') +def search(): + query = request.args.get('q', '') + + template = f"

Search results for: {query}

" + return render_template_string(template) + +@app.route('/profile') +def profile(): + username = request.args.get('user', 'Anonymous') + bio = request.args.get('bio', '') + + page = f""" + + User Profile + +

{username}

+
{bio}
+ + + """ + + return page + +@app.route('/error') +def error_page(): + error_msg = request.args.get('msg') + + return f"
{error_msg}
" + +@app.route('/dashboard') +def dashboard(): + user_input = request.args.get('data', '') + + response = make_response(f"

Dashboard data: {user_input}

") + return response + +def render_user_content(content): + return f"
{content}
" + +if __name__ == '__main__': + app.run(debug=True) diff --git a/vulnerable_xxe.py b/vulnerable_xxe.py new file mode 100644 index 0000000..b9fc887 --- /dev/null +++ b/vulnerable_xxe.py @@ -0,0 +1,55 @@ +import xml.etree.ElementTree as ET +from flask import Flask, request +from lxml import etree +import xml.sax + +app = Flask(__name__) + +@app.route('/parse_xml', methods=['POST']) +def parse_xml(): + xml_data = request.data + + tree = ET.fromstring(xml_data) + + return f"Parsed: {tree.tag}" + +@app.route('/process_xml', methods=['POST']) +def process_xml(): + xml_content = request.data.decode() + + parser = etree.XMLParser() + doc = etree.fromstring(xml_content.encode(), parser) + + return etree.tostring(doc).decode() + +def parse_xml_file(filename): + tree = ET.parse(filename) + root = tree.getroot() + return root + +def parse_user_xml(xml_string): + parser = etree.XMLParser(resolve_entities=True) + root = etree.fromstring(xml_string.encode(), parser) + return root + +class XMLHandler(xml.sax.ContentHandler): + def startElement(self, name, attrs): + print(f"Element: {name}") + +def process_xml_sax(xml_data): + parser = xml.sax.make_parser() + handler = XMLHandler() + parser.setContentHandler(handler) + parser.parse(xml_data) + +@app.route('/upload_xml', methods=['POST']) +def upload_xml(): + xml_file = request.files['file'] + content = xml_file.read() + + root = ET.fromstring(content) + + return f"Uploaded: {root.tag}" + +if __name__ == '__main__': + app.run(debug=True)