-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathvault.py
More file actions
196 lines (159 loc) · 5.14 KB
/
vault.py
File metadata and controls
196 lines (159 loc) · 5.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import uuid
from db import get_conn
from crypto import encrypt, decrypt
from validation import validate_service, validate_username, validate_password, validate_entry_id, ValidationError
def create_entry(service, username, secret, key):
    """
    Create a new vault entry with validation.

    The inputs are passed through the ``validate_*`` helpers, the secret is
    encrypted with ``key``, and a row with a freshly generated UUID4 id is
    inserted into the ``vault`` table and committed.

    Args:
        service: Service name (validated)
        username: Username (validated)
        secret: Password/secret (validated, stored encrypted)
        key: Encryption key

    Raises:
        ValidationError: If validation fails
        RuntimeError: If database operation fails; the underlying exception
            is chained as ``__cause__``
    """
    # Validation and encryption run outside the try block, so their errors
    # propagate unchanged instead of being reported as database failures.
    service = validate_service(service)
    username = validate_username(username)
    secret = validate_password(secret)

    encrypted = encrypt(secret, key)

    try:
        with get_conn() as conn:
            conn.execute(
                """
                INSERT INTO vault (id, service, username, secret)
                VALUES (%s, %s, %s, %s)
                """,
                (uuid.uuid4(), service, username, encrypted),
            )
            conn.commit()
    except Exception as e:
        # Chain explicitly so the original traceback is kept as the cause.
        raise RuntimeError(f"Failed to create entry: {e}") from e
def list_entries(key):
    """
    Return every vault entry with its secret decrypted.

    Args:
        key: Key handed to ``decrypt`` for each stored secret

    Returns:
        A list with one dict per row of the vault table, holding the keys
        "id", "service", "username" and "secret" (the decrypted value).
    """
    query = "SELECT id, service, username, secret FROM vault"
    with get_conn() as conn:
        cursor = conn.execute(query)
        records = cursor.fetchall()

    # Column positions follow the SELECT list above.
    return [
        {
            "id": record[0],
            "service": record[1],
            "username": record[2],
            "secret": decrypt(record[3], key),
        }
        for record in records
    ]
def delete_entry(entry_id):
    """
    Delete a vault entry by ID.

    Args:
        entry_id: Entry ID (validated UUID)

    Raises:
        ValidationError: If entry ID is invalid
        RuntimeError: With message "ENTRY_NOT_FOUND" if no row matched the
            ID, or with a "Failed to delete entry: ..." message (original
            exception chained as ``__cause__``) if deletion fails
    """
    entry_uuid = validate_entry_id(entry_id)

    try:
        with get_conn() as conn:
            result = conn.execute("DELETE FROM vault WHERE id = %s", (entry_uuid,))
            conn.commit()

            # Zero affected rows means the ID did not exist.
            if result.rowcount == 0:
                raise RuntimeError("ENTRY_NOT_FOUND")
    except RuntimeError:
        # Let RuntimeErrors (including ENTRY_NOT_FOUND) through unwrapped.
        raise
    except Exception as e:
        # Chain explicitly so the original traceback is kept as the cause.
        raise RuntimeError(f"Failed to delete entry: {e}") from e
def list_entries_metadata():
    """
    Return the non-secret fields of every vault entry.

    Returns:
        A list with one dict per row of the vault table, holding the keys
        "id", "service" and "username". The secret column is not selected.
    """
    # Dict keys, in the same order as the columns of the SELECT below.
    columns = ("id", "service", "username")

    with get_conn() as conn:
        cursor = conn.execute("SELECT id, service, username FROM vault")
        records = cursor.fetchall()

    return [
        {column: record[position] for position, column in enumerate(columns)}
        for record in records
    ]
def entry_exists(entry_id: str) -> bool:
    """
    Check if an entry exists in the vault.

    Args:
        entry_id: Entry ID to check

    Returns:
        True if a row with this ID exists. False if no row matches, and
        also if a ValidationError or RuntimeError is raised while
        validating the ID or running the lookup.
    """
    try:
        entry_uuid = validate_entry_id(entry_id)
        with get_conn() as conn:
            cursor = conn.execute(
                "SELECT 1 FROM vault WHERE id = %s",
                (entry_uuid,),
            )
            match = cursor.fetchone()
    except (ValidationError, RuntimeError):
        # Best-effort check: these failures are reported as "does not exist".
        return False
    else:
        return match is not None
def get_entry_secret(entry_id: str, key: bytes) -> str:
    """
    Get the decrypted secret for an entry.

    Args:
        entry_id: Entry ID (validated UUID)
        key: Decryption key

    Returns:
        Decrypted secret string

    Raises:
        ValidationError: If entry ID is invalid
        RuntimeError: With message "ENTRY_NOT_FOUND" if no row matched the
            ID, or with a "Failed to retrieve entry: ..." message (original
            exception chained as ``__cause__``) if the lookup or decryption
            fails
    """
    entry_uuid = validate_entry_id(entry_id)

    try:
        with get_conn() as conn:
            row = conn.execute(
                "SELECT secret FROM vault WHERE id = %s",
                (entry_uuid,),
            ).fetchone()

            # fetchone() yields no row when the ID is unknown.
            if row is None:
                raise RuntimeError("ENTRY_NOT_FOUND")

            return decrypt(row[0], key)
    except RuntimeError:
        # Let RuntimeErrors (including ENTRY_NOT_FOUND) through unwrapped.
        raise
    except Exception as e:
        # Chain explicitly so the original traceback is kept as the cause.
        raise RuntimeError(f"Failed to retrieve entry: {e}") from e
def update_entry(entry_id, service, username, secret, key):
    """
    Update a vault entry with validation.

    Replaces the service, username and (re-encrypted) secret of the row
    with the given ID and sets its ``updated_at`` column to ``now()``.

    Args:
        entry_id: Entry ID (validated UUID)
        service: Service name (validated)
        username: Username (validated)
        secret: Password/secret (validated, stored encrypted)
        key: Encryption key

    Raises:
        ValidationError: If validation fails
        RuntimeError: With message "ENTRY_NOT_FOUND" if no row matched the
            ID, or with a "Failed to update entry: ..." message (original
            exception chained as ``__cause__``) if the update fails
    """
    # Validation and encryption run outside the try block, so their errors
    # propagate unchanged instead of being reported as database failures.
    entry_uuid = validate_entry_id(entry_id)
    service = validate_service(service)
    username = validate_username(username)
    secret = validate_password(secret)

    encrypted = encrypt(secret, key)

    try:
        with get_conn() as conn:
            result = conn.execute(
                """
                UPDATE vault
                SET service = %s,
                    username = %s,
                    secret = %s,
                    updated_at = now()
                WHERE id = %s
                """,
                (service, username, encrypted, entry_uuid),
            )
            conn.commit()

            # Zero affected rows means the ID did not exist.
            if result.rowcount == 0:
                raise RuntimeError("ENTRY_NOT_FOUND")
    except RuntimeError:
        # Let RuntimeErrors (including ENTRY_NOT_FOUND) through unwrapped.
        raise
    except Exception as e:
        # Chain explicitly so the original traceback is kept as the cause.
        raise RuntimeError(f"Failed to update entry: {e}") from e