Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c18bbd2
Fix scroll jump when returning to a channel after WS reconnect
VelikovPetar Mar 10, 2026
640982d
Merge branch 'v7' into bug/AND-1113_fix_channel_jumps_after_coming_ba…
VelikovPetar Mar 10, 2026
5f3cd6f
Merge branch 'v7' into bug/AND-1113_fix_channel_jumps_after_coming_ba…
VelikovPetar Mar 10, 2026
8b139a2
Merge branch 'v7' into bug/AND-1113_fix_channel_jumps_after_coming_ba…
VelikovPetar Mar 12, 2026
d24df19
Merge remote-tracking branch 'origin/bug/AND-1113_fix_channel_jumps_a…
VelikovPetar Mar 12, 2026
15e4ce4
Merge branch 'v7' into bug/AND-1113_fix_channel_jumps_after_coming_ba…
VelikovPetar Mar 12, 2026
408babc
test(01-01): add failing stub tests for Message.isLocalOnly() predicate
VelikovPetar Mar 12, 2026
69a6647
test(01-01): add failing stub tests for ChannelStateImpl.setMessagesP…
VelikovPetar Mar 12, 2026
73765bd
feat(01-02): implement Message.isLocalOnly() predicate and enable tests
VelikovPetar Mar 12, 2026
a2f01e5
feat(01-02): add selectLocalOnlyMessagesForChannel to DB layer
VelikovPetar Mar 12, 2026
0e103ed
feat(01-02): add oldestLoadedDate to ChannelEntity and bump DB versio…
VelikovPetar Mar 12, 2026
c103d87
test(01-03): add failing tests for setMessagesPreservingLocalOnly
VelikovPetar Mar 13, 2026
9f58487
feat(01-03): implement setMessagesPreservingLocalOnly on ChannelState…
VelikovPetar Mar 13, 2026
40b910e
feat(01-04): add updateOldestLoadedDateForChannel to channel reposito…
VelikovPetar Mar 13, 2026
165d4e6
feat(01-04): wire setMessagesPreservingLocalOnly into ChannelLogicImp…
VelikovPetar Mar 13, 2026
4f74e21
feat(02-01): add selectOldestLoadedDate query to ChannelDao
VelikovPetar Mar 13, 2026
229fd38
feat(02-01): add selectOldestLoadedDateForChannel to repository layer
VelikovPetar Mar 13, 2026
1551ef2
test(02-02): add failing tests for pagination and reconnect preservation
VelikovPetar Mar 13, 2026
065797e
feat(02-02): wire setMessagesPreservingLocalOnly into pagination bran…
VelikovPetar Mar 13, 2026
e897300
feat(02-02): wire updateDataForChannel reconnect path with preservation
VelikovPetar Mar 13, 2026
f02be55
test(02-03): add PaginationPreservation nested test class
VelikovPetar Mar 13, 2026
353ee5a
test(02-03): add ReconnectPreservation nested test class
VelikovPetar Mar 13, 2026
b09d9fc
fix(phase-2): use upsertMessagesPreservingLocalOnly for pagination br…
VelikovPetar Mar 13, 2026
d0a43f6
Improve handling of pending and local-only messages in ChannelState.
VelikovPetar Mar 16, 2026
c1533ef
Improve handling of pending and local-only messages in ChannelState.
VelikovPetar Mar 16, 2026
7770f8b
Add missing tests.
VelikovPetar Mar 16, 2026
3eabadd
Merge branch 'v7' into bug/handle_local_only_messages
VelikovPetar Mar 16, 2026
217050a
Merge branch 'v7' into bug/handle_local_only_messages
VelikovPetar Mar 16, 2026
9ec95e2
Merge branch 'v7' into bug/handle_local_only_messages
VelikovPetar Mar 16, 2026
4ace31d
Address CodeRabbit review remarks on test fixtures.
VelikovPetar Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3188,6 +3188,7 @@ public final class io/getstream/chat/android/client/internal/offline/repository/
public fun select (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectByCidAndUserId (Ljava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectBySyncStatus (Lio/getstream/chat/android/models/SyncStatus;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectBySyncStatusOrTypeForChannel (Ljava/lang/String;Ljava/util/List;Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectByUserId (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectChunked (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun selectDraftMessageByCid (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -3514,6 +3515,7 @@ public abstract interface class io/getstream/chat/android/client/persistance/rep
public abstract fun selectDraftMessageByParentId (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectDraftMessages (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectDraftMessagesByCid (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectLocalOnlyMessagesForChannel (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectMessage (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectMessageBySyncState (Lio/getstream/chat/android/models/SyncStatus;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun selectMessageIdsBySyncState (Lio/getstream/chat/android/models/SyncStatus;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ import io.getstream.chat.android.client.internal.offline.repository.domain.user.
ThreadOrderEntity::class,
DraftMessageEntity::class,
],
version = 101,
version = 102,
exportSchema = false,
)
@TypeConverters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import io.getstream.chat.android.client.api.models.Pagination
import io.getstream.chat.android.client.internal.offline.extensions.launchWithMutex
import io.getstream.chat.android.client.persistance.repository.MessageRepository
import io.getstream.chat.android.client.query.pagination.AnyChannelPaginationRequest
import io.getstream.chat.android.client.utils.message.LocalOnlyMessageTypes
import io.getstream.chat.android.client.utils.message.LocalOnlySyncStatuses
import io.getstream.chat.android.client.utils.message.isDeleted
import io.getstream.chat.android.models.DraftMessage
import io.getstream.chat.android.models.Message
Expand Down Expand Up @@ -262,6 +264,13 @@ internal class DatabaseMessageRepository(
}
}

override suspend fun selectLocalOnlyMessagesForChannel(cid: String): List<Message> =
messageDao.selectBySyncStatusOrTypeForChannel(
cid = cid,
syncStatuses = LocalOnlySyncStatuses.map(SyncStatus::status),
types = LocalOnlyMessageTypes.toList(),
).map { entity -> entity.toMessage() }

private suspend fun selectMessagesEntitiesForChannel(
cid: String,
pagination: AnyChannelPaginationRequest?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ internal interface MessageDao {
@Query("DELETE FROM $MESSAGE_ENTITY_TABLE_NAME")
suspend fun deleteAll()

@Query(
"SELECT * FROM $MESSAGE_ENTITY_TABLE_NAME " +
"WHERE cid = :cid " +
"AND (syncStatus IN (:syncStatuses) OR type IN (:types)) " +
"ORDER BY CASE WHEN createdAt IS NULL THEN createdLocallyAt ELSE createdAt END ASC",
)
@Transaction
suspend fun selectBySyncStatusOrTypeForChannel(
cid: String,
syncStatuses: List<Int>,
types: List<String>,
): List<MessageEntity>

private companion object {
private const val SQLITE_MAX_VARIABLE_NUMBER: Int = 999
private const val NO_LIMIT: Int = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ internal class QueryChannelListenerState(private val logic: LogicRegistry) : Que
) {
logger.d { "[onQueryChannelRequest] cid: $channelType:$channelId, request: $request" }
logic.channel(channelType, channelId).apply {
updateStateFromDatabase(request)
setPaginationDirection(request)
updateStateFromDatabase(request)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import io.getstream.chat.android.client.channel.ChannelMessagesUpdateLogic
import io.getstream.chat.android.client.errors.isPermanent
import io.getstream.chat.android.client.events.ChatEvent
import io.getstream.chat.android.client.extensions.cidToTypeAndId
import io.getstream.chat.android.client.extensions.getCreatedAtOrDefault
import io.getstream.chat.android.client.extensions.getCreatedAtOrNull
import io.getstream.chat.android.client.extensions.internal.NEVER
import io.getstream.chat.android.client.internal.state.model.querychannels.pagination.internal.QueryChannelPaginationRequest
import io.getstream.chat.android.client.internal.state.model.querychannels.pagination.internal.toAnyChannelPaginationRequest
import io.getstream.chat.android.client.internal.state.plugin.state.channel.internal.ChannelStateImpl
Expand Down Expand Up @@ -88,39 +90,36 @@ internal class ChannelLogicImpl(
if (query.isFilteringMessages()) return
// Populate from DB ONLY if loading latest messages
val channel = fetchOfflineChannel(cid, query) ?: return
val localOnlyMessages = repository.selectLocalOnlyMessagesForChannel(cid)
updateDataForChannel(
channel = channel,
messageLimit = query.messagesLimit(),
shouldRefreshMessages = true,
// Note: The following arguments are NOT used. But they are kept for backwards compatibility.
shouldRefreshMessages = query.shouldRefresh,
scrollUpdate = false,
isNotificationUpdate = query.isNotificationUpdate,
isChannelsStateUpdate = true,
)
// Set the currently known oldest message until online data is retrieved
state.paginationManager.setOldestMessage(channel.messages.lastOrNull())
state.setLocalOnlyMessages(localOnlyMessages)
}

override fun setPaginationDirection(query: QueryChannelRequest) {
when {
query.filteringOlderMessages() -> state.setLoadingOlderMessages(true)
query.isFilteringNewerMessages() -> state.setLoadingNewerMessages(true)
}
state.paginationManager.begin(query)
}

override fun onQueryChannelResult(query: QueryChannelRequest, result: Result<Channel>) {
val limit = query.messagesLimit()
val isNotificationUpdate = query.isNotificationUpdate
// Update pagination state only if it's not a notification update and the call was made for fetching messages
// (from LoadNotificationDataWorker) and a limit is set (otherwise we are not loading messages)
if (!isNotificationUpdate && limit != 0) {
state.paginationManager.end(query, result)
}
when (result) {
is Result.Success -> {
val limit = query.messagesLimit()
val channel = result.value
val endReached = limit > channel.messages.size
val isNotificationUpdate = query.isNotificationUpdate

// Update pagination/recovery state only if it's not a notification update
// (from LoadNotificationDataWorker) and a limit is set (otherwise we are not loading messages)
if (!isNotificationUpdate && limit != 0) {
state.setRecoveryNeeded(false)
updatePaginationEnd(query, endReached)
}

// Update channel data
val channelData = channel.toChannelData()
state.updateChannelData {
Expand All @@ -145,18 +144,16 @@ internal class ChannelLogicImpl(
}
// Add pinned messages
state.addPinnedMessages(channel.pinnedMessages)
// Update loading states
state.setLoadingOlderMessages(false)
state.setLoadingNewerMessages(false)
// Reset recovery state
if (!isNotificationUpdate && limit != 0) {
state.setRecoveryNeeded(false)
}
}

is Result.Failure -> {
// Mark the channel as needing recovery if the error is not permanent
val isPermanent = result.value.isPermanent()
state.setRecoveryNeeded(recoveryNeeded = !isPermanent)
// Reset loading states
state.setLoadingOlderMessages(false)
state.setLoadingNewerMessages(false)
}
}
}
Expand All @@ -171,7 +168,6 @@ internal class ChannelLogicImpl(
}

override suspend fun loadAfter(messageId: String, limit: Int): Result<Channel> {
state.setLoadingNewerMessages(true)
val request = QueryChannelPaginationRequest(limit)
.apply {
messageFilterValue = messageId
Expand All @@ -182,7 +178,6 @@ internal class ChannelLogicImpl(
}

override suspend fun loadBefore(messageId: String?, limit: Int): Result<Channel> {
state.setLoadingOlderMessages(true)
val messageId = messageId ?: state.getOldestMessage()?.id
val request = QueryChannelPaginationRequest(limit)
.apply {
Expand Down Expand Up @@ -302,19 +297,42 @@ internal class ChannelLogicImpl(
state.setChannelConfig(channel.config)
// Set pending messages
state.setPendingMessages(channel.pendingMessages.map(PendingMessage::message))
// Reset messages (ensure they are sorted - when coming from DB)
// Update messages based on the relationship between the incoming page and existing state.
if (messageLimit > 0) {
val sortedMessages = withContext(Dispatchers.Default) {
channel.messages.sortedBy { it.getCreatedAtOrNull() }
}
state.setMessages(sortedMessages)
state.setEndOfOlderMessages(channel.messages.size < messageLimit)
val currentMessages = state.messages.value
when {
shouldRefreshMessages || currentMessages.isEmpty() -> {
// Initial load (DB seed or first fetch) or explicit refresh — full replace
state.setMessages(sortedMessages)
state.paginationManager.setEndOfOlderMessages(channel.messages.size < messageLimit)
}
state.insideSearch.value -> {
// User's window was already trimmed away from the latest (insideSearch set by
// trimNewestMessages, or a prior jump-to-message). Stay at current position;
// refresh the "jump to latest" cache with the server's current latest page.
state.upsertCachedLatestMessages(sortedMessages)
}
hasGap(currentMessages, sortedMessages) -> {
// Incoming page is newer than the current window with no overlap. Inserting the
// incoming messages would create a fragmented list. Instead, treat the user's
// position as a mid-page: store the incoming as the "latest" cache and signal the UI.
state.upsertCachedLatestMessages(sortedMessages)
state.setInsideSearch(true)
state.paginationManager.setEndOfNewerMessages(false)
}
else -> {
// Incoming messages are contiguous with (or overlap) the current window.
// Upsert preserves the user's scroll position while adding/updating messages.
state.upsertMessages(sortedMessages)
state.paginationManager.setEndOfOlderMessages(channel.messages.size < messageLimit)
}
}
}
// Add pinned messages
state.addPinnedMessages(channel.pinnedMessages)
// Update loading states
state.setLoadingOlderMessages(false)
state.setLoadingNewerMessages(false)
}

override fun handleEvents(events: List<ChatEvent>) {
Expand All @@ -328,53 +346,28 @@ internal class ChannelLogicImpl(
}

private suspend fun queryChannel(request: WatchChannelRequest): Result<Channel> {
state.paginationManager.begin(request)
val (type, id) = cid.cidToTypeAndId()
return ChatClient.instance()
.queryChannel(type, id, request, skipOnRequest = true)
.await()
}

private fun updatePaginationEnd(query: QueryChannelRequest, endReached: Boolean) {
when {
// Querying the newest messages (no pagination applied)
!query.isFilteringMessages() -> {
state.setEndOfOlderMessages(endReached)
state.setEndOfNewerMessages(true)
}
// Querying messages around a specific message - no way to know if we reached the end
query.isFilteringAroundIdMessages() -> {
state.setEndOfOlderMessages(false)
state.setEndOfNewerMessages(false)
}
// Querying older messages and reached the end
query.filteringOlderMessages() && endReached -> {
state.setEndOfOlderMessages(true)
}
// Querying newer messages and reached the end
query.isFilteringNewerMessages() && endReached -> {
state.setEndOfNewerMessages(true)
}
}
}

private fun updateMessages(query: QueryChannelRequest, channel: Channel) {
when {
!query.isFilteringMessages() -> {
// Loading newest messages (no pagination):
// 1. Clear any cached latest messages (we are replacing the whole list)
// 2. Replace the active messages with the loaded ones
// 3. No pending messages ceiling — we are at the latest messages
state.clearCachedLatestMessages()
state.setMessages(channel.messages)
state.setInsideSearch(false)
state.setNewestLoadedDate(null)
}

query.isFilteringAroundIdMessages() -> {
// Loading messages around a specific message:
// 1. Cache the current messages (for access to latest messages) (unless already inside search)
// 2. Replace the active messages with the loaded ones
// 3. Set ceiling to newest in loaded page — pending messages newer than the page are hidden
if (state.insideSearch.value) {
// We are currently around a message, don't cache the latest messages, just replace the active set
// Otherwise, the cached message set will wrongly hold the previous "around" set, instead of the
Expand All @@ -386,7 +379,6 @@ internal class ChannelLogicImpl(
state.setMessages(channel.messages)
state.setInsideSearch(true)
}
state.setNewestLoadedDate(channel.messages.lastOrNull()?.getCreatedAtOrNull())
}

query.isFilteringNewerMessages() -> {
Expand All @@ -395,13 +387,9 @@ internal class ChannelLogicImpl(
state.trimOldestMessages()
val endReached = query.messagesLimit() > channel.messages.size
if (endReached) {
// Reached the latest messages — remove the ceiling
// Reached the latest messages
state.clearCachedLatestMessages()
state.setInsideSearch(false)
state.setNewestLoadedDate(null)
} else {
// Still paginating toward latest — advance ceiling to include newly loaded messages
state.advanceNewestLoadedDate(channel.messages.lastOrNull()?.getCreatedAtOrNull())
}
}

Expand All @@ -411,10 +399,6 @@ internal class ChannelLogicImpl(
state.trimNewestMessages()
}
}
// Advance the oldest-loaded-date floor. Only queryChannel pagination (this path) should
// set this floor — updateDataForChannel (QueryChannels) must not, otherwise a channel-list
// preview would incorrectly filter out older pending messages.
state.advanceOldestLoadedDate(channel.messages)
// Replace pending messages — server always returns the full latest set (up to 100, ASC).
state.setPendingMessages(channel.pendingMessages.map { it.message })
}
Expand All @@ -428,4 +412,14 @@ internal class ChannelLogicImpl(
// Enrich the channel with messages
return channel.copy(messages = messages)
}

private fun hasGap(currentMessages: List<Message>, incomingMessages: List<Message>): Boolean {
val currentNewest = currentMessages.maxByOrNull { it.getCreatedAtOrDefault(NEVER) }
val incomingOldest = incomingMessages.firstOrNull()
return currentMessages.isNotEmpty() &&
currentNewest != null &&
incomingOldest != null &&
currentMessages.none { it.id == incomingOldest.id } &&
incomingOldest.getCreatedAtOrDefault(NEVER).after(currentNewest.getCreatedAtOrDefault(NEVER))
}
}
Loading
Loading