From aecc31cc2fe3cd4d74221b44242a28cd057c1220 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:19:11 +0100 Subject: [PATCH 1/3] Add first rendition of sqlite3.py sink --- aikido_zen/__init__.py | 1 + aikido_zen/sinks/sqlite3.py | 93 +++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 aikido_zen/sinks/sqlite3.py diff --git a/aikido_zen/__init__.py b/aikido_zen/__init__.py index c65ba3509..1739c4adb 100644 --- a/aikido_zen/__init__.py +++ b/aikido_zen/__init__.py @@ -72,6 +72,7 @@ def protect(mode="daemon", token=""): import aikido_zen.sinks.psycopg import aikido_zen.sinks.asyncpg import aikido_zen.sinks.clickhouse_driver + import aikido_zen.sinks.sqlite3 import aikido_zen.sinks.builtins import aikido_zen.sinks.os diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py new file mode 100644 index 000000000..e333b6103 --- /dev/null +++ b/aikido_zen/sinks/sqlite3.py @@ -0,0 +1,93 @@ +""" +Sink module for `sqlite3` +""" + +from aikido_zen.helpers.get_argument import get_argument +import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.register_call import register_call +from aikido_zen.sinks import patch_function, on_import, before + + +@before +def _cursor_execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Cursor.execute", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Cursor.execute", args=(query, "sqlite") + ) + + +@before +def _cursor_executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Cursor.executemany", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Cursor.executemany", args=(query, "sqlite") + ) + + +@before +def _cursor_executescript(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql_script") + + register_call("sqlite3.Cursor.executescript", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Cursor.executescript", + args=(query, "sqlite"), + ) + + +@before +def _connection_execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Connection.execute", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Connection.execute", args=(query, "sqlite") + ) + + +@before +def _connection_executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Connection.executemany", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Connection.executemany", + args=(query, "sqlite"), + ) + + +@before +def _connection_executescript(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql_script") + + register_call("sqlite3.Connection.executescript", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Connection.executescript", + args=(query, "sqlite"), + ) + + +@on_import("sqlite3") +def patch(m): + """ + patching sqlite3 + - patches Cursor.execute(sql, ...) + - patches Cursor.executemany(sql, ...) + - patches Cursor.executescript(sql_script) + - patches Connection.execute(sql, ...) + - patches Connection.executemany(sql, ...) + - patches Connection.executescript(sql_script) + """ + patch_function(m, "Cursor.execute", _cursor_execute) + patch_function(m, "Cursor.executemany", _cursor_executemany) + patch_function(m, "Cursor.executescript", _cursor_executescript) + patch_function(m, "Connection.execute", _connection_execute) + patch_function(m, "Connection.executemany", _connection_executemany) + patch_function(m, "Connection.executescript", _connection_executescript) From 3d54bf008843cd5923bd479905216bd3be51e22d Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:23:20 +0100 Subject: [PATCH 2/3] Add sqlite3 sink tests --- aikido_zen/sinks/tests/sqlite3_test.py | 239 +++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 aikido_zen/sinks/tests/sqlite3_test.py diff --git a/aikido_zen/sinks/tests/sqlite3_test.py b/aikido_zen/sinks/tests/sqlite3_test.py new file mode 100644 index 000000000..50fbc77f9 --- /dev/null +++ b/aikido_zen/sinks/tests/sqlite3_test.py @@ -0,0 +1,239 @@ +import pytest +from unittest.mock import patch +import aikido_zen.sinks.sqlite3 +from aikido_zen.background_process.comms import reset_comms + +kind = "sql_injection" + + +@pytest.fixture +def database_conn(): + import sqlite3 + + conn = sqlite3.connect(":memory:") + conn.execute( + "CREATE TABLE dogs (id INTEGER PRIMARY KEY, dog_name TEXT, isAdmin INTEGER)" + ) + conn.commit() + return conn + + +def test_cursor_execute(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + query = "SELECT * FROM dogs" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.fetchall() + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_parameterized(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + query = "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + cursor.execute(query, ("doggo", 0)) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_no_args(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + dogname = "Doggo" + isadmin = 1 + query = f"INSERT INTO dogs (dog_name, isAdmin) VALUES ('{dogname}', {isadmin})" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] + == "INSERT INTO dogs (dog_name, isAdmin) VALUES ('Doggo', 1)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_with_fstring(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + table_name = "dogs" + value_2 = "1" + cursor.execute( + f"INSERT INTO {table_name} (dog_name, isAdmin) VALUES (?, {value_2})", + ("doggy",), + ) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, 1)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_executemany(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + data = [ + ("Doggy", 0), + ("Doggy 2", 1), + ("Dogski", 1), + ] + cursor.executemany("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_executescript(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + script = """ + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Fido', 0); + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Rex', 1); + """ + cursor.executescript(script) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == script + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_execute(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + query = "SELECT * FROM dogs" + database_conn.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_execute_parameterized(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + query = "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + database_conn.execute(query, ("doggo", 0)) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_executemany(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + data = [ + ("Doggy", 0), + ("Doggy 2", 1), + ("Dogski", 1), + ] + database_conn.executemany( + "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data + ) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_executescript(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + script = """ + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Fido', 0); + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Rex', 1); + """ + database_conn.executescript(script) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == script + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() From 92fe154310136362fe9938326739bc3c0436733f Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:23:42 +0100 Subject: [PATCH 3/3] Update sqlite3.py sink: Add c-level wrapping, still needs some modifications --- aikido_zen/sinks/sqlite3.py | 119 ++++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 38 deletions(-) diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index e333b6103..7fb835073 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -1,11 +1,22 @@ """ Sink module for `sqlite3` + +sqlite3 uses C-level types for Connection and Cursor, so we cannot directly +patch their methods with wrapt. Instead we: +1. Intercept `sqlite3.connect` and inject a custom `factory` parameter. +2. The custom factory is a dynamic Python Connection subclass whose `cursor()` + returns a wrapped Cursor subclass. +3. All SQL interception happens at the Cursor level. Connection shortcut methods + (execute, executemany, executescript) internally call cursor methods, so + wrapping only the Cursor avoids double-counting. """ +import sqlite3 as _sqlite3 + from aikido_zen.helpers.get_argument import get_argument import aikido_zen.vulnerabilities as vulns from aikido_zen.helpers.register_call import register_call -from aikido_zen.sinks import patch_function, on_import, before +from aikido_zen.sinks import patch_function, on_import, before, before_modify_return @before @@ -24,7 +35,9 @@ def _cursor_executemany(func, instance, args, kwargs): register_call("sqlite3.Cursor.executemany", "sql_op") vulns.run_vulnerability_scan( - kind="sql_injection", op="sqlite3.Cursor.executemany", args=(query, "sqlite") + kind="sql_injection", + op="sqlite3.Cursor.executemany", + args=(query, "sqlite"), ) @@ -40,54 +53,84 @@ def _cursor_executescript(func, instance, args, kwargs): ) -@before -def _connection_execute(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql") - - register_call("sqlite3.Connection.execute", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", op="sqlite3.Connection.execute", args=(query, "sqlite") +def _build_aikido_cursor(base_cursor_cls): + """ + Creates a Python-level subclass of the given Cursor class with mutable + method slots so that wrapt can patch them. + """ + cls = type( + "AikidoSQLite3Cursor", + (base_cursor_cls,), + { + "execute": base_cursor_cls.execute, + "executemany": base_cursor_cls.executemany, + "executescript": base_cursor_cls.executescript, + }, ) + patch_function(cls, "execute", _cursor_execute) + patch_function(cls, "executemany", _cursor_executemany) + patch_function(cls, "executescript", _cursor_executescript) + return cls -@before -def _connection_executemany(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql") +_AikidoCursor = _build_aikido_cursor(_sqlite3.Cursor) - register_call("sqlite3.Connection.executemany", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Connection.executemany", - args=(query, "sqlite"), - ) +def _aikido_cursor(self, *args, **kwargs): + """Replacement cursor() that returns an AikidoSQLite3Cursor instance.""" + return _AikidoCursor(self) -@before -def _connection_executescript(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql_script") - register_call("sqlite3.Connection.executescript", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Connection.executescript", - args=(query, "sqlite"), +def _build_aikido_connection(base_conn_cls): + """ + Creates a Python-level Connection subclass whose cursor() returns + wrapped cursors. + """ + return type( + "AikidoSQLite3Connection", + (base_conn_cls,), + { + "cursor": _aikido_cursor, + }, ) +_AikidoConnection = _build_aikido_connection(_sqlite3.Connection) + + +@before_modify_return +def _connect(func, instance, args, kwargs): + """ + Intercept sqlite3.connect to inject our Connection factory. + The factory parameter is the 6th positional arg (index 5) or a keyword arg. + """ + # Determine the user-specified factory, if any + factory = kwargs.get("factory") + if factory is None and len(args) > 5: + factory = args[5] + if factory is None: + factory = _sqlite3.Connection + + # If the user passed a custom factory, build a new wrapped subclass for it + if factory is _sqlite3.Connection: + aikido_factory = _AikidoConnection + else: + aikido_factory = _build_aikido_connection(factory) + + # Build new args with our factory injected as a keyword + new_args = args[:5] if len(args) > 5 else args + new_kwargs = dict(kwargs) + new_kwargs["factory"] = aikido_factory + + return func(*new_args, **new_kwargs) + + @on_import("sqlite3") def patch(m): """ patching sqlite3 - - patches Cursor.execute(sql, ...) - - patches Cursor.executemany(sql, ...) - - patches Cursor.executescript(sql_script) - - patches Connection.execute(sql, ...) - - patches Connection.executemany(sql, ...) - - patches Connection.executescript(sql_script) + - patches sqlite3.connect to inject a wrapped Connection factory + - wrapped connections produce wrapped cursors + - Cursor.execute, Cursor.executemany, Cursor.executescript are intercepted """ - patch_function(m, "Cursor.execute", _cursor_execute) - patch_function(m, "Cursor.executemany", _cursor_executemany) - patch_function(m, "Cursor.executescript", _cursor_executescript) - patch_function(m, "Connection.execute", _connection_execute) - patch_function(m, "Connection.executemany", _connection_executemany) - patch_function(m, "Connection.executescript", _connection_executescript) + patch_function(m, "connect", _connect)