-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathpoll.py
More file actions
395 lines (327 loc) · 13.2 KB
/
poll.py
File metadata and controls
395 lines (327 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""Module for the poll extension for the discord bot."""
from __future__ import annotations
import asyncio
import io
import json
from typing import TYPE_CHECKING, Self
import discord
import emoji
import munch
from core import auxiliary, cogs
from discord.ext import commands
if TYPE_CHECKING:
import bot
async def setup(bot: bot.TechSupportBot) -> None:
"""Loading the Poller plugins into the bot
Args:
bot (bot.TechSupportBot): The bot object to register the cogs to
"""
await bot.add_cog(ReactionPoller(bot=bot))
await bot.add_cog(StrawPoller(bot=bot))
class PollGenerator(cogs.BaseCog):
"""Class to make the poll generator for the extension."""
async def validate_data(
self: Self,
ctx: commands.Context,
request_body: munch.Munch,
strawpoll: bool = False,
) -> munch.Munch:
"""Validates the uploaded json to ensure that the poll is valid.
Will potentially make changes to some aspects to ensure that everything works
such as timeout
Args:
ctx (commands.Context): The context in which the command was run
request_body (munch.Munch): The uploaded json of the created poll
strawpoll (bool, optional): If this should be a strawpoll or a reaction poll.
Defaults to False.
Returns:
munch.Munch: The validated and updated (if needed) config for the poll.
"""
# probably shouldn't touch this
max_options = len(self.OPTION_EMOJIS) if not strawpoll else 10
question = request_body.get("question")
options = request_body.get("options", [])
image_url = request_body.get("image_url")
timeout = request_body.get("timeout")
if not question:
await auxiliary.send_deny_embed(
message="I did not find a poll question (`question` key)",
channel=ctx.channel,
)
return None
if not isinstance(question, str):
await auxiliary.send_deny_embed(
message="I need the poll question to be a string (`question` key)",
channel=ctx.channel,
)
return None
if not isinstance(options, list):
await auxiliary.send_deny_embed(
message="I need the poll options to be a list (`options` key)",
channel=ctx.channel,
)
return None
if len(options) < 2 or len(options) > max_options:
await auxiliary.send_deny_embed(
message=f"I need between 2 and {max_options} options! (`options` key)",
channel=ctx.channel,
)
return None
if not strawpoll:
if not image_url or not isinstance(image_url, str):
request_body.image_url = (
"https://www.iconarchive.com/download/i6231/"
"custom-icon-design/pretty-office-6/polls.256.png"
)
if not timeout or not isinstance(timeout, int):
request_body.timeout = 60
elif request_body.timeout > 300:
request_body.timeout = 300
elif request_body.timeout < 10:
request_body.timeout = 10
return request_body
class ReactionPoller(PollGenerator):
"""Class to add reactions to the poll generator.
Attributes:
OPTION_EMOJIS (list[str]): The list of emojis to react to the message with
STOP_EMOJI (str): The stop emoji to reaction to the message with
EXAMPLE_DATA (dict[str, str | list[str] | int]): The example poll that the bot can use
"""
OPTION_EMOJIS: list[str] = ["one", "two", "three", "four", "five"]
STOP_EMOJI: str = "\u26d4"
EXAMPLE_DATA: dict[str, str | list[str] | int] = {
"question": "Best ice cream?",
"options": ["Chocolate", "Vanilla", "Strawberry", "Cookie Dough", "Other..."],
"timeout": 60,
}
async def preconfig(self: Self) -> None:
"""Method to preconfig the poll."""
self.option_emojis = [
emoji.emojize(f":{emoji_text}:", language="alias")
for emoji_text in self.OPTION_EMOJIS
]
@commands.group(
brief="Executes a poll command",
description="Executes a poll command",
)
async def poll(self: Self, ctx: commands.Context) -> None:
"""The bare .poll command. This does nothing but generate the help message
Args:
ctx (commands.Context): The context in which the command was run in
"""
return
@auxiliary.with_typing
@poll.command(
brief="Shows example poll JSON",
description="Shows what JSON to upload to generate a poll",
)
async def example(self: Self, ctx: commands.Context) -> None:
"""Method to show an example of a poll.
Args:
ctx (commands.Context): The context in which the command was run in
"""
json_file = discord.File(
io.StringIO(json.dumps(self.EXAMPLE_DATA, indent=4)),
filename="poll_example.json",
)
await ctx.send(file=json_file)
@auxiliary.with_typing
@commands.guild_only()
@poll.command(
aliases=["create"],
brief="Generates a poll",
description=(
"Creates a poll for everyone to vote in (only admins can make polls)"
),
usage="|json-upload|",
)
async def generate(self: Self, ctx: commands.Context) -> None:
"""Method to generate the poll for discord.
Args:
ctx (commands.Context): The context in which the command was run in
"""
request_body = await auxiliary.get_json_from_attachments(ctx.message)
if not request_body:
await auxiliary.send_deny_embed(
message="I couldn't find any data in your upload", channel=ctx.channel
)
return
request_body = await self.validate_data(ctx, request_body)
if not request_body:
return
message = await auxiliary.send_confirm_embed(
message="Poll loading...", channel=ctx.channel
)
display_timeout = (
request_body.timeout
if request_body.timeout <= 60
else request_body.timeout // 60
)
display_timeout_units = "seconds" if request_body.timeout <= 60 else "minutes"
embed = auxiliary.generate_basic_embed(
title=request_body.question,
description=f"Poll timeout: {display_timeout} {display_timeout_units}",
color=discord.Color.gold(),
url=request_body.image_url,
)
for index, option in enumerate(request_body.options):
embed.add_field(name=option, value=index + 1, inline=False)
await message.add_reaction(self.option_emojis[index])
await message.edit(content=None, embed=embed)
results = await self.wait_for_results(
ctx, message, request_body.timeout, request_body.options
)
if results is None:
await auxiliary.send_deny_embed(
message="I ran into an issue grabbing the poll results...",
channel=ctx.channel,
)
try:
await message.edit(content="*Poll aborted!*", embed=None)
await message.clear_reactions()
except discord.NotFound:
await auxiliary.send_deny_embed(
message=(
"I could not find the poll message. It might have been deleted?"
),
channel=ctx.channel,
)
except discord.Forbidden:
pass
return
total = sum(count for count in results.values())
if total == 0:
await auxiliary.send_deny_embed(
message=(
"Nobody voted in the poll, so I won't bother showing any results"
),
channel=ctx.channel,
)
return
embed = auxiliary.generate_basic_embed(
title=f"Poll results for `{request_body.question}`",
description=f"Votes: {total}",
color=discord.Color.gold(),
thumbnail_url=request_body.image_url,
)
for option, count in results.items():
percentage = str((count * 100) // total)
embed.add_field(name=option, value=f"{percentage}%", inline=False)
await ctx.send(embed=embed)
async def wait_for_results(
self: Self,
ctx: commands.Context,
message: discord.Message,
timeout: int,
options: list[str],
) -> dict[str, int]:
"""Waits for the poll to conclude, and then gets the results
Args:
ctx (commands.Context): The context in which the command was run in
message (discord.Message): The message containing the reaction poll
timeout (int): The amount of seconds the reaction poll should run for
options (list[str]): The list of the options for the poll
Returns:
dict[str, int]: The options list with the amount of votes for each option
"""
option_emojis = self.option_emojis[: len(options)]
await asyncio.sleep(timeout)
# count the votes after the poll finishes
voted = {}
excluded = set()
cached_message = discord.utils.get(ctx.bot.cached_messages, id=message.id)
if not cached_message:
return None
for reaction in cached_message.reactions:
async for user in reaction.users():
if user.bot:
continue
if voted.get(user.id):
# delete their vote and exclude them from the count
del voted[user.id]
excluded.add(user.id)
if user.id not in excluded:
try:
voted[user.id] = options[option_emojis.index(reaction.emoji)]
except ValueError:
pass
await message.delete()
unique_votes = list(voted.values())
# I thought of this myself
results = {option: unique_votes.count(option) for option in set(unique_votes)}
# this ensures even the 0 votes show up
for option in options:
if not results.get(option):
results[option] = 0
return results
class StrawPoller(PollGenerator):
"""Class to create a straw poll from discord.
Attributes:
EXAMPLE_DATA (dict[str, str | list[str]]): The example poll that the bot can use
API_URL (str): The strawpoll API URL
"""
EXAMPLE_DATA: dict[str, str | list[str]] = {
"question": "Best ice cream?",
"options": ["Chocolate", "Vanilla", "Strawberry", "Cookie Dough", "Other..."],
}
API_URL: str = "https://strawpoll.com/api/poll"
@commands.group(
brief="Executes a strawpoll command",
description="Executes a strawpoll command",
)
async def strawpoll(self: Self, ctx: commands.Context) -> None:
"""Method to give an exmaple poll with json.
Args:
ctx (commands.Context): The context in which the command was run in
"""
return
@auxiliary.with_typing
@strawpoll.command(
brief="Shows example poll JSON",
description="Shows what JSON to upload to generate a poll",
)
async def example(self: Self, ctx: commands.Context) -> None:
"""Method that contains the example file for a poll.
Args:
ctx (commands.Context): The context in which the command was run in
"""
json_file = discord.File(
io.StringIO(json.dumps(self.EXAMPLE_DATA, indent=4)),
filename="poll_example.json",
)
await ctx.send(file=json_file)
@auxiliary.with_typing
@strawpoll.command(
brief="Generates a strawpoll",
description="Returns a link to a Strawpoll generated by args",
usage="|json-upload|",
)
async def generate(self: Self, ctx: commands.Context) -> None:
"""Method to generate the poll form the discord command.
Args:
ctx (commands.Context): The context in which the command was run in
"""
request_body = await auxiliary.get_json_from_attachments(ctx.message)
if not request_body:
await auxiliary.send_deny_embed(
message="I couldn't find any data in your upload", channel=ctx.channel
)
return
request_body = await self.validate_data(ctx, request_body, strawpoll=True)
if not request_body:
return
post_body = {
"poll": {"title": request_body.question, "answers": request_body.options}
}
response = await self.bot.http_functions.http_call(
"post", self.API_URL, json=post_body
)
content_id = response.get("content_id")
if not content_id:
await auxiliary.send_deny_embed(
message="Strawpoll did not let me create a poll", channel=ctx.channel
)
return
await auxiliary.send_confirm_embed(
message=f"https://strawpoll.com/{content_id}", channel=ctx.channel
)