Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_IMAGE
import io.getstream.chat.android.client.extensions.cidToTypeAndId
import io.getstream.chat.android.client.extensions.extractBaseUrl
import io.getstream.chat.android.client.extensions.getCreatedAtOrNull
import io.getstream.chat.android.client.extensions.internal.isLaterThanDays
import io.getstream.chat.android.client.header.VersionPrefixHeader
import io.getstream.chat.android.client.helpers.AppSettingManager
Expand Down Expand Up @@ -157,6 +158,7 @@
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
import io.getstream.chat.android.client.utils.ProgressCallback
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.client.utils.mergePartially
import io.getstream.chat.android.client.utils.message.ensureId
import io.getstream.chat.android.client.utils.observable.ChatEventsObservable
Expand Down Expand Up @@ -286,6 +288,8 @@
@InternalStreamChatApi
public val audioPlayer: AudioPlayer,
private val now: () -> Date = ::Date,
@InternalStreamChatApi
public val serverClockOffset: ServerClockOffset,
private val repository: ChatClientRepository,
private val messageReceiptReporter: MessageReceiptReporter,
internal val messageReceiptManager: MessageReceiptManager,
Expand Down Expand Up @@ -2588,16 +2592,34 @@

/**
* Ensure the message has a [Message.createdLocallyAt] timestamp.
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and [now].
* This ensures that the message appears in the correct order in the channel.
* If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and the
* estimated server time. Using estimated server time (instead of raw local clock) prevents
* cross-user ordering issues when the device clock is skewed.
*/
private suspend fun Message.ensureCreatedLocallyAt(cid: String): Message {
val lastMessageAt = repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
val parentId = this.parentId
if (parentId != null) {

Check warning on line 2601 in stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Move "return" statements from all branches before "if" statement.

See more on https://sonarcloud.io/project/issues?id=GetStream_stream-chat-android&issues=AZzCutSGQz_sVbjk2UkD&open=AZzCutSGQz_sVbjk2UkD&pullRequest=6199
// Thread reply
val lastMessage = repositoryFacade.selectMessagesForThread(parentId, limit = 1).lastOrNull()
val lastMessageAt = lastMessage?.getCreatedAtOrNull()
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
} else {
// Regular message
val (type, id) = cid.cidToTypeAndId()
// Fetch channel lastMessageAt from state, fallback to offline storage
val channelState = logicRegistry?.channelStateLogic(type, id)?.listenForChannelState()
val lastMessageAt = channelState?.channelData?.value?.lastMessageAt
?: repositoryFacade.selectChannel(cid = cid)?.lastMessageAt
val lastMessageAtPlusOneMillisecond = lastMessageAt?.let {
Date(it.time + 1)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
}
val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, now())
return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt)
}

/**
Expand Down Expand Up @@ -5037,6 +5059,8 @@
warmUpReflection()
}

val serverClockOffset = ServerClockOffset()

val module =
ChatModule(
appContext = appContext,
Expand All @@ -5055,6 +5079,7 @@
lifecycle = lifecycle,
appName = this.appName,
appVersion = this.appVersion,
serverClockOffset = serverClockOffset,
)

val api = module.api()
Expand Down Expand Up @@ -5091,6 +5116,7 @@
retryPolicy = retryPolicy,
appSettingsManager = appSettingsManager,
chatSocket = module.chatSocket,
serverClockOffset = serverClockOffset,
pluginFactories = pluginFactories,
repositoryFactoryProvider = repositoryFactoryProvider
?: pluginFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import io.getstream.chat.android.client.uploader.FileUploader
import io.getstream.chat.android.client.uploader.StreamFileUploader
import io.getstream.chat.android.client.user.CurrentUserFetcher
import io.getstream.chat.android.client.utils.HeadersUtil
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.models.UserId
import io.getstream.log.StreamLog
import okhttp3.Interceptor
Expand Down Expand Up @@ -116,6 +117,7 @@ import java.util.concurrent.TimeUnit
* @param lifecycle Host [Lifecycle] used to observe app foreground/background and manage socket behavior.
* @param appName Optional app name added to default headers for tracking.
* @param appVersion Optional app version added to default headers for tracking.
* @param serverClockOffset Shared clock-offset tracker used by the socket layer for time synchronisation.
*/
@Suppress("TooManyFunctions")
internal class ChatModule
Expand All @@ -137,6 +139,7 @@ constructor(
private val lifecycle: Lifecycle,
private val appName: String?,
private val appVersion: String?,
private val serverClockOffset: ServerClockOffset,
) {

private val headersUtil = HeadersUtil(appContext, appName, appVersion)
Expand Down Expand Up @@ -311,6 +314,7 @@ constructor(
lifecycleObserver,
networkStateProvider,
clientDebugger,
serverClockOffset,
)

private fun buildApi(chatConfig: ChatClientConfig): ChatApi = ProxyChatApi(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.getstream.chat.android.client.network.NetworkStateProvider
import io.getstream.chat.android.client.scope.UserScope
import io.getstream.chat.android.client.socket.ChatSocketStateService.State
import io.getstream.chat.android.client.token.TokenManager
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.core.internal.coroutines.DispatcherProvider
import io.getstream.chat.android.models.User
import io.getstream.log.taggedLogger
Expand All @@ -52,6 +53,7 @@ internal open class ChatSocket(
private val lifecycleObserver: StreamLifecycleObserver,
private val networkStateProvider: NetworkStateProvider,
private val clientDebugger: ChatClientDebugger? = null,
private val serverClockOffset: ServerClockOffset,
) {
private var streamWebSocket: StreamWebSocket? = null
private val logger by taggedLogger(TAG)
Expand All @@ -61,7 +63,13 @@ internal open class ChatSocket(
private var socketStateObserverJob: Job? = null
private val healthMonitor = HealthMonitor(
userScope = userScope,
checkCallback = { (chatSocketStateService.currentState as? State.Connected)?.event?.let(::sendEvent) },
checkCallback = {
(chatSocketStateService.currentState as? State.Connected)?.event?.let {
if (sendEvent(it)) {
serverClockOffset.onHealthCheckSent()
}
}
},
reconnectCallback = { chatSocketStateService.onWebSocketEventLost() },
)
private val lifecycleHandler = object : LifecycleHandler {
Expand All @@ -84,6 +92,7 @@ internal open class ChatSocket(
socketListenerJob?.cancel()
when (networkStateProvider.isConnected()) {
true -> {
serverClockOffset.onConnectionStarted()
streamWebSocket = socketFactory.createSocket(connectionConf).apply {
socketListenerJob = listen().onEach {
when (it) {
Expand Down Expand Up @@ -194,8 +203,14 @@ internal open class ChatSocket(

private suspend fun handleEvent(chatEvent: ChatEvent) {
when (chatEvent) {
is ConnectedEvent -> chatSocketStateService.onConnectionEstablished(chatEvent)
is HealthEvent -> healthMonitor.ack()
is ConnectedEvent -> {
serverClockOffset.onConnected(chatEvent.createdAt)
chatSocketStateService.onConnectionEstablished(chatEvent)
}
is HealthEvent -> {
serverClockOffset.onHealthCheck(chatEvent.createdAt)
healthMonitor.ack()
}
else -> callListeners { listener -> listener.onEvent(chatEvent) }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright (c) 2014-2026 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-chat-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.getstream.chat.android.client.utils.internal

import io.getstream.chat.android.client.events.ConnectedEvent
import io.getstream.chat.android.client.events.HealthEvent
import io.getstream.chat.android.core.internal.InternalStreamChatApi
import java.util.Date

/**
* Tracks the offset between the local device clock and the server clock using
* NTP-style estimation from WebSocket health check round-trips.
*
* The algorithm keeps only the sample with the lowest observed RTT, since a
* smaller round-trip means less room for network asymmetry to distort the
* measurement. Under the assumption that clock skew is constant for the
* duration of a session, the estimate monotonically improves over time.
*
* Thread-safe: single-field writes use [Volatile] for visibility; compound
* read-modify-write sequences are guarded by [lock] for atomicity.
*
* @param localTimeMs Clock source for the local device time (injectable for tests).
* @param maxRttMs Upper bound on plausible RTT. Samples exceeding this are
* discarded as stale or mismatched. Defaults to the health check cycle
* interval (MONITOR_INTERVAL + HEALTH_CHECK_INTERVAL = 11 000 ms).
* @param maxOffsetMs Upper bound on the absolute value of the computed clock offset.
* If the derived offset exceeds this threshold the sample is considered unreliable
* (e.g. a stale / static server timestamp in a test environment) and the offset is
* reset to zero so that [estimatedServerTime] falls back to the raw local time.
* Defaults to 1 hour, which is already far beyond any real-world NTP drift.
*/
@InternalStreamChatApi
public class ServerClockOffset(
private val localTimeMs: () -> Long = { System.currentTimeMillis() },
private val maxRttMs: Long = DEFAULT_MAX_RTT_MS,
private val maxOffsetMs: Long = DEFAULT_MAX_OFFSET_MS,
) {

private val lock = Any()

@Volatile
private var offsetMs: Long = 0L

@Volatile
private var bestRttMs: Long = Long.MAX_VALUE

@Volatile
private var healthCheckSentAtMs: Long = 0L

@Volatile
private var connectionStartedAtMs: Long = 0L

/**
* Record the local time immediately before starting a WebSocket connection.
* When the next [ConnectedEvent] arrives, [onConnected] will pair with this
* timestamp to compute the offset using the NTP midpoint formula.
*/
internal fun onConnectionStarted() {
connectionStartedAtMs = localTimeMs()
}

/**
* Record the local time immediately before sending a health check echo.
* The next [onHealthCheck] call will pair with this timestamp to compute RTT.
*/
internal fun onHealthCheckSent() {
healthCheckSentAtMs = localTimeMs()
}

/**
* Calibration from a [ConnectedEvent].
*
* If [onConnectionStarted] was called before this connection (e.g. right before
* opening the WebSocket), uses the NTP midpoint of (connectionStartedAt, receivedAt)
* and serverTime for a more accurate offset. Otherwise falls back to a naive
* `localTime - serverTime` estimate.
*
* Resets health check state, since a new connection means any in-flight health
* check from the previous connection is stale.
*/
internal fun onConnected(serverTime: Date) {
synchronized(lock) {
bestRttMs = Long.MAX_VALUE
healthCheckSentAtMs = 0L
offsetMs = 0L

val receivedAtMs = localTimeMs()
val startedAtMs = connectionStartedAtMs
connectionStartedAtMs = 0L

if (startedAtMs > 0L) {
val rtt = receivedAtMs - startedAtMs
if (rtt in 1..maxRttMs) {
acceptOffset((startedAtMs + receivedAtMs) / 2 - serverTime.time)
bestRttMs = rtt
return
}
}
acceptOffset(receivedAtMs - serverTime.time)
}
}

/**
* Refine the offset using a [HealthEvent] paired with [onHealthCheckSent].
*
* Computes RTT from the stored send time and the current receive time,
* then applies the NTP midpoint formula:
* ```
* offset = (sentAt + receivedAt) / 2 - serverTime
* ```
*
* The sample is accepted only if:
* - There is a pending [onHealthCheckSent] timestamp.
* - RTT is positive (guards against clock anomalies).
* - RTT is below [maxRttMs] (rejects stale / mismatched pairs).
* - RTT is lower than any previous sample (min-RTT selection).
*/
internal fun onHealthCheck(serverTime: Date) {
synchronized(lock) {
val sentAtMs = healthCheckSentAtMs
if (sentAtMs <= 0L) return
healthCheckSentAtMs = 0L

val receivedAtMs = localTimeMs()
val rtt = receivedAtMs - sentAtMs
if (rtt !in 1..maxRttMs) return

if (rtt < bestRttMs) {
bestRttMs = rtt
acceptOffset((sentAtMs + receivedAtMs) / 2 - serverTime.time)
}
}
}

/**
* Returns the current time adjusted to the server timescale.
*
* Before the first [onConnected] call, this returns the raw local time
* (offset = 0).
*/
@InternalStreamChatApi
public fun estimatedServerTime(): Date =
Date(localTimeMs() - offsetMs)

/**
* Accepts [candidate] as the new [offsetMs] only when its absolute value is within
* [maxOffsetMs]. Offsets that are implausibly large (e.g. produced by a stale or
* static server timestamp) are silently discarded and [offsetMs] is left unchanged.
*
* Note: callers that want a rejected offset to reset to zero (e.g. [onConnected])
* should set [offsetMs] = 0 before calling this function.
*/
private fun acceptOffset(candidate: Long) {
if (kotlin.math.abs(candidate) <= maxOffsetMs) {
offsetMs = candidate
}
}

internal companion object {
internal const val DEFAULT_MAX_RTT_MS = 11_000L
internal const val DEFAULT_MAX_OFFSET_MS = 3_600_000L // 1 hour
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import io.getstream.chat.android.client.token.FakeTokenManager
import io.getstream.chat.android.client.user.CredentialConfig
import io.getstream.chat.android.client.user.storage.UserCredentialStorage
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.models.ConnectionData
import io.getstream.chat.android.models.EventType
import io.getstream.chat.android.models.GuestUser
Expand Down Expand Up @@ -126,6 +127,7 @@ internal class ChatClientConnectionTests {
retryPolicy = mock(),
appSettingsManager = mock(),
chatSocket = fakeChatSocket,
serverClockOffset = ServerClockOffset(),
pluginFactories = emptyList(),
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
mutableClientState = mutableClientState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import io.getstream.chat.android.client.scope.UserTestScope
import io.getstream.chat.android.client.socket.FakeChatSocket
import io.getstream.chat.android.client.token.FakeTokenManager
import io.getstream.chat.android.client.utils.TokenUtils
import io.getstream.chat.android.client.utils.internal.ServerClockOffset
import io.getstream.chat.android.client.utils.retry.NoRetryPolicy
import io.getstream.chat.android.models.ConnectionState
import io.getstream.chat.android.models.EventType
Expand Down Expand Up @@ -138,6 +139,7 @@ internal class ChatClientTest {
retryPolicy = NoRetryPolicy(),
appSettingsManager = mock(),
chatSocket = fakeChatSocket,
serverClockOffset = ServerClockOffset(),
pluginFactories = emptyList(),
mutableClientState = Mother.mockedClientState(),
repositoryFactoryProvider = NoOpRepositoryFactory.Provider,
Expand Down
Loading
Loading