-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
103 lines (77 loc) · 3.08 KB
/
main.py
File metadata and controls
103 lines (77 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import pika
import os
import json
from pymongo.errors import PyMongoError
from flask_socketio import SocketIO, emit
from flask_pymongo import PyMongo
from flask_cors import CORS
from flask import Flask, jsonify, request, render_template
from dotenv import load_dotenv
from datetime import datetime
# Load environment variables via python-dotenv before the os.getenv() lookups
# below are evaluated.
load_dotenv()

# Queue / exchange / routing-key names come from the environment.
# os.getenv() returns None for any variable that is not set.
RABBITMQ_QUEUE = os.getenv("RABBITMQ_QUEUE")
RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE")
RABBITMQ_ROUTING_KEY = os.getenv("RABBITMQ_ROUTING_KEY")

# A single blocking connection and channel are opened at import time and reused
# by the request handlers further down.
# NOTE(review): the broker host is hard-coded to "localhost" — confirm this is
# intended outside local development.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()

# Declare the durable direct exchange that notifications are published to,
# then the queue itself.
# NOTE(review): no queue_bind() call is visible in this file; presumably the
# consumer binds the queue to the exchange — confirm.
channel.exchange_declare(
    exchange=RABBITMQ_EXCHANGE, exchange_type="direct", durable=True
)
channel.queue_declare(queue=RABBITMQ_QUEUE)
# Flask application and its MongoDB connection string.
# NOTE(review): the URI (including credentials) is hard-coded — consider moving
# it to the environment.
app = Flask(__name__)
app.config.update(
    MONGO_URI="mongodb://root:example@localhost:27017/notifications_db?authSource=admin",
)

# MongoDB handle; `db` is the "notifications" collection used by the routes.
mongo = PyMongo(app)
collection_name = "notifications"
db = mongo.db[collection_name]

# Cross-origin access is allowed from any origin, both for the HTTP routes and
# for the Socket.IO endpoint.
CORS(app, resources={r"/*": {"origins": "*"}})
socketio = SocketIO(app, cors_allowed_origins="*")
@app.route("/")
def home_page():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
@app.route("/users/<int:user_id>/notifications", methods=["GET"])
def get_user_notifications(user_id: int):
    """Return all stored notifications for ``user_id`` as JSON.

    Responses:
        200: ``{"data": [...]}`` with the matching documents (``_id`` omitted).
        400: ``{"error": ...}`` when no document matches the user.
        500: ``{"error": ...}`` when the database raises a ``PyMongoError``.

    NOTE(review): an empty result is reported as 400; 404 may be the more
    conventional status, but it is kept as-is for existing clients.
    """
    query = {"user_id": user_id}
    without_object_id = {"_id": 0}  # projection that drops _id from each result
    try:
        notifications = list(db.find(query, without_object_id))
        if notifications:
            return jsonify(data=notifications), 200
        return jsonify(error=f"No notifications for user_id: {user_id} found."), 400
    except PyMongoError as exc:
        return jsonify(error=str(exc)), 500
@app.route("/notifications", methods=["POST"])
def post_user_notifications():
    """Store a new notification and publish it to RabbitMQ.

    The request body must be a JSON object containing ``user_id``,
    ``notification_type`` and ``content``. The notification is inserted into
    the collection with status ``"pending"`` and then published to the
    configured exchange as a JSON string.

    Responses:
        201: ``{"message": "Notification queued", "id": <inserted id>}``.
        400: empty payload, payload that is not a JSON object, or a missing
            required key.
        500: the database raised ``PyMongoError``, or the broker publish
            raised a pika ``AMQPError`` (the document is already stored in
            that case and its id is included in the response).
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify(error="Empty payload."), 400
        # A JSON array/string/number would otherwise blow up with TypeError on
        # the key lookups below and surface as an unhandled 500.
        if not isinstance(data, dict):
            return jsonify(error="Payload must be a JSON object."), 400

        try:
            payload = {
                "user_id": data["user_id"],
                "notification_type": data[
                    "notification_type"
                ],  # can be email, sms, in-app
                "content": data["content"],
                "status": "pending",  # can be pending, sent, failed
                "timestamp": str(datetime.now()),
            }
        except KeyError as e:
            return jsonify(error=str(e)), 400

        result = db.insert_one(payload)
        inserted_id = str(result.inserted_id)
        # Replace the inserted id with its string form so the payload can be
        # serialised to JSON.
        payload["_id"] = inserted_id

        # Serialise once; the same body is reused for every publish below.
        message = json.dumps(payload)
        try:
            # send over to rabbitmq
            channel.basic_publish(
                exchange=RABBITMQ_EXCHANGE,
                routing_key=RABBITMQ_ROUTING_KEY,
                body=message,
            )
            # HACK: in-app notifications are deliberately published a second
            # time as a workaround for an intermittent race condition.
            if payload["notification_type"] == "in-app":
                channel.basic_publish(
                    exchange=RABBITMQ_EXCHANGE,
                    routing_key=RABBITMQ_ROUTING_KEY,
                    body=message,
                )
        except pika.exceptions.AMQPError as e:
            # The document is already persisted; report the broker failure as
            # JSON instead of letting it escape as an unhandled exception.
            return (
                jsonify(
                    error=f"Notification stored but could not be queued: {e}",
                    id=inserted_id,
                ),
                500,
            )

        return jsonify(message="Notification queued", id=inserted_id), 201
    except PyMongoError as e:
        return jsonify(error=str(e)), 500
# Socket.IO "connect" event handler
@socketio.on("connect")
def handle_connect():
    """Log the new client and send it a greeting on the ``message`` event."""
    print("Client connected")
    greeting = "Connected to WebSocket server"
    emit("message", greeting)
if __name__ == "__main__":
    # Script entry point: start the app through Socket.IO's runner so that
    # WebSocket traffic is served alongside the HTTP routes.
    # NOTE(review): debug mode is enabled while listening on all interfaces —
    # confirm this is never used outside local development.
    print("[Flask server starting...]")
    socketio.run(app, host="0.0.0.0", port=5050, debug=True)