-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathapp.py
More file actions
184 lines (138 loc) · 5.7 KB
/
app.py
File metadata and controls
184 lines (138 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
__version__ = "1.1.3"
import html
import os
from bson import ObjectId
from dotenv import load_dotenv
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorGridFSBucket
from sanic import Sanic, response
from sanic.exceptions import NotFound
from jinja2 import Environment, FileSystemLoader
from core.models import LogEntry, build_archive_lookup
load_dotenv()
if "URL_PREFIX" in os.environ:
print("Using the legacy config var `URL_PREFIX`, rename it to `LOG_URL_PREFIX`")
prefix = os.environ["URL_PREFIX"]
else:
prefix = os.getenv("LOG_URL_PREFIX", "/logs")
if prefix == "NONE":
prefix = ""
MONGO_URI = os.getenv("MONGO_URI") or os.getenv("CONNECTION_URI")
if not MONGO_URI:
print("No CONNECTION_URI config var found. "
"Please enter your MongoDB connection URI in the configuration or .env file.")
exit(1)
app = Sanic(__name__)
app.static("/static", "./static")
jinja_env = Environment(loader=FileSystemLoader("templates"))
def render_template(name, *args, **kwargs):
template = jinja_env.get_template(name + ".html")
return response.html(template.render(*args, **kwargs))
app.ctx.render_template = render_template
def strtobool(val):
"""
Copied from distutils.strtobool.
Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
'val' is anything else.
"""
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return 1
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
return 0
else:
raise ValueError("invalid truth value %r" % (val,))
SAVE_ATTACHMENTS = strtobool(os.getenv("SAVE_ATTACHMENTS", "no"))
ARCHIVE_INTERVAL = int(os.getenv("ARCHIVE_INTERVAL", "600"))
ARCHIVE_MAX_FILE_SIZE = int(os.getenv("ARCHIVE_MAX_FILE_SIZE", str(25 * 1024 * 1024)))
ARCHIVE_RETENTION = os.getenv("ARCHIVE_RETENTION", "forever").strip().lower()
ARCHIVE_COMPRESS_IMAGES = strtobool(os.getenv("ARCHIVE_COMPRESS_IMAGES", "yes"))
ARCHIVE_IMAGE_QUALITY = int(os.getenv("ARCHIVE_IMAGE_QUALITY", "65"))
ARCHIVE_IMAGE_MAX_RESOLUTION = int(os.getenv("ARCHIVE_IMAGE_MAX_RESOLUTION", "1920"))
@app.listener("before_server_start")
async def init(app, loop):
app.ctx.db = AsyncIOMotorClient(MONGO_URI).modmail_bot
use_attachment_proxy = strtobool(os.getenv("USE_ATTACHMENT_PROXY", "no"))
if use_attachment_proxy:
app.ctx.attachment_proxy_url = os.getenv("ATTACHMENT_PROXY_URL", "https://cdn.discordapp.xyz")
app.ctx.attachment_proxy_url = html.escape(app.ctx.attachment_proxy_url).rstrip("/")
else:
app.ctx.attachment_proxy_url = None
# Attachment archival setup
app.ctx.save_attachments = bool(SAVE_ATTACHMENTS)
if app.ctx.save_attachments:
app.ctx.fs = AsyncIOMotorGridFSBucket(app.ctx.db, bucket_name="attachments")
await app.ctx.db.archived_attachments.create_index("original_url", unique=True)
await app.ctx.db.archived_attachments.create_index("status")
await app.ctx.db.archived_attachments.create_index("archived_at")
else:
app.ctx.fs = None
@app.listener("after_server_start")
async def start_archiver(app, loop):
if app.ctx.save_attachments:
from core.archiver import run_archiver_loop
archiver_config = {
"interval": ARCHIVE_INTERVAL,
"max_file_size": ARCHIVE_MAX_FILE_SIZE,
"retention": ARCHIVE_RETENTION,
"compress_images": bool(ARCHIVE_COMPRESS_IMAGES),
"image_quality": ARCHIVE_IMAGE_QUALITY,
"image_max_resolution": ARCHIVE_IMAGE_MAX_RESOLUTION,
}
app.add_task(run_archiver_loop(app, archiver_config))
@app.exception(NotFound)
async def not_found(request, exc):
return render_template("not_found")
@app.get("/")
async def index(request):
return render_template("index")
@app.get(prefix + "/raw/<key>")
async def get_raw_logs_file(request, key):
"""Returns the plain text rendered log entry"""
document = await app.ctx.db.logs.find_one({"key": key})
if document is None:
raise NotFound
archive_lookup = await build_archive_lookup(app, document)
log_entry = LogEntry(app, document, archive_lookup=archive_lookup)
return log_entry.render_plain_text()
@app.get(prefix + "/<key>")
async def get_logs_file(request, key):
"""Returns the html rendered log entry"""
document = await app.ctx.db.logs.find_one({"key": key})
if document is None:
raise NotFound
archive_lookup = await build_archive_lookup(app, document)
log_entry = LogEntry(app, document, archive_lookup=archive_lookup)
return log_entry.render_html()
@app.get("/attachments/<file_id>/<filename>")
async def serve_attachment(request, file_id, filename):
"""Serve an archived attachment from GridFS."""
if not app.ctx.save_attachments or app.ctx.fs is None:
raise NotFound
try:
oid = ObjectId(file_id)
except Exception:
raise NotFound
try:
grid_out = await app.ctx.fs.open_download_stream(oid)
except Exception:
raise NotFound
content_type = "application/octet-stream"
if grid_out.metadata:
content_type = grid_out.metadata.get("content_type", content_type)
data = await grid_out.read()
return response.raw(
data,
content_type=content_type,
headers={
"Content-Disposition": f'inline; filename="{filename}"',
"Cache-Control": "public, max-age=31536000, immutable",
},
)
if __name__ == "__main__":
app.run(
host=os.getenv("HOST", "0.0.0.0"),
port=int(os.getenv("PORT", 8000)),
debug=bool(os.getenv("DEBUG", False)),
)