-
-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathblooms.py
More file actions
338 lines (288 loc) · 11.7 KB
/
blooms.py
File metadata and controls
338 lines (288 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import datetime
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from data.connection import db_cursor
from data.users import User, get_user_by_id
@dataclass
class Bloom:
    """In-memory representation of one row of the ``blooms`` table.

    A rebloom is stored as an ordinary bloom whose ``original_bloom_id``
    refers to the bloom being re-shared.
    """

    # Value of ``blooms.id``.
    id: int
    # Author of the bloom.
    # NOTE(review): annotated as User, but get_bloom, get_blooms_for_user and
    # get_blooms_with_hashtag in this module pass the sender's *username
    # string* here - confirm which type callers actually rely on.
    sender: User
    # Text body of the bloom.
    content: str
    # Value of the ``send_timestamp`` column.
    sent_timestamp: datetime.datetime
    # Value of the ``rebloom_count`` column.
    rebloom_count: int = 0
    # Id of the bloom this one re-shares; None for an original bloom.
    original_bloom_id: Optional[int] = None
    # The readers in this module set this to ``original_bloom_id is not None``.
    is_rebloom: bool = False
    # Sender of the original bloom; the readers fill it in for reblooms
    # (same username-vs-User caveat as ``sender``).
    original_sender: Optional[User] = None
def add_bloom(
    *,
    sender: User,
    content: str,
    is_rebloom: bool = False,
    original_bloom_id: Optional[int] = None
) -> int:
    """Create a new bloom and associate any hashtags.

    Args:
        sender: Author of the bloom; only ``sender.id`` is stored.
        content: Text of the bloom. Every whitespace-separated word that
            starts with ``#`` is recorded as a hashtag (without the ``#``).
        is_rebloom: Accepted for API compatibility; it is not persisted.
            The rebloom link is carried by ``original_bloom_id``.
        original_bloom_id: Id of the bloom being re-shared, or None.

    Returns:
        The id of the newly inserted bloom, or 0 if any database statement
        raised (the error is printed, not propagated).
    """
    # split() without an argument splits on any run of whitespace, so tags
    # that follow a newline or tab are found too. A bare "#" is skipped so
    # no empty hashtag is stored. dict.fromkeys() drops repeats while
    # keeping first-seen order: a duplicate (hashtag, bloom_id) row would
    # make a join on the hashtags table yield the same bloom twice.
    hashtags = list(
        dict.fromkeys(
            word[1:]
            for word in content.split()
            if word.startswith("#") and len(word) > 1
        )
    )

    now = datetime.datetime.now(tz=datetime.UTC)
    # The id is the creation time in microseconds since the epoch.
    bloom_id = int(now.timestamp() * 1000000)

    with db_cursor() as cur:
        try:
            cur.execute(
                """
                INSERT INTO blooms
                (id, sender_id, content, send_timestamp, original_bloom_id)
                VALUES
                (%(bloom_id)s, %(sender_id)s, %(content)s, %(timestamp)s, %(original_bloom_id)s)
                """,
                dict(
                    bloom_id=bloom_id,
                    sender_id=sender.id,
                    content=content,
                    # Passed as a datetime object so the driver adapts it to
                    # the column's SQL type.
                    timestamp=now,
                    original_bloom_id=original_bloom_id,
                ),
            )
            for hashtag in hashtags:
                cur.execute(
                    "INSERT INTO hashtags (hashtag, bloom_id) VALUES (%(hashtag)s, %(bloom_id)s)",
                    dict(hashtag=hashtag, bloom_id=bloom_id),
                )
            return bloom_id
        except Exception as e:
            # Best-effort contract: report the failure and signal it with a
            # falsy id instead of raising (add_rebloom relies on this).
            print(f"Error adding bloom: {e}")
            return 0
def add_rebloom(user_id: int, original_bloom_id: int, content: str) -> Optional[int]:
    """Re-share an existing bloom on behalf of a user.

    The user is looked up first because add_bloom() needs a full User
    object. A new bloom is then created through add_bloom(), a row linking
    it to the original is inserted into ``reblooms``, and the original
    bloom's ``rebloom_count`` is incremented.

    Returns:
        The id of the new ``reblooms`` row, or None when the user does not
        exist, the bloom could not be created, the insert returned no row,
        or a database error occurred (errors are printed, not raised).
    """
    author = get_user_by_id(user_id)
    if author is None:
        print(f"Error adding rebloom: User with ID {user_id} not found.")
        return None

    with db_cursor() as cur:
        try:
            # add_bloom() opens its own cursor and returns 0 on failure.
            new_bloom_id = add_bloom(
                sender=author,
                content=content,
                is_rebloom=True,
                original_bloom_id=original_bloom_id,
            )
            if not new_bloom_id:
                return None

            # Record the relationship between the original and the new bloom.
            cur.execute(
                "INSERT INTO reblooms (user_id, original_bloom_id, rebloom_bloom_id) VALUES (%s, %s, %s) RETURNING id",
                (user_id, original_bloom_id, new_bloom_id),
            )
            inserted = cur.fetchone()
            if inserted is None:
                return None
            rebloom_id = inserted[0]

            # Keep the counter on the original bloom in step.
            cur.execute(
                "UPDATE blooms SET rebloom_count = rebloom_count + 1 WHERE id = %s",
                (original_bloom_id,),
            )
            return rebloom_id
        except Exception as exc:
            print(f"Error adding rebloom: {exc}")
            return None
def get_rebloom_by_user_and_bloom(user_id: int, original_bloom_id: int) -> Optional[Dict]:
    """Return the rebloom record linking a user to a bloom, if one exists.

    Args:
        user_id: Id of the user who may have rebloomed.
        original_bloom_id: Id of the bloom that may have been rebloomed.

    Returns:
        A dict with keys ``id``, ``user_id``, ``original_bloom_id``,
        ``rebloom_bloom_id`` and ``created_at`` for the first matching row,
        or None when the user has not rebloomed that bloom.
    """
    # Columns are listed explicitly (instead of SELECT *) so the mapping
    # below does not depend on the physical column order of the table.
    columns = ("id", "user_id", "original_bloom_id", "rebloom_bloom_id", "created_at")
    with db_cursor() as cur:
        cur.execute(
            "SELECT id, user_id, original_bloom_id, rebloom_bloom_id, created_at "
            "FROM reblooms WHERE user_id = %s AND original_bloom_id = %s",
            (user_id, original_bloom_id),
        )
        row = cur.fetchone()
        if row is None:
            return None
        return dict(zip(columns, row))
def delete_rebloom(rebloom_id: int) -> bool:
    """Delete a rebloom and its associated bloom.

    Removes the ``reblooms`` row, the bloom that was created for the
    rebloom (together with its hashtag rows), and decrements
    ``rebloom_count`` on the original bloom.

    Args:
        rebloom_id: Id of the row in ``reblooms`` to remove.

    Returns:
        True when the rebloom was found and the statements ran; False when
        no such rebloom exists or a database error occurred (the error is
        printed, not raised).
    """
    with db_cursor() as cur:
        try:
            # Look up which blooms are involved before deleting anything.
            cur.execute(
                "SELECT rebloom_bloom_id, original_bloom_id FROM reblooms WHERE id = %s",
                (rebloom_id,),
            )
            rebloom_data = cur.fetchone()
            if not rebloom_data:
                return False
            rebloom_bloom_id, original_bloom_id = rebloom_data

            # Delete the rows that reference the rebloom's bloom before the
            # bloom itself.
            # NOTE(review): the schema is not visible here. Child-first
            # order is safe whether or not foreign keys cascade.
            cur.execute("DELETE FROM reblooms WHERE id = %s", (rebloom_id,))
            # add_bloom() stores hashtags for rebloom content too.
            cur.execute("DELETE FROM hashtags WHERE bloom_id = %s", (rebloom_bloom_id,))
            cur.execute("DELETE FROM blooms WHERE id = %s", (rebloom_bloom_id,))

            # Never let the counter drop below zero if it is already out of
            # step with the reblooms table.
            cur.execute(
                "UPDATE blooms SET rebloom_count = rebloom_count - 1 "
                "WHERE id = %s AND rebloom_count > 0",
                (original_bloom_id,),
            )
            return True
        except Exception as e:
            print(f"Error deleting rebloom: {e}")
            return False
def has_user_rebloomed(user_id: int, bloom_id: int) -> bool:
    """Report whether ``user_id`` already has a rebloom record for ``bloom_id``."""
    existing = get_rebloom_by_user_and_bloom(user_id, bloom_id)
    return existing is not None
def get_rebloom_count(bloom_id: int) -> int:
    """Read the stored ``rebloom_count`` for a bloom.

    Returns 0 when no bloom with that id exists.
    """
    with db_cursor() as cur:
        cur.execute("SELECT rebloom_count FROM blooms WHERE id = %s", (bloom_id,))
        row = cur.fetchone()
        if not row:
            return 0
        return row[0]
def get_user_reblooms(user_id: int) -> List[Dict]:
    """Get all reblooms by a user, newest first.

    Args:
        user_id: Id of the user whose reblooms are listed.

    Returns:
        One dict per rebloom, ordered by ``created_at`` descending, with
        keys ``id``, ``user_id``, ``original_bloom_id``,
        ``rebloom_bloom_id``, ``created_at``, ``content`` (text of the
        rebloom's own bloom), ``rebloom_timestamp`` (that bloom's
        ``send_timestamp``), ``original_sender_id`` and
        ``original_username``.
    """
    # Key order mirrors the SELECT list below. Columns are named explicitly
    # rather than via r.*, so adding a column to reblooms cannot shift the
    # positions of the joined values. original_bloom_id is selected once:
    # the join guarantees ob.id equals r.original_bloom_id.
    keys = (
        "id",
        "user_id",
        "original_bloom_id",
        "rebloom_bloom_id",
        "created_at",
        "content",
        "rebloom_timestamp",
        "original_sender_id",
        "original_username",
    )
    with db_cursor() as cur:
        cur.execute(
            """
            SELECT r.id, r.user_id, r.original_bloom_id, r.rebloom_bloom_id, r.created_at,
                   b.content, b.send_timestamp, ob.sender_id, ou.username
            FROM reblooms r
            JOIN blooms b ON r.rebloom_bloom_id = b.id
            JOIN blooms ob ON r.original_bloom_id = ob.id
            JOIN users ou ON ob.sender_id = ou.id
            WHERE r.user_id = %s
            ORDER BY r.created_at DESC
            """,
            (user_id,),
        )
        return [dict(zip(keys, row)) for row in cur.fetchall()]
def get_blooms_for_user(
    username: str, *, before: Optional[int] = None, limit: Optional[int] = None
) -> List[Bloom]:
    """Return the blooms sent by ``username``, newest first.

    Args:
        username: Username of the sender whose blooms are listed.
        before: When given, only blooms whose ``send_timestamp`` is less
            than this value are returned.
            NOTE(review): annotated as int while add_bloom() stores a
            datetime in that column - confirm what callers pass.
        limit: Maximum number of blooms to return; None means no limit.

    Returns:
        Bloom objects ordered by ``send_timestamp`` descending. ``sender``
        holds the sender's username. For reblooms, ``original_sender``
        holds the username of the original bloom's sender, or None if that
        bloom or its sender no longer exists.
    """
    kwargs: Dict[str, Any] = {"sender_username": username}
    if before is not None:
        before_clause = "AND blooms.send_timestamp < %(before_limit)s"
        kwargs["before_limit"] = before
    else:
        before_clause = ""
    limit_clause = make_limit_clause(limit, kwargs)

    with db_cursor() as cur:
        # Only the constant clause fragments built above are interpolated
        # into the SQL text; every value travels as a bound parameter.
        # The LEFT JOINs fetch the original sender in the same query
        # instead of issuing one extra lookup per rebloom row.
        cur.execute(
            f"""SELECT
                blooms.id, users.username, blooms.content, blooms.send_timestamp,
                blooms.rebloom_count, blooms.original_bloom_id,
                original_users.username
            FROM
                blooms
                INNER JOIN users ON users.id = blooms.sender_id
                LEFT JOIN blooms AS original_blooms
                    ON original_blooms.id = blooms.original_bloom_id
                LEFT JOIN users AS original_users
                    ON original_users.id = original_blooms.sender_id
            WHERE
                users.username = %(sender_username)s
                {before_clause}
            ORDER BY blooms.send_timestamp DESC
            {limit_clause}
            """,
            kwargs,
        )
        rows = cur.fetchall()

    blooms_list = []
    for (
        bloom_id,
        sender_username,
        content,
        timestamp,
        rebloom_count,
        original_bloom_id,
        original_sender_username,
    ) in rows:
        blooms_list.append(
            Bloom(
                id=bloom_id,
                sender=sender_username,
                content=content,
                sent_timestamp=timestamp,
                rebloom_count=rebloom_count,
                original_bloom_id=original_bloom_id,
                is_rebloom=original_bloom_id is not None,
                # NULL (None) for non-reblooms and for dangling originals.
                original_sender=original_sender_username,
            )
        )
    return blooms_list
def get_bloom(bloom_id: int) -> Optional[Bloom]:
    """Fetch a single bloom by id.

    Args:
        bloom_id: Id of the bloom to load.

    Returns:
        The Bloom, or None when no bloom with that id (joined to an
        existing sender) is found. ``sender`` holds the sender's username.
        For a rebloom, ``original_sender`` holds the username of the
        original bloom's sender, or None if that bloom or its sender no
        longer exists.
    """
    with db_cursor() as cur:
        # The original sender is resolved with LEFT JOINs in this one query
        # rather than by calling get_bloom() again, so a lookup costs a
        # single round trip and cannot recurse.
        cur.execute(
            """SELECT
                blooms.id, users.username, blooms.content, blooms.send_timestamp,
                blooms.rebloom_count, blooms.original_bloom_id,
                original_users.username
            FROM
                blooms
                INNER JOIN users ON users.id = blooms.sender_id
                LEFT JOIN blooms AS original_blooms
                    ON original_blooms.id = blooms.original_bloom_id
                LEFT JOIN users AS original_users
                    ON original_users.id = original_blooms.sender_id
            WHERE
                blooms.id = %s
            """,
            (bloom_id,),
        )
        row = cur.fetchone()

    if row is None:
        return None

    (
        found_id,
        sender_username,
        content,
        timestamp,
        rebloom_count,
        original_bloom_id,
        original_sender_username,
    ) = row
    return Bloom(
        id=found_id,
        sender=sender_username,
        content=content,
        sent_timestamp=timestamp,
        rebloom_count=rebloom_count,
        original_bloom_id=original_bloom_id,
        is_rebloom=original_bloom_id is not None,
        # NULL (None) for non-reblooms and for dangling originals.
        original_sender=original_sender_username,
    )
def get_blooms_with_hashtag(
    hashtag_without_leading_hash: str, *, limit: Optional[int] = None
) -> List[Bloom]:
    """Return the blooms tagged with a hashtag, newest first.

    Args:
        hashtag_without_leading_hash: Hashtag text as stored in the
            ``hashtags`` table, i.e. without the leading ``#``.
        limit: Maximum number of blooms to return; None means no limit.

    Returns:
        Bloom objects ordered by ``send_timestamp`` descending. ``sender``
        holds the sender's username. For reblooms, ``original_sender``
        holds the username of the original bloom's sender, or None if that
        bloom or its sender no longer exists.
    """
    kwargs: Dict[str, Any] = {
        "hashtag_without_leading_hash": hashtag_without_leading_hash,
    }
    limit_clause = make_limit_clause(limit, kwargs)

    with db_cursor() as cur:
        # Only the constant LIMIT fragment is interpolated into the SQL
        # text; every value travels as a bound parameter. The LEFT JOINs
        # fetch the original sender in the same query instead of issuing
        # one extra lookup per rebloom row.
        cur.execute(
            f"""SELECT
                blooms.id, users.username, blooms.content, blooms.send_timestamp,
                blooms.rebloom_count, blooms.original_bloom_id,
                original_users.username
            FROM
                blooms
                INNER JOIN hashtags ON blooms.id = hashtags.bloom_id
                INNER JOIN users ON blooms.sender_id = users.id
                LEFT JOIN blooms AS original_blooms
                    ON original_blooms.id = blooms.original_bloom_id
                LEFT JOIN users AS original_users
                    ON original_users.id = original_blooms.sender_id
            WHERE
                hashtags.hashtag = %(hashtag_without_leading_hash)s
            ORDER BY blooms.send_timestamp DESC
            {limit_clause}
            """,
            kwargs,
        )
        rows = cur.fetchall()

    blooms_list = []
    for (
        bloom_id,
        sender_username,
        content,
        timestamp,
        rebloom_count,
        original_bloom_id,
        original_sender_username,
    ) in rows:
        blooms_list.append(
            Bloom(
                id=bloom_id,
                sender=sender_username,
                content=content,
                sent_timestamp=timestamp,
                rebloom_count=rebloom_count,
                original_bloom_id=original_bloom_id,
                is_rebloom=original_bloom_id is not None,
                # NULL (None) for non-reblooms and for dangling originals.
                original_sender=original_sender_username,
            )
        )
    return blooms_list
def make_limit_clause(limit: Optional[int], kwargs: Dict[Any, Any]) -> str:
    """Build the optional ``LIMIT`` fragment for a query.

    When ``limit`` is None, nothing is limited: an empty string is returned
    and ``kwargs`` is left untouched. Otherwise the value is stored in
    ``kwargs`` under ``"limit"`` (mutating the caller's dict) and a
    fragment referring to that named parameter is returned.
    """
    if limit is None:
        return ""
    kwargs["limit"] = limit
    return "LIMIT %(limit)s"