Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions agentstack/_tools/py_sql/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
import psycopg2
from typing import Dict, Any

def get_connection():
Comment thread
bboynton97 marked this conversation as resolved.
Outdated
"""Get PostgreSQL database connection"""
return psycopg2.connect(
dbname=os.getenv('POSTGRES_DB'),
user=os.getenv('POSTGRES_USER'),
password=os.getenv('POSTGRES_PASSWORD'),
host=os.getenv('POSTGRES_HOST', 'localhost'),
port=os.getenv('POSTGRES_PORT', '5432')
)

def get_schema() -> Dict[str, Any]:
"""
Initialize connection and get database schema.
Returns a dictionary containing the database schema.
"""
try:
conn = get_connection()
cursor = conn.cursor()

# Query to get all tables in the current schema
schema_query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE';
"""

cursor.execute(schema_query)
tables = cursor.fetchall()

# Create schema dictionary
schema = {}
for (table_name,) in tables:
# Get column information for each table
column_query = """
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s;
"""
cursor.execute(column_query, (table_name,))
columns = [col[0] for col in cursor.fetchall()]
schema[table_name] = columns

cursor.close()
conn.close()
return schema

except Exception as e:
print(f"Error getting database schema: {str(e)}")
return {}

def execute_query(query: str) -> list:
"""
Execute a SQL query on the database.
Args:
query: SQL query to execute
Returns:
List of query results
"""
try:
conn = get_connection()
cursor = conn.cursor()

# Execute the query
cursor.execute(query)
results = cursor.fetchall()

cursor.close()
conn.close()
return results

except Exception as e:
print(f"Error executing query: {str(e)}")
return []
37 changes: 37 additions & 0 deletions agentstack/_tools/py_sql/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"name": "py_sql",
"url": "https://pypi.org/project/psycopg2/",
"category": "database",
"env": {
"POSTGRES_DB": {
Comment thread
bboynton97 marked this conversation as resolved.
Outdated
"description": "PostgreSQL database name",
"required": true
},
"POSTGRES_USER": {
"description": "PostgreSQL username",
"required": true
},
"POSTGRES_PASSWORD": {
"description": "PostgreSQL password",
"required": true
},
"POSTGRES_HOST": {
"description": "PostgreSQL host address",
"required": true,
"default": "localhost"
},
"POSTGRES_PORT": {
"description": "PostgreSQL port number",
"required": true,
"default": "5432"
}
},
"dependencies": [
"psycopg2-binary>=2.9.9"
],
"tools": [
"get_schema",
"execute_query"
],
"cta": "Set up your PostgreSQL connection variables in the environment file."
}
Binary file added agentstack/_tools/py_sql/test.db
Comment thread
bboynton97 marked this conversation as resolved.
Outdated
Binary file not shown.
66 changes: 66 additions & 0 deletions agentstack/_tools/py_sql/test.py
Comment thread
bboynton97 marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from sql import Table
from __init__ import construct_sql_query

def test_query_construction():
Comment thread
bboynton97 marked this conversation as resolved.
Outdated
"""Test query construction without a real database"""

# Define our test table structure
users = Table('users')

print("\n=== Testing SELECT queries ===")
# Test basic select
query, params = construct_sql_query(
"select",
"users",
columns=[users.name, users.age],
where=users.age > 18
)
print("Select users over 18:")
print(f"Query: {query}")
print(f"Params: {params}")

# Test select with multiple conditions
query, params = construct_sql_query(
"select",
"users",
columns=[users.name, users.email],
where=(users.age > 18) & (users.active == True)
)
print("\nSelect active users over 18:")
print(f"Query: {query}")
print(f"Params: {params}")

print("\n=== Testing INSERT queries ===")
query, params = construct_sql_query(
"insert",
"users",
values=[["John Doe", 25, "john@example.com", True]]
)
print("Insert new user:")
print(f"Query: {query}")
print(f"Params: {params}")

print("\n=== Testing UPDATE queries ===")
query, params = construct_sql_query(
"update",
"users",
columns=[users.active],
values=[False],
where=users.age < 18
)
print("Deactivate users under 18:")
print(f"Query: {query}")
print(f"Params: {params}")

print("\n=== Testing DELETE queries ===")
query, params = construct_sql_query(
"delete",
"users",
where=users.active == False
)
print("Delete inactive users:")
print(f"Query: {query}")
print(f"Params: {params}")

if __name__ == "__main__":
test_query_construction()