-
Notifications
You must be signed in to change notification settings - Fork 319
Add MessageBufferConfig to allow custom back-pressure config for message.new events
#6472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -125,9 +125,14 @@ public data class ChatClientConfig @JvmOverloads constructor( | |||||||||||||||||||||||||||
| * @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in | ||||||||||||||||||||||||||||
| * memory for different channel types. By default, this is an empty set, meaning no limits are applied and all | ||||||||||||||||||||||||||||
| * messages are kept in memory. Each channel type can have its own limit configured independently. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel | ||||||||||||||||||||||||||||
| * types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig] | ||||||||||||||||||||||||||||
| * for details and trade-offs. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| public data class MessageLimitConfig( | ||||||||||||||||||||||||||||
| public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(), | ||||||||||||||||||||||||||||
| public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(), | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
|
|
@@ -158,3 +163,75 @@ public data class ChannelMessageLimit( | |||||||||||||||||||||||||||
| public val channelType: String, | ||||||||||||||||||||||||||||
| public val baseLimit: Int, | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they | ||||||||||||||||||||||||||||
| * are dispatched to the sequential event-handling pipeline. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that | ||||||||||||||||||||||||||||
| * arrive faster than they can be processed sequentially. This configuration applies a bounded | ||||||||||||||||||||||||||||
| * buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the | ||||||||||||||||||||||||||||
| * configured channel types only. Events for other channel types — and all non-`NewMessageEvent` | ||||||||||||||||||||||||||||
| * events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity, | ||||||||||||||||||||||||||||
| * so signal-critical events such as reads, bans or member updates are never dropped. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * By default this is a no-op: no channel types are configured, so the buffered code path is not | ||||||||||||||||||||||||||||
| * active and the SDK behaves exactly as if this configuration did not exist. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * **Event ordering caveat.** When buffering is active, `NewMessageEvent`s for opted-in channel | ||||||||||||||||||||||||||||
| * types flow through a separate buffer from all other events. As a consequence, the relative | ||||||||||||||||||||||||||||
| * ordering between buffered `NewMessageEvent`s and non-buffered events (e.g. `ReactionNewEvent`, | ||||||||||||||||||||||||||||
| * `MessageUpdatedEvent`) for the same channel is **not guaranteed** — a reaction added to | ||||||||||||||||||||||||||||
| * message X may be processed before the `NewMessageEvent` for X. Because this configuration | ||||||||||||||||||||||||||||
| * already tolerates dropping events on overflow, callers opting in are expected to tolerate | ||||||||||||||||||||||||||||
| * this consistency relaxation as well. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than | ||||||||||||||||||||||||||||
| * 100 are queued: | ||||||||||||||||||||||||||||
| * ```kotlin | ||||||||||||||||||||||||||||
| * ChatClientConfig( | ||||||||||||||||||||||||||||
| * messageLimitConfig = MessageLimitConfig( | ||||||||||||||||||||||||||||
| * messageBufferConfig = MessageBufferConfig( | ||||||||||||||||||||||||||||
| * channelTypes = setOf("messaging"), | ||||||||||||||||||||||||||||
| * capacity = 100, | ||||||||||||||||||||||||||||
| * overflow = MessageBufferOverflow.DROP_OLDEST, | ||||||||||||||||||||||||||||
| * ), | ||||||||||||||||||||||||||||
| * ), | ||||||||||||||||||||||||||||
| * ) | ||||||||||||||||||||||||||||
| * ``` | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through | ||||||||||||||||||||||||||||
| * the bounded buffer. Channel types not in this set continue to use the unbuffered path. When | ||||||||||||||||||||||||||||
| * this set is empty (the default), buffering is disabled entirely and the per-event channel-type | ||||||||||||||||||||||||||||
| * check is skipped. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer | ||||||||||||||||||||||||||||
| * while the consumer is busy. Once exceeded, [overflow] decides which event to drop. Defaults to | ||||||||||||||||||||||||||||
| * `Int.MAX_VALUE`, which effectively disables overflow. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * @param overflow The strategy applied when the buffer is full: | ||||||||||||||||||||||||||||
| * - [MessageBufferOverflow.DROP_OLDEST] (default): the oldest queued event is evicted to make | ||||||||||||||||||||||||||||
| * room for the new one. Useful for live channels where freshness matters more than completeness. | ||||||||||||||||||||||||||||
| * - [MessageBufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are | ||||||||||||||||||||||||||||
| * kept. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| public data class MessageBufferConfig( | ||||||||||||||||||||||||||||
| public val channelTypes: Set<String> = emptySet(), | ||||||||||||||||||||||||||||
| public val capacity: Int = Int.MAX_VALUE, | ||||||||||||||||||||||||||||
| public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST, | ||||||||||||||||||||||||||||
|
Comment on lines
+218
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
rg -nC3 'MessageBufferConfig|capacity|MutableSharedFlow|extraBufferCapacity|onBufferOverflow|require\s*\(' stream-chat-android-client/src/mainRepository: GetStream/stream-chat-android Length of output: 27801 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig + its KDoc
nl -ba "$FILE" | sed -n '150,270p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
nl -ba "$FILE2" | sed -n '140,230p'
# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android Length of output: 116 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig KDoc and data class
cat -n "$FILE" | sed -n '150,260p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
cat -n "$FILE2" | sed -n '150,230p'
# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android Length of output: 9460 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
cat -n "$FILE" | sed -n '150,260p'
FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
cat -n "$FILE2" | sed -n '150,230p'
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android Length of output: 9460 🏁 Script executed: #!/bin/bash
set -euo pipefail
# 1) Look for any require/validation mentioning MessageBufferConfig or capacity
rg -n --hidden --no-ignore-vcs \
"MessageBufferConfig|bufferConfig\.capacity|capacity must|capacity must be|require\s*\(.*capacity|capacity\s*[><=]\s*\d" \
stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S
# 2) Find where MessageBufferConfig is instantiated/propagated
rg -n "MessageBufferConfig\(" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -SRepository: GetStream/stream-chat-android Length of output: 2527 🏁 Script executed: #!/bin/bash
set -euo pipefail
rg -n --hidden --no-ignore-vcs \
"(MessageBufferConfig\s*\.?\s*capacity|bufferConfig\.capacity|capacity must be|require\s*\(.*capacity|capacity\s*[><]=?\s*[0-9]+)" \
. -SRepository: GetStream/stream-chat-android Length of output: 274 🌐 Web query:
💡 Result: In kotlinx.coroutines, MutableSharedFlow(extraBufferCapacity=…) does not have any runtime “negative value” behavior: it validates inputs and throws IllegalArgumentException if extraBufferCapacity is negative. Specifically, the MutableSharedFlow(replay, extraBufferCapacity, …) constructor function contains a check Citations:
Reject non-positive
Suggested guard public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
-)
+) {
+ init {
+ require(capacity > 0) { "capacity must be > 0" }
+ }
+}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Strategy applied when the [MessageBufferConfig] buffer is full. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Mirrors a subset of [kotlinx.coroutines.channels.BufferOverflow]: the suspending strategy is | ||||||||||||||||||||||||||||
| * intentionally excluded because the SDK emits into the buffer via non-suspending `tryEmit`, | ||||||||||||||||||||||||||||
| * which makes the suspending semantics unreachable. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| public enum class MessageBufferOverflow { | ||||||||||||||||||||||||||||
| /** Evict the oldest queued event to make room for the new one. */ | ||||||||||||||||||||||||||||
| DROP_OLDEST, | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** Discard the newest event and keep the queued events. */ | ||||||||||||||||||||||||||||
| DROP_LATEST, | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep
MessageLimitConfigbinary compatible.Adding
messageBufferConfigto the primary constructor changes the generated public data-class ABI: the existing one-arg constructor disappears and thecopy/componentNsignatures change as well. That will break already-compiled SDK consumers on upgrade, so this needs an additive shape instead of a primary-constructor change.As per coding guidelines, "Favour additive API changes and mark deprecations with clear migration paths; validate public APIs and maintain binary compatibility".
🤖 Prompt for AI Agents