Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions stream-chat-android-client/api/stream-chat-android-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,42 @@ public final class io/getstream/chat/android/client/api/ChatClientConfig {
public fun toString ()Ljava/lang/String;
}

public final class io/getstream/chat/android/client/api/MessageBufferConfig {
public fun <init> ()V
public fun <init> (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;)V
public synthetic fun <init> (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/util/Set;
public final fun component2 ()I
public final fun component3 ()Lio/getstream/chat/android/client/api/MessageBufferOverflow;
public final fun copy (Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;)Lio/getstream/chat/android/client/api/MessageBufferConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageBufferConfig;Ljava/util/Set;ILio/getstream/chat/android/client/api/MessageBufferOverflow;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageBufferConfig;
public fun equals (Ljava/lang/Object;)Z
public final fun getCapacity ()I
public final fun getChannelTypes ()Ljava/util/Set;
public final fun getOverflow ()Lio/getstream/chat/android/client/api/MessageBufferOverflow;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class io/getstream/chat/android/client/api/MessageBufferOverflow : java/lang/Enum {
public static final field DROP_LATEST Lio/getstream/chat/android/client/api/MessageBufferOverflow;
public static final field DROP_OLDEST Lio/getstream/chat/android/client/api/MessageBufferOverflow;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public static fun valueOf (Ljava/lang/String;)Lio/getstream/chat/android/client/api/MessageBufferOverflow;
public static fun values ()[Lio/getstream/chat/android/client/api/MessageBufferOverflow;
}

public final class io/getstream/chat/android/client/api/MessageLimitConfig {
public fun <init> ()V
public fun <init> (Ljava/util/Set;)V
public synthetic fun <init> (Ljava/util/Set;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;)V
public synthetic fun <init> (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Ljava/util/Set;
public final fun copy (Ljava/util/Set;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageLimitConfig;Ljava/util/Set;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
public final fun component2 ()Lio/getstream/chat/android/client/api/MessageBufferConfig;
public final fun copy (Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
public static synthetic fun copy$default (Lio/getstream/chat/android/client/api/MessageLimitConfig;Ljava/util/Set;Lio/getstream/chat/android/client/api/MessageBufferConfig;ILjava/lang/Object;)Lio/getstream/chat/android/client/api/MessageLimitConfig;
public fun equals (Ljava/lang/Object;)Z
public final fun getChannelMessageLimits ()Ljava/util/Set;
public final fun getMessageBufferConfig ()Lio/getstream/chat/android/client/api/MessageBufferConfig;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ public data class ChatClientConfig @JvmOverloads constructor(
* @param channelMessageLimits A set of [ChannelMessageLimit] defining the maximum number of messages to keep in
* memory for different channel types. By default, this is an empty set, meaning no limits are applied and all
* messages are kept in memory. Each channel type can have its own limit configured independently.
*
* @param messageBufferConfig Configuration for bounding the inbound `NewMessageEvent` buffer on selected channel
* types. By default, no buffering is applied — events flow through the unbuffered path. See [MessageBufferConfig]
* for details and trade-offs.
*/
public data class MessageLimitConfig(
public val channelMessageLimits: Set<ChannelMessageLimit> = setOf(),
public val messageBufferConfig: MessageBufferConfig = MessageBufferConfig(),
)
Comment on lines 133 to 136
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Keep MessageLimitConfig binary compatible.

Adding messageBufferConfig to the primary constructor changes the generated public data-class ABI: the existing one-arg constructor disappears and the copy / componentN signatures change as well. That will break already-compiled SDK consumers on upgrade, so this needs an additive shape instead of a primary-constructor change.

As per coding guidelines, "Favour additive API changes and mark deprecations with clear migration paths; validate public APIs and maintain binary compatibility".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 133 - 136, Revert MessageLimitConfig to keep the original
primary-constructor shape (single parameter channelMessageLimits) and make
messageBufferConfig an additive property declared in the class body (public val
messageBufferConfig: MessageBufferConfig = MessageBufferConfig()) instead of
adding it to the primary constructor; this preserves the existing generated
one-arg constructor, copy and componentN signatures for binary compatibility
while still exposing the new messageBufferConfig field.


/**
Expand Down Expand Up @@ -158,3 +163,75 @@ public data class ChannelMessageLimit(
public val channelType: String,
public val baseLimit: Int,
)

/**
* Configuration for buffering inbound `NewMessageEvent`s for specific channel types before they
* are dispatched to the sequential event-handling pipeline.
*
* High-traffic channel types (e.g. livestreams) can produce a flood of new-message events that
* arrive faster than they can be processed sequentially. This configuration applies a bounded
* buffer with a configurable overflow strategy (e.g. drop oldest) for `NewMessageEvent`s on the
* configured channel types only. Events for other channel types — and all non-`NewMessageEvent`
* events — continue to flow through the default unbuffered path with `Int.MAX_VALUE` capacity,
* so signal-critical events such as reads, bans or member updates are never dropped.
*
* By default this is a no-op: no channel types are configured, so the buffered code path is not
* active and the SDK behaves exactly as if this configuration did not exist.
*
* **Event ordering caveat.** When buffering is active, `NewMessageEvent`s for opted-in channel
* types flow through a separate buffer from all other events. As a consequence, the relative
* ordering between buffered `NewMessageEvent`s and non-buffered events (e.g. `ReactionNewEvent`,
* `MessageUpdatedEvent`) for the same channel is **not guaranteed** — a reaction added to
* message X may be processed before the `NewMessageEvent` for X. Because this configuration
* already tolerates dropping events on overflow, callers opting in are expected to tolerate
* this consistency relaxation as well.
*
* Example — drop the oldest pending `NewMessageEvent` for `messaging` channels when more than
* 100 are queued:
* ```kotlin
* ChatClientConfig(
* messageLimitConfig = MessageLimitConfig(
* messageBufferConfig = MessageBufferConfig(
* channelTypes = setOf("messaging"),
* capacity = 100,
* overflow = MessageBufferOverflow.DROP_OLDEST,
* ),
* ),
* )
* ```
*
* @param channelTypes The set of channel types whose `NewMessageEvent`s should be routed through
* the bounded buffer. Channel types not in this set continue to use the unbuffered path. When
* this set is empty (the default), buffering is disabled entirely and the per-event channel-type
* check is skipped.
*
* @param capacity The maximum number of `NewMessageEvent`s that can be queued in the buffer
* while the consumer is busy. Once exceeded, [overflow] decides which event to drop. Defaults to
* `Int.MAX_VALUE`, which effectively disables overflow.
*
* @param overflow The strategy applied when the buffer is full:
* - [MessageBufferOverflow.DROP_OLDEST] (default): the oldest queued event is evicted to make
* room for the new one. Useful for live channels where freshness matters more than completeness.
* - [MessageBufferOverflow.DROP_LATEST]: the newest event is discarded and the queued events are
* kept.
*/
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
Comment on lines +218 to +221
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -nC3 'MessageBufferConfig|capacity|MutableSharedFlow|extraBufferCapacity|onBufferOverflow|require\s*\(' stream-chat-android-client/src/main

Repository: GetStream/stream-chat-android

Length of output: 27801


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig + its KDoc
nl -ba "$FILE" | sed -n '150,270p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
nl -ba "$FILE2" | sed -n '140,230p'

# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 116


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
# show MessageBufferConfig KDoc and data class
cat -n "$FILE" | sed -n '150,260p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
# show where capacity is used
cat -n "$FILE2" | sed -n '150,230p'

# find all usages of bufferConfig.capacity
rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 9460


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt"
cat -n "$FILE" | sed -n '150,260p'

FILE2="stream-chat-android-client/src/main/java/io/getstream/chat/android/client/internal/state/event/handler/internal/EventHandlerSequential.kt"
cat -n "$FILE2" | sed -n '150,230p'

rg -n "bufferConfig\.capacity" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 9460


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Look for any require/validation mentioning MessageBufferConfig or capacity
rg -n --hidden --no-ignore-vcs \
  "MessageBufferConfig|bufferConfig\.capacity|capacity must|capacity must be|require\s*\(.*capacity|capacity\s*[><=]\s*\d" \
  stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

# 2) Find where MessageBufferConfig is instantiated/propagated
rg -n "MessageBufferConfig\(" stream-chat-android-client/src/main/java/io/getstream/chat/android/client -S

Repository: GetStream/stream-chat-android

Length of output: 2527


🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n --hidden --no-ignore-vcs \
  "(MessageBufferConfig\s*\.?\s*capacity|bufferConfig\.capacity|capacity must be|require\s*\(.*capacity|capacity\s*[><]=?\s*[0-9]+)" \
  . -S

Repository: GetStream/stream-chat-android

Length of output: 274


🌐 Web query:

Kotlinx coroutines MutableSharedFlow extraBufferCapacity negative value behavior

💡 Result:

In kotlinx.coroutines, MutableSharedFlow(extraBufferCapacity=…) does not have any runtime “negative value” behavior: it validates inputs and throws IllegalArgumentException if extraBufferCapacity is negative. Specifically, the MutableSharedFlow(replay, extraBufferCapacity, …) constructor function contains a check require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }, so passing a negative extraBufferCapacity will fail immediately with IllegalArgumentException (before any flow emissions). [1] Related notes: - The documentation for MutableSharedFlow also states that extraBufferCapacity “cannot be negative”. [2] - There is additional validation logic for the combination of (replay, extraBufferCapacity, onBufferOverflow): if you choose a non-default onBufferOverflow strategy other than BufferOverflow.SUSPEND, then replay or extraBufferCapacity must be > 0; otherwise it will also throw. [1][2] So for your query “MutableSharedFlow extraBufferCapacity negative value behavior”: the behavior is “constructor rejects the value and throws IllegalArgumentException,” not a special buffering/overflow semantics. [1][2]

Citations:


Reject non-positive capacity values at the API boundary.

MessageBufferConfig.capacity is passed directly as extraBufferCapacity when constructing the buffered MutableSharedFlow with onBufferOverflow = DROP_OLDEST/DROP_LATEST. MutableSharedFlow rejects negative values and also requires extraBufferCapacity > 0 for DROP_* (replay defaults to 0), so capacity <= 0 will surface as an IllegalArgumentException at runtime when buffering becomes active.

Suggested guard
 public data class MessageBufferConfig(
     public val channelTypes: Set<String> = emptySet(),
     public val capacity: Int = Int.MAX_VALUE,
     public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
-) 
+) {
+    init {
+        require(capacity > 0) { "capacity must be > 0" }
+    }
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
public data class MessageBufferConfig(
public val channelTypes: Set<String> = emptySet(),
public val capacity: Int = Int.MAX_VALUE,
public val overflow: MessageBufferOverflow = MessageBufferOverflow.DROP_OLDEST,
) {
init {
require(capacity > 0) { "capacity must be > 0" }
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@stream-chat-android-client/src/main/java/io/getstream/chat/android/client/api/ChatClientConfig.kt`
around lines 218 - 221, Add an API-level guard to reject non-positive buffer
sizes: in the MessageBufferConfig data class (symbol MessageBufferConfig)
validate the capacity property on construction and throw an
IllegalArgumentException if capacity <= 0 (except when using sentinel values you
intentionally allow), since capacity is used as extraBufferCapacity for
MutableSharedFlow with onBufferOverflow (MessageBufferOverflow) and
MutableSharedFlow requires extraBufferCapacity > 0 for DROP_* policies; ensure
the error message clearly names capacity and MessageBufferConfig so callers see
what's wrong.

)

/**
* Strategy applied when the [MessageBufferConfig] buffer is full.
*
* Mirrors a subset of [kotlinx.coroutines.channels.BufferOverflow]: the suspending strategy is
* intentionally excluded because the SDK emits into the buffer via non-suspending `tryEmit`,
* which makes the suspending semantics unreachable.
*/
public enum class MessageBufferOverflow {
/** Evict the oldest queued event to make room for the new one. */
DROP_OLDEST,

/** Discard the newest event and keep the queued events. */
DROP_LATEST,
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.getstream.chat.android.client.internal.state.event.handler.internal

import androidx.annotation.VisibleForTesting
import io.getstream.chat.android.client.ChatEventListener
import io.getstream.chat.android.client.api.MessageBufferConfig
import io.getstream.chat.android.client.api.MessageBufferOverflow
import io.getstream.chat.android.client.api.event.EventHandlingResult
import io.getstream.chat.android.client.api.state.StateRegistry
import io.getstream.chat.android.client.events.AnswerCastedEvent
Expand Down Expand Up @@ -130,6 +132,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
Expand Down Expand Up @@ -159,6 +162,7 @@ internal class EventHandlerSequential(
private val repos: RepositoryFacade,
private val sideEffect: suspend () -> Unit,
private val syncedEvents: Flow<List<ChatEvent>>,
private val bufferConfig: MessageBufferConfig,
scope: CoroutineScope,
) : EventHandler {

Expand All @@ -169,12 +173,61 @@ internal class EventHandlerSequential(

private val mutex = Mutex()
private val socketEvents = MutableSharedFlow<ChatEvent>(extraBufferCapacity = Int.MAX_VALUE)

/**
* Secondary flow used only when [bufferConfig] opts specific channel types into a bounded buffer.
* Allocated lazily so the default configuration pays no cost for it.
*/
private val bufferedNewMessageEvents: MutableSharedFlow<ChatEvent> by lazy {
MutableSharedFlow(
extraBufferCapacity = bufferConfig.capacity,
onBufferOverflow = when (bufferConfig.overflow) {
MessageBufferOverflow.DROP_OLDEST -> BufferOverflow.DROP_OLDEST
MessageBufferOverflow.DROP_LATEST -> BufferOverflow.DROP_LATEST
},
)
}
private val socketEventCollector = SocketEventCollector(scope) { batchEvent ->
handleBatchEvent(batchEvent)
}

private var eventsDisposable: Disposable = EMPTY_DISPOSABLE

/**
* Default listener — emits every event into the unbuffered [socketEvents] flow without
* inspecting [bufferConfig]. Used whenever no channel types are opted in for buffering.
*/
private val defaultSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
logEmitOutcome(event, socketEvents.tryEmit(event))
}

/**
* Listener used only when [bufferConfig] opts specific channel types into a bounded buffer.
* Routes matching [NewMessageEvent]s to [bufferedNewMessageEvents] and everything else to
* [socketEvents].
*/
private val bufferedSocketEventListener: ChatEventListener<ChatEvent> = ChatEventListener { event ->
val target = if (event is NewMessageEvent && event.channelType in bufferConfig.channelTypes) {
bufferedNewMessageEvents
} else {
socketEvents
}
logEmitOutcome(event, target.tryEmit(event))
}

private fun logEmitOutcome(event: ChatEvent, emitted: Boolean) {
if (emitted) {
val cCount = collectedCount.get()
val eCount = emittedCount.incrementAndGet()
val ratio = eCount.toDouble() / cCount.toDouble()
StreamLog.v(TAG_SOCKET) {
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
}
} else {
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
}
}

init {
logger.d { "<init> no args" }
}
Expand All @@ -201,26 +254,23 @@ internal class EventHandlerSequential(
)
}
}
scope.launch {
socketEvents.collect { event ->
collectedCount.incrementAndGet()
initJob.join()
sideEffect()
socketEventCollector.collect(event)
}
val collectSocketEvent: suspend (ChatEvent) -> Unit = { event ->
collectedCount.incrementAndGet()
initJob.join()
sideEffect()
socketEventCollector.collect(event)
}
eventsDisposable = subscribeForEvents { event ->
if (socketEvents.tryEmit(event)) {
val cCount = collectedCount.get()
val eCount = emittedCount.incrementAndGet()
val ratio = eCount.toDouble() / cCount.toDouble()
StreamLog.v(TAG_SOCKET) {
"[onSocketEventReceived] event.type: ${event.realType}; $eCount => $cCount ($ratio)"
}
} else {
StreamLog.e(TAG_SOCKET) { "[onSocketEventReceived] failed to emit socket event: $event" }
}
scope.launch { socketEvents.collect(collectSocketEvent) }
val isBufferingEnabled = bufferConfig.channelTypes.isNotEmpty()
if (isBufferingEnabled) {
scope.launch { bufferedNewMessageEvents.collect(collectSocketEvent) }
}
val activeListener = if (isBufferingEnabled) {
bufferedSocketEventListener
} else {
defaultSocketEventListener
}
eventsDisposable = subscribeForEvents(activeListener)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.getstream.chat.android.client.internal.state.plugin.factory
import android.content.Context
import io.getstream.chat.android.client.ChatClient
import io.getstream.chat.android.client.api.ChatClientConfig
import io.getstream.chat.android.client.api.MessageBufferConfig
import io.getstream.chat.android.client.api.state.StateRegistry
import io.getstream.chat.android.client.events.ChatEvent
import io.getstream.chat.android.client.internal.state.errorhandler.StateErrorHandlerFactory
Expand Down Expand Up @@ -154,6 +155,7 @@ public class StreamStatePluginFactory(
repos = repositoryFacade,
syncedEvents = syncManager.syncedEvents,
sideEffect = syncManager::awaitSyncing,
bufferConfig = config.messageLimitConfig.messageBufferConfig,
)

val stateErrorHandlerFactory = StateErrorHandlerFactory(
Expand Down Expand Up @@ -189,6 +191,7 @@ public class StreamStatePluginFactory(
repos: RepositoryFacade,
sideEffect: suspend () -> Unit,
syncedEvents: Flow<List<ChatEvent>>,
bufferConfig: MessageBufferConfig,
): EventHandler {
return EventHandlerSequential(
scope = scope,
Expand All @@ -201,6 +204,7 @@ public class StreamStatePluginFactory(
repos = repos,
syncedEvents = syncedEvents,
sideEffect = sideEffect,
bufferConfig = bufferConfig,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.getstream.chat.android.client.internal.state.event

import io.getstream.chat.android.client.api.MessageBufferConfig
import io.getstream.chat.android.client.events.ChatEvent
import io.getstream.chat.android.client.internal.state.event.handler.internal.EventHandler
import io.getstream.chat.android.client.internal.state.event.handler.internal.EventHandlerSequential
Expand Down Expand Up @@ -144,6 +145,7 @@ internal class TotalUnreadCountTest {
repos = repos,
sideEffect = sideEffect,
syncedEvents = syncedEvents,
bufferConfig = MessageBufferConfig(),
)

fun givenMockedRepositories(): Fixture {
Expand Down
Loading
Loading