diff --git a/EssentialsDiscord/src/main/java/net/essentialsx/discord/JDADiscordService.java b/EssentialsDiscord/src/main/java/net/essentialsx/discord/JDADiscordService.java index cdaa1ceec06..8356f753897 100644 --- a/EssentialsDiscord/src/main/java/net/essentialsx/discord/JDADiscordService.java +++ b/EssentialsDiscord/src/main/java/net/essentialsx/discord/JDADiscordService.java @@ -50,7 +50,7 @@ import net.essentialsx.discord.util.ConsoleInjector; import net.essentialsx.discord.util.DiscordUtil; import net.essentialsx.discord.util.MessageUtil; -import net.essentialsx.discord.util.WrappedWebhookClient; +import net.essentialsx.discord.util.WebhookDispatcher; import org.bukkit.Bukkit; import org.bukkit.entity.Player; import org.bukkit.event.HandlerList; @@ -84,11 +84,11 @@ public class JDADiscordService implements DiscordService, IEssentialsModule { private JDA jda; private Guild guild; private TextChannel primaryChannel; - private WrappedWebhookClient consoleWebhook; + private WebhookDispatcher consoleWebhook; private String lastConsoleId; private final Map registeredTypes = new HashMap<>(); private final Map typeToChannelId = new HashMap<>(); - private final Map channelIdToWebhook = new HashMap<>(); + private final Map channelIdToWebhook = new HashMap<>(); private ConsoleInjector injector; private DiscordCommandDispatcher commandDispatcher; private InteractionControllerImpl interactionController; @@ -130,12 +130,7 @@ public WebhookMessage getWebhookMessage(String message) { } public WebhookMessage getWebhookMessage(String message, String avatarUrl, String name, boolean groupMentions) { - return new WebhookMessageBuilder() - .setAvatarUrl(avatarUrl) - .setAllowedMentions(groupMentions ? DiscordUtil.ALL_MENTIONS_WEBHOOK : DiscordUtil.NO_GROUP_MENTIONS_WEBHOOK) - .setUsername(name) - .setContent(message) - .build(); + return new WebhookMessageBuilder().setAvatarUrl(avatarUrl).setAllowedMentions(groupMentions ? DiscordUtil.ALL_MENTIONS_WEBHOOK : DiscordUtil.NO_GROUP_MENTIONS_WEBHOOK).setUsername(name).setContent(message).build(); } public void sendMessage(DiscordMessageEvent event, String message, boolean groupMentions) { @@ -145,11 +140,11 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group final String webhookChannelId = typeToChannelId.get(event.getType()); if (webhookChannelId != null) { - final WrappedWebhookClient client = channelIdToWebhook.get(webhookChannelId); - if (client != null) { + final WebhookDispatcher dispatcher = channelIdToWebhook.get(webhookChannelId); + if (dispatcher != null) { final String avatarUrl = event.getAvatarUrl() != null ? event.getAvatarUrl() : jda.getSelfUser().getAvatarUrl(); final String name = event.getName() != null ? event.getName() : guild.getSelfMember().getEffectiveName(); - client.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions)); + dispatcher.send(getWebhookMessage(strippedContent, avatarUrl, name, groupMentions)); return; } } @@ -158,9 +153,7 @@ public void sendMessage(DiscordMessageEvent event, String message, boolean group logger.warning(tlLiteral("discordNoSendPermission", channel.getName())); return; } - channel.sendMessage(strippedContent) - .setAllowedMentions(groupMentions ? null : DiscordUtil.NO_GROUP_MENTIONS) - .queue(); + channel.sendMessage(strippedContent).setAllowedMentions(groupMentions ? null : DiscordUtil.NO_GROUP_MENTIONS).queue(null, error -> logger.log(Level.WARNING, "Failed to send message to channel " + channel.getName(), error)); } public void startup() throws LoginException, InterruptedException { @@ -178,15 +171,7 @@ public void startup() throws LoginException, InterruptedException { proxySettings.setServer(plugin.getSettings().getHttpProxyServer()); } - jda = JDABuilder.createDefault(plugin.getSettings().getBotToken()) - .setWebsocketFactory(wsFactory) - .addEventListeners(new DiscordListener(this)) - .enableIntents(GatewayIntent.MESSAGE_CONTENT) - .enableCache(CacheFlag.EMOJI) - .disableCache(CacheFlag.MEMBER_OVERRIDES, CacheFlag.VOICE_STATE) - .setContextEnabled(false) - .build() - .awaitReady(); + jda = JDABuilder.createDefault(plugin.getSettings().getBotToken()).setWebsocketFactory(wsFactory).addEventListeners(new DiscordListener(this)).enableIntents(GatewayIntent.MESSAGE_CONTENT).enableCache(CacheFlag.EMOJI).disableCache(CacheFlag.MEMBER_OVERRIDES, CacheFlag.VOICE_STATE).setContextEnabled(false).build().awaitReady(); invalidStartup = false; updatePresence(); logger.log(Level.INFO, tlLiteral("discordLoggingInDone", jda.getSelfUser().getAsTag())); @@ -203,10 +188,7 @@ public void startup() throws LoginException, InterruptedException { } final Collection requiredPermissions = ImmutableList.of(Permission.MANAGE_WEBHOOKS, Permission.MANAGE_ROLES, Permission.NICKNAME_MANAGE, Permission.VIEW_CHANNEL, Permission.MESSAGE_SEND, Permission.MESSAGE_EMBED_LINKS); - final String[] missingPermissions = requiredPermissions.stream() - .filter(permission -> !guild.getSelfMember().hasPermission(permission)) - .map(Permission::getName) - .toArray(String[]::new); + final String[] missingPermissions = requiredPermissions.stream().filter(permission -> !guild.getSelfMember().hasPermission(permission)).map(Permission::getName).toArray(String[]::new); if (missingPermissions.length > 0) { invalidStartup = true; @@ -229,7 +211,7 @@ public void startup() throws LoginException, InterruptedException { } // Load emotes into cache, JDA will handle updates from here on out. - guild.retrieveEmojis().queue(); + guild.retrieveEmojis().queue(null, error -> logger.log(Level.WARNING, "Failed to retrieve emojis from guild", error)); updatePrimaryChannel(); @@ -305,23 +287,11 @@ public void sendChatMessage(final Player player, final String chatMessage) { public void sendChatMessage(ChatType chatType, Player player, String chatMessage) { final User user = getPlugin().getEss().getUser(player); - final String formattedMessage = MessageUtil.formatMessage(getSettings().getMcToDiscordFormat(player, chatType), - MessageUtil.sanitizeDiscordMarkdown(player.getName()), - MessageUtil.sanitizeDiscordMarkdown(player.getDisplayName()), - user.isAuthorized("essentials.discord.markdown") ? chatMessage : MessageUtil.sanitizeDiscordMarkdown(chatMessage), - MessageUtil.sanitizeDiscordMarkdown(getPlugin().getEss().getSettings().getWorldAlias(player.getWorld().getName())), - MessageUtil.sanitizeDiscordMarkdown(FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getPrefix(player))), - MessageUtil.sanitizeDiscordMarkdown(FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getSuffix(player)))); + final String formattedMessage = MessageUtil.formatMessage(getSettings().getMcToDiscordFormat(player, chatType), MessageUtil.sanitizeDiscordMarkdown(player.getName()), MessageUtil.sanitizeDiscordMarkdown(player.getDisplayName()), user.isAuthorized("essentials.discord.markdown") ? chatMessage : MessageUtil.sanitizeDiscordMarkdown(chatMessage), MessageUtil.sanitizeDiscordMarkdown(getPlugin().getEss().getSettings().getWorldAlias(player.getWorld().getName())), MessageUtil.sanitizeDiscordMarkdown(FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getPrefix(player))), MessageUtil.sanitizeDiscordMarkdown(FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getSuffix(player)))); final String avatarUrl = DiscordUtil.getAvatarUrl(this, player); - final String formattedName = MessageUtil.formatMessage(getSettings().getMcToDiscordNameFormat(player), - player.getName(), - player.getDisplayName(), - getPlugin().getEss().getSettings().getWorldAlias(player.getWorld().getName()), - FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getPrefix(player)), - FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getSuffix(player)), - guild.getMember(jda.getSelfUser()).getEffectiveName()); + final String formattedName = MessageUtil.formatMessage(getSettings().getMcToDiscordNameFormat(player), player.getName(), player.getDisplayName(), getPlugin().getEss().getSettings().getWorldAlias(player.getWorld().getName()), FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getPrefix(player)), FormatUtil.stripEssentialsFormat(getPlugin().getEss().getPermissionsHandler().getSuffix(player)), guild.getMember(jda.getSelfUser()).getEffectiveName()); DiscordUtil.dispatchDiscordMessage(this, chatTypeToMessageType(chatType), formattedMessage, user.isAuthorized("essentials.discord.ping"), avatarUrl, formattedName, player.getUniqueId()); } @@ -390,8 +360,8 @@ public void updatePresence() { public void updateTypesRelay() { if (!getSettings().isShowAvatar() && !getSettings().isCustomBotName()) { - for (WrappedWebhookClient webhook : channelIdToWebhook.values()) { - webhook.close(); + for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) { + dispatcher.close(); } typeToChannelId.clear(); channelIdToWebhook.clear(); @@ -410,14 +380,14 @@ public void updateTypesRelay() { final Webhook webhook = DiscordUtil.getOrCreateWebhook(channel, DiscordUtil.ADVANCED_RELAY_NAME).join(); if (webhook == null) { - final WrappedWebhookClient current = channelIdToWebhook.remove(channel.getId()); + final WebhookDispatcher current = channelIdToWebhook.remove(channel.getId()); if (current != null) { current.close(); } continue; } typeToChannelId.put(type, channel.getId()); - channelIdToWebhook.put(channel.getId(), DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient())); + channelIdToWebhook.put(channel.getId(), new WebhookDispatcher(DiscordUtil.getWebhookClient(webhook.getIdLong(), webhook.getToken(), jda.getHttpClient()))); } } @@ -471,7 +441,7 @@ public void updateConsoleRelay() { } shutdownConsoleRelay(false); - consoleWebhook = DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient()); + consoleWebhook = new WebhookDispatcher(DiscordUtil.getWebhookClient(webhookId, webhookToken, jda.getHttpClient()), 100); if (injector == null || injector.isRemoved()) { injector = new ConsoleInjector(this); injector.start(); @@ -510,8 +480,8 @@ public void shutdown() { shutdownConsoleRelay(true); - for (WrappedWebhookClient webhook : channelIdToWebhook.values()) { - webhook.close(); + for (WebhookDispatcher dispatcher : channelIdToWebhook.values()) { + dispatcher.close(); } // Unregister leftover jda listeners @@ -583,7 +553,10 @@ public CompletableFuture modifyMemberRoles(InteractionMember member, Colle } final CompletableFuture future = new CompletableFuture<>(); - guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete); + guild.modifyMemberRoles(((InteractionMemberImpl) member).getJdaObject(), add, remove).queue(future::complete, error -> { + logger.log(Level.WARNING, "Failed to modify member roles", error); + future.complete(null); + }); return future; } @@ -613,7 +586,7 @@ public DiscordSettings getSettings() { return plugin.getSettings(); } - public WrappedWebhookClient getConsoleWebhook() { + public WebhookDispatcher getConsoleWebhook() { return consoleWebhook; } diff --git a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/ConsoleInjector.java b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/ConsoleInjector.java index 4794ddce786..266fee02937 100644 --- a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/ConsoleInjector.java +++ b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/ConsoleInjector.java @@ -14,10 +14,8 @@ import org.bukkit.Bukkit; import java.time.Instant; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import static com.earth2me.essentials.I18n.tlLiteral; @@ -27,40 +25,18 @@ public class ConsoleInjector extends AbstractAppender { private final static java.util.logging.Logger logger = EssentialsDiscord.getWrappedLogger(); private final static long QUEUE_PROCESS_PERIOD_SECONDS = 2; + private final static int QUEUE_CAPACITY = 500; private final JDADiscordService jda; - private final BlockingQueue messageQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue messageQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY); private final int taskId; private boolean removed = false; - private final AtomicLong lastRateLimitTime = new AtomicLong(0); - private final AtomicInteger recentRateLimit = new AtomicInteger(0); - private final AtomicInteger totalBackoffEvents = new AtomicInteger(); - public ConsoleInjector(JDADiscordService jda) { super("EssentialsX-ConsoleInjector", null, null, false); this.jda = jda; ((Logger) LogManager.getRootLogger()).addAppender(this); taskId = Bukkit.getScheduler().runTaskTimerAsynchronously(jda.getPlugin(), () -> { - // Check to see if we're supposed to be backing off, preform backoff if the case. - if (recentRateLimit.get() < 0) { - if (totalBackoffEvents.get() * 20 >= jda.getSettings().getConsoleSkipDelay() * 60) { - logger.warning("EssXBackoff: Reached console skip delay, attempt to skip"); - jda.getConsoleWebhook().abandonRequests(); - messageQueue.clear(); - totalBackoffEvents.set(0); - recentRateLimit.set(0); - lastRateLimitTime.set(0); - return; - } - - final int backoff = recentRateLimit.incrementAndGet(); - if (jda.isDebug()) { - logger.warning("EssXBackoff: Webhook backoff in progress, skipping queue processing. Resuming in " + Math.abs(backoff) + " cycles."); - } - return; - } - final StringBuilder buffer = new StringBuilder(); String curLine; while ((curLine = messageQueue.peek()) != null) { @@ -78,7 +54,11 @@ public ConsoleInjector(JDADiscordService jda) { } private void sendMessage(String content) { - jda.getConsoleWebhook().send(jda.getWebhookMessage(content)).exceptionally(e -> { + final WebhookDispatcher webhook = jda.getConsoleWebhook(); + if (webhook == null || webhook.isShutdown()) { + return; + } + webhook.send(jda.getWebhookMessage(content)).exceptionally(e -> { logger.severe(tlLiteral("discordErrorWebhook")); remove(); return null; @@ -97,40 +77,13 @@ public void append(LogEvent event) { return; } - if (entry.startsWith("EssXBackoff: ")) { - return; - } - - if (event.getLoggerName().contains("club.minnced.discord.webhook.WebhookClient") && entry.startsWith("Encountered 429, retrying after ")) { - if (recentRateLimit.get() >= 0) { - recentRateLimit.incrementAndGet(); - } - - if (lastRateLimitTime.get() == 0 || System.currentTimeMillis() - lastRateLimitTime.get() > 5000) { - lastRateLimitTime.set(System.currentTimeMillis()); - - // A negative value would mean the timer is current preforming a backoff, don't stop it. - if (recentRateLimit.get() >= 0) { - recentRateLimit.set(0); - } - } else if (recentRateLimit.get() >= 2) { - // Start the webhook backoff, defaulting to 20s, which should reset our bucket. - if (jda.isDebug()) { - totalBackoffEvents.getAndIncrement(); - logger.warning("EssXBackoff: Beginning Webhook Backoff"); - } - recentRateLimit.set(-20); - } - return; - } - final String[] loggerNameSplit = event.getLoggerName().split("\\."); final String loggerName = loggerNameSplit[loggerNameSplit.length - 1].trim(); if (!loggerName.isEmpty()) { entry = "[" + loggerName + "] " + entry; } - + if (!jda.getSettings().getConsoleFilters().isEmpty()) { for (final Pattern pattern : jda.getSettings().getConsoleFilters()) { if (pattern.matcher(entry).find()) { @@ -139,11 +92,18 @@ public void append(LogEvent event) { } } - messageQueue.addAll(Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList( + for (final String line : Splitter.fixedLength(Message.MAX_CONTENT_LENGTH - 50).splitToList( MessageUtil.formatMessage(jda.getSettings().getConsoleFormat(), TimeFormat.TIME_LONG.format(Instant.now()), event.getLevel().name(), - MessageUtil.sanitizeDiscordMarkdown(entry)))); + MessageUtil.sanitizeDiscordMarkdown(entry)))) { + + if (!messageQueue.offer(line)) { + if (jda.isDebug()) { + logger.fine("Console relay queue full, dropping message."); + } + } + } } public void remove() { diff --git a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/DiscordUtil.java b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/DiscordUtil.java index 4911e91689a..8686e65c4cd 100644 --- a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/DiscordUtil.java +++ b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/DiscordUtil.java @@ -15,6 +15,7 @@ import net.dv8tion.jda.api.entities.channel.concrete.TextChannel; import net.essentialsx.api.v2.events.discord.DiscordMessageEvent; import net.essentialsx.api.v2.services.discord.MessageType; +import net.essentialsx.discord.EssentialsDiscord; import net.essentialsx.discord.JDADiscordService; import okhttp3.OkHttpClient; import org.bukkit.Bukkit; @@ -26,8 +27,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Predicate; +import java.util.logging.Level; +import java.util.logging.Logger; public final class DiscordUtil { + private static final Logger logger = EssentialsDiscord.getWrappedLogger(); public final static String ADVANCED_RELAY_NAME = "EssX Advanced Relay"; public final static String CONSOLE_RELAY_NAME = "EssX Console Relay"; public final static List NO_GROUP_MENTIONS; @@ -63,7 +67,6 @@ public static WrappedWebhookClient getWebhookClient(long id, String token, OkHtt * * @param channel The channel to search for/create webhooks in. * @param webhookName The name of the webhook to search for/create. - * * @return A future which completes with the webhook by the given name in the given channel, or null * if the bot lacks the proper permissions. */ @@ -82,7 +85,7 @@ public static CompletableFuture getOrCreateWebhook(final TextChannel ch } } createWebhook(channel, webhookName).thenAccept(future::complete); - }); + }, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from channel " + channel.getName(), error)); return future; } @@ -101,17 +104,17 @@ private static void cleanWebhooks(final Guild guild, String webhookName) { guild.retrieveWebhooks().queue(webhooks -> { for (final Webhook webhook : webhooks) { if (webhook.getName().equalsIgnoreCase(webhookName) && !ACTIVE_WEBHOOKS.contains(webhook.getId())) { - webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue(); + webhook.delete().reason("EssentialsX Discord: webhook cleanup").queue(null, error -> logger.log(Level.WARNING, "Failed to delete webhook " + webhook.getName(), error)); } } - }); + }, error -> logger.log(Level.WARNING, "Failed to retrieve webhooks from guild " + guild.getName(), error)); } /** * Creates a webhook with the given name in the given channel. * - * @param channel The channel to search for webhooks in. - * @param webhookName The name of the webhook to look for. + * @param channel The channel to search for webhooks in. + * @param webhookName The name of the webhook to look for. * @return A future which completes with the webhook by the given name in the given channel or null if no permissions. */ public static CompletableFuture createWebhook(TextChannel channel, String webhookName) { @@ -123,7 +126,7 @@ public static CompletableFuture createWebhook(TextChannel channel, Stri channel.createWebhook(webhookName).queue(webhook -> { future.complete(webhook); ACTIVE_WEBHOOKS.addIfAbsent(webhook.getId()); - }); + }, error -> logger.log(Level.WARNING, "Failed to create webhook " + webhookName + " in channel " + channel.getName(), error)); return future; } diff --git a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WebhookDispatcher.java b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WebhookDispatcher.java new file mode 100644 index 00000000000..6025c62dbb4 --- /dev/null +++ b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WebhookDispatcher.java @@ -0,0 +1,210 @@ +package net.essentialsx.discord.util; + +import club.minnced.discord.webhook.receive.ReadonlyMessage; +import club.minnced.discord.webhook.send.WebhookMessage; +import net.essentialsx.discord.EssentialsDiscord; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class WebhookDispatcher { + private static final Logger logger = EssentialsDiscord.getWrappedLogger(); + + private static final int MAX_TOKENS = 5; + private static final long REFILL_PERIOD_MS = 2000; + private static final long DRAIN_INTERVAL_MS = 400; + private static final int DEFAULT_QUEUE_CAPACITY = 50; + + private final WrappedWebhookClient client; + private final BlockingQueue queue; + private final ScheduledExecutorService scheduler; + private final ScheduledFuture drainTask; + + private final AtomicInteger tokens = new AtomicInteger(MAX_TOKENS); + private final AtomicLong lastRefillTime = new AtomicLong(System.currentTimeMillis()); + + private final AtomicInteger suppressedCount = new AtomicInteger(0); + private final AtomicInteger totalDropped = new AtomicInteger(0); + private final AtomicInteger total429s = new AtomicInteger(0); + + private volatile boolean shutdown = false; + + public WebhookDispatcher(WrappedWebhookClient client) { + this(client, DEFAULT_QUEUE_CAPACITY); + } + + public WebhookDispatcher(WrappedWebhookClient client, int queueCapacity) { + this.client = client; + this.queue = new LinkedBlockingQueue<>(queueCapacity); + this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "EssX-WebhookDispatcher-" + client.getId()); + t.setDaemon(true); + return t; + }); + this.drainTask = scheduler.scheduleAtFixedRate(this::drain, DRAIN_INTERVAL_MS, DRAIN_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + public CompletableFuture send(WebhookMessage message) { + if (shutdown) { + return CompletableFuture.completedFuture(null); + } + + final CompletableFuture future = new CompletableFuture<>(); + final PendingMessage pending = new PendingMessage(message, future); + + if (!queue.offer(pending)) { + final PendingMessage dropped = queue.poll(); + if (dropped != null) { + dropped.future.complete(null); + suppressedCount.incrementAndGet(); + totalDropped.incrementAndGet(); + } + + if (!queue.offer(pending)) { + future.complete(null); + totalDropped.incrementAndGet(); + } + } + + return future; + } + + private void drain() { + if (shutdown) { + return; + } + + refillTokens(); + + while (!queue.isEmpty()) { + if (tokens.get() <= 0) { + break; + } + + final PendingMessage pending = queue.poll(); + if (pending == null) { + break; + } + + final int remaining = tokens.decrementAndGet(); + if (remaining < 0) { + tokens.incrementAndGet(); + if (!queue.offer(pending)) { + pending.future.complete(null); + suppressedCount.incrementAndGet(); + } + break; + } + + final int suppressed = suppressedCount.getAndSet(0); + if (suppressed > 0) { + logger.info("Webhook dispatcher: " + suppressed + " message(s) dropped due to rate limit backpressure. " + "(Total dropped: " + totalDropped.get() + ", Total 429s: " + total429s.get() + ")"); + } + + try { + client.send(pending.message).whenComplete((result, error) -> { + if (error != null) { + final String errorMsg = error.getMessage() != null ? error.getMessage() : error.getClass().getSimpleName(); + if (errorMsg.contains("429") || errorMsg.contains("rate limit")) { + total429s.incrementAndGet(); + logger.warning("Webhook rate limited (429). Queue depth: " + queue.size() + ", total 429s: " + total429s.get() + ", total dropped: " + totalDropped.get()); + } else { + logger.log(Level.WARNING, "Webhook send failed", error); + } + pending.future.completeExceptionally(error); + } else { + pending.future.complete(result); + } + }); + } catch (Exception e) { + logger.log(Level.WARNING, "Error dispatching webhook message", e); + pending.future.completeExceptionally(e); + } + } + } + + private void refillTokens() { + final long now = System.currentTimeMillis(); + final long last = lastRefillTime.get(); + + if (now - last >= REFILL_PERIOD_MS) { + if (lastRefillTime.compareAndSet(last, now)) { + tokens.set(MAX_TOKENS); + } + } + } + + public void abandonRequests() { + PendingMessage pending; + int count = 0; + while ((pending = queue.poll()) != null) { + pending.future.complete(null); + count++; + } + suppressedCount.set(0); + client.abandonRequests(); + if (count > 0) { + logger.info("WebhookDispatcher: Abandoned " + count + " pending message(s)."); + } + } + + public void close() { + shutdown = true; + drainTask.cancel(false); + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + + // Complete any remaining futures so callers are not left hanging + PendingMessage pending; + while ((pending = queue.poll()) != null) { + pending.future.complete(null); + } + + client.close(); + } + + public boolean isShutdown() { + return shutdown || client.isShutdown(); + } + + public WrappedWebhookClient getClient() { + return client; + } + + public int getQueueSize() { + return queue.size(); + } + + public int getTotalDropped() { + return totalDropped.get(); + } + + public int getTotal429s() { + return total429s.get(); + } + + private static class PendingMessage { + final WebhookMessage message; + final CompletableFuture future; + + PendingMessage(WebhookMessage message, CompletableFuture future) { + this.message = message; + this.future = future; + } + } +} diff --git a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WrappedWebhookClient.java b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WrappedWebhookClient.java index 7a8a4db46fc..0b358321a80 100644 --- a/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WrappedWebhookClient.java +++ b/EssentialsDiscord/src/main/java/net/essentialsx/discord/util/WrappedWebhookClient.java @@ -57,6 +57,10 @@ public WrappedWebhookClient(final long id, final String token, final OkHttpClien webhookQueue = queue; } + public long getId() { + return webhookClient.getId(); + } + public CompletableFuture send(WebhookMessage message) { return webhookClient.send(message); } diff --git a/EssentialsDiscordLink/src/main/java/net/essentialsx/discordlink/rolesync/RoleSyncManager.java b/EssentialsDiscordLink/src/main/java/net/essentialsx/discordlink/rolesync/RoleSyncManager.java index 8e9c31d74f4..6bf0a3c2e9f 100644 --- a/EssentialsDiscordLink/src/main/java/net/essentialsx/discordlink/rolesync/RoleSyncManager.java +++ b/EssentialsDiscordLink/src/main/java/net/essentialsx/discordlink/rolesync/RoleSyncManager.java @@ -3,7 +3,6 @@ import com.earth2me.essentials.UUIDPlayer; import com.google.common.collect.BiMap; import net.essentialsx.api.v2.events.discordlink.DiscordLinkStatusChangeEvent; -import net.essentialsx.api.v2.services.discord.InteractionMember; import net.essentialsx.api.v2.services.discord.InteractionRole; import net.essentialsx.discordlink.EssentialsDiscordLink; import org.bukkit.Bukkit; @@ -18,6 +17,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.logging.Level; import static com.earth2me.essentials.I18n.tlLiteral; @@ -25,6 +27,8 @@ public class RoleSyncManager implements Listener { private final EssentialsDiscordLink ess; private final Map groupToRoleMap = new HashMap<>(); private final Map roleIdToGroupMap = new HashMap<>(); + private final Semaphore syncSemaphore = new Semaphore(5); + private int syncCursor = 0; public RoleSyncManager(final EssentialsDiscordLink ess) { this.ess = ess; @@ -36,12 +40,28 @@ public RoleSyncManager(final EssentialsDiscordLink ess) { } final BiMap uuidToDiscordCopy = ess.getAccountStorage().getRawStorageMap(); + if (uuidToDiscordCopy.isEmpty()) { + syncCursor = 0; + return; + } + + final List> entries = new ArrayList<>(uuidToDiscordCopy.entrySet()); + final int size = entries.size(); + if (syncCursor >= size) { + syncCursor = 0; + } + + final int start = syncCursor; + final int end = Math.min(start + 50, size); + syncCursor = end >= size ? 0 : end; + final Map groupToRoleMapCopy = new HashMap<>(groupToRoleMap); final Map roleIdToGroupMapCopy = new HashMap<>(roleIdToGroupMap); final boolean primaryOnly = ess.getSettings().isRoleSyncPrimaryGroupOnly(); final boolean removeGroups = ess.getSettings().isRoleSyncRemoveGroups(); final boolean removeRoles = ess.getSettings().isRoleSyncRemoveRoles(); - for (final Map.Entry entry : uuidToDiscordCopy.entrySet()) { + for (int i = start; i < end; i++) { + final Map.Entry entry = entries.get(i); sync(new UUIDPlayer(UUID.fromString(entry.getKey())), entry.getValue(), groupToRoleMapCopy, roleIdToGroupMapCopy, primaryOnly, removeGroups, removeRoles); } }, 0, ess.getSettings().getRoleSyncResyncDelay() * 1200L); @@ -60,41 +80,50 @@ public void sync(final Player player, final String discordId, final Map groups = primaryOnly ? Collections.singletonList(ess.getEss().getPermissionsHandler().getGroup(player)) : ess.getEss().getPermissionsHandler().getGroups(player); - final InteractionMember member = ess.getApi().getMemberById(discordId).join(); - - if (member == null) { - if (ess.getSettings().isUnlinkOnLeave()) { - ess.getLinkManager().removeAccount(ess.getEss().getUser(player), DiscordLinkStatusChangeEvent.Cause.UNSYNC_LEAVE); - } else { - unSync(player.getUniqueId(), discordId); - } - return; - } - - final List toAdd = new ArrayList<>(); - final List toRemove = new ArrayList<>(); - - for (final Map.Entry entry : groupToRoleMap.entrySet()) { - if (groups.contains(entry.getKey()) && !member.hasRole(entry.getValue())) { - toAdd.add(entry.getValue()); - } else if (removeRoles && !groups.contains(entry.getKey()) && member.hasRole(entry.getValue())) { - toRemove.add(entry.getValue()); - } - } - - for (final Map.Entry entry : roleIdToGroupMap.entrySet()) { - if (member.hasRole(entry.getKey()) && !groups.contains(entry.getValue())) { - ess.getEss().getPermissionsHandler().addToGroup(player, entry.getValue()); - } else if (removeGroups && !member.hasRole(entry.getKey()) && groups.contains(entry.getValue())) { - ess.getEss().getPermissionsHandler().removeFromGroup(player, entry.getValue()); - } - } - - if (toAdd.isEmpty() && toRemove.isEmpty()) { - return; - } - - ess.getApi().modifyMemberRoles(member, toAdd, toRemove); + ess.getEss().runTaskAsynchronously(() -> { + syncSemaphore.acquireUninterruptibly(); + ess.getApi().getMemberById(discordId).thenCompose(member -> { + if (member == null) { + if (ess.getSettings().isUnlinkOnLeave()) { + ess.getLinkManager().removeAccount(ess.getEss().getUser(player), DiscordLinkStatusChangeEvent.Cause.UNSYNC_LEAVE); + } else { + ess.getEss().runTaskAsynchronously(() -> unSync(player.getUniqueId(), discordId)); + } + return CompletableFuture.completedFuture(null); + } + + final List toAdd = new ArrayList<>(); + final List toRemove = new ArrayList<>(); + + for (final Map.Entry entry : groupToRoleMap.entrySet()) { + if (groups.contains(entry.getKey()) && !member.hasRole(entry.getValue())) { + toAdd.add(entry.getValue()); + } else if (removeRoles && !groups.contains(entry.getKey()) && member.hasRole(entry.getValue())) { + toRemove.add(entry.getValue()); + } + } + + for (final Map.Entry entry : roleIdToGroupMap.entrySet()) { + if (member.hasRole(entry.getKey()) && !groups.contains(entry.getValue())) { + ess.getEss().getPermissionsHandler().addToGroup(player, entry.getValue()); + } else if (removeGroups && !member.hasRole(entry.getKey()) && groups.contains(entry.getValue())) { + ess.getEss().getPermissionsHandler().removeFromGroup(player, entry.getValue()); + } + } + + if (toAdd.isEmpty() && toRemove.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + return ess.getApi().modifyMemberRoles(member, toAdd, toRemove).exceptionally(e -> { + ess.getLogger().log(Level.WARNING, "Failed to modify Discord roles for " + player.getUniqueId() + " / " + discordId, e); + return null; + }); + }).exceptionally(e -> { + ess.getLogger().log(Level.WARNING, "Failed to fetch Discord member for " + player.getUniqueId() + " / " + discordId, e); + return null; + }).whenComplete((unused, throwable) -> syncSemaphore.release()); + }); } public void unSync(final UUID uuid, final String discordId) { @@ -108,7 +137,6 @@ public void unSync(final UUID uuid, final String discordId) { final Map roleIdToGroupMapCopy = new HashMap<>(roleIdToGroupMap); final Player player = new UUIDPlayer(uuid); - final InteractionMember member = ess.getApi().getMemberById(discordId).join(); if (removeGroups) { for (final String group : roleIdToGroupMapCopy.values()) { @@ -116,10 +144,26 @@ public void unSync(final UUID uuid, final String discordId) { } } - // Check if the member is no longer in the guild (null), they don't have any roles anyway. - if (removeRoles && member != null) { - ess.getApi().modifyMemberRoles(member, null, groupToRoleMapCopy.values()); + if (!removeRoles) { + return; } + + ess.getEss().runTaskAsynchronously(() -> { + syncSemaphore.acquireUninterruptibly(); + ess.getApi().getMemberById(discordId).thenCompose(member -> { + // Check if the member is no longer in the guild (null), they don't have any roles anyway. + if (member == null) { + return CompletableFuture.completedFuture(null); + } + return ess.getApi().modifyMemberRoles(member, null, groupToRoleMapCopy.values()).exceptionally(e -> { + ess.getLogger().log(Level.WARNING, "Failed to remove Discord roles for " + uuid + " / " + discordId, e); + return null; + }); + }).exceptionally(e -> { + ess.getLogger().log(Level.WARNING, "Failed to fetch Discord member for unsync " + uuid + " / " + discordId, e); + return null; + }).whenComplete((unused, throwable) -> syncSemaphore.release()); + }); } @EventHandler