Skip to content

Commit ae545fd

Browse files
Remove eager full sync from RemoteEventsList
The constructor was firing a doFullSync() that paginated through ALL events (limit=100 per page) on every instantiation — even when no caller ever used the cache-backed methods (getEvents, getEvent, length, asyncIterator). The only production consumer (the conversations UI) exclusively uses .search() which hits the API directly, so the sync was pure waste. Changes: - Remove doFullSync(), syncPromise, ensureSynced(), and the AsyncLock from RemoteEventsList - Make getEvents() fetch from the server on demand instead of reading from a pre-populated cache, merging in any WebSocket-delivered events not yet persisted server-side - addEvent() still appends to the local cache for WebSocket events Co-authored-by: openhands <openhands@all-hands.dev>
1 parent 22efa82 commit ae545fd

1 file changed

Lines changed: 44 additions & 96 deletions

File tree

src/events/remote-events-list.ts

Lines changed: 44 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
/**
2-
* Remote events list implementation with caching and synchronization
2+
* Remote events list — thin wrapper around the events search API.
3+
*
4+
* Events received via WebSocket are kept in a local cache so callers can
5+
* iterate them without extra network calls. For historical events use
6+
* `search()` or `getEvents()` which hit the server on demand.
37
*/
48

59
import { HttpClient } from '../client/http-client';
610
import { Event, ConversationCallbackType } from '../types/base';
7-
// import { EventSortOrder } from '../types/base'; // Unused for now
811
import { EventPage } from '../types/base';
912

1013
/**
@@ -34,59 +37,15 @@ export class RemoteEventsList {
3437
private conversationId: string;
3538
private cachedEvents: Event[] = [];
3639
private cachedEventIds = new Set<string>();
37-
private lock = new AsyncLock();
38-
private syncPromise: Promise<void>;
3940

4041
constructor(client: HttpClient, conversationId: string) {
4142
this.client = client;
4243
this.conversationId = conversationId;
43-
// Perform initial sync
44-
this.syncPromise = this.doFullSync();
45-
}
46-
47-
async ensureSynced(): Promise<void> {
48-
await this.syncPromise;
49-
}
50-
51-
private async doFullSync(): Promise<void> {
52-
console.debug(`Performing full sync for conversation ${this.conversationId}`);
53-
54-
const events: Event[] = [];
55-
let pageId: string | undefined;
56-
57-
// eslint-disable-next-line no-constant-condition
58-
while (true) {
59-
const params: any = { limit: 100 };
60-
if (pageId) {
61-
params.page_id = pageId;
62-
}
63-
64-
const response = await this.client.get<EventPage>(
65-
`/api/conversations/${this.conversationId}/events/search`,
66-
{ params }
67-
);
68-
69-
const data = response.data;
70-
events.push(...data.items);
71-
72-
if (!data.next_page_id) {
73-
break;
74-
}
75-
pageId = data.next_page_id;
76-
}
77-
78-
await this.lock.acquire(async () => {
79-
this.cachedEvents = events;
80-
this.cachedEventIds.clear();
81-
events.forEach((e) => this.cachedEventIds.add(e.id));
82-
});
83-
84-
console.debug(`Full sync completed, ${events.length} events cached`);
8544
}
8645

8746
/**
8847
* Search events with optional filters.
89-
* This method queries the server directly and does not use the cache.
48+
* Queries the server directly.
9049
*/
9150
async search(options: EventSearchOptions = {}): Promise<EventPage> {
9251
const params: any = {
@@ -132,14 +91,10 @@ export class RemoteEventsList {
13291
}
13392

13493
async addEvent(event: Event): Promise<void> {
135-
await this.lock.acquire(async () => {
136-
// Check if event already exists to avoid duplicates
137-
if (!this.cachedEventIds.has(event.id)) {
138-
this.cachedEvents.push(event);
139-
this.cachedEventIds.add(event.id);
140-
console.debug(`Added event ${event.id} to local cache`);
141-
}
142-
});
94+
if (!this.cachedEventIds.has(event.id)) {
95+
this.cachedEvents.push(event);
96+
this.cachedEventIds.add(event.id);
97+
}
14398
}
14499

145100
// Alias for compatibility with EventLog interface
@@ -156,21 +111,46 @@ export class RemoteEventsList {
156111
}
157112

158113
async length(): Promise<number> {
159-
return await this.lock.acquire(async () => this.cachedEvents.length);
114+
return this.cachedEvents.length;
160115
}
161116

162117
async getEvent(index: number): Promise<Event | undefined> {
163-
return await this.lock.acquire(async () => this.cachedEvents[index]);
118+
return this.cachedEvents[index];
164119
}
165120

121+
/**
122+
* Fetch all events from the server, merged with any locally cached
123+
* events received via WebSocket.
124+
*/
166125
async getEvents(start?: number, end?: number): Promise<Event[]> {
167-
await this.ensureSynced();
168-
return await this.lock.acquire(async () => {
169-
if (start === undefined && end === undefined) {
170-
return [...this.cachedEvents];
171-
}
172-
return this.cachedEvents.slice(start, end);
173-
});
126+
const remote: Event[] = [];
127+
let pageId: string | undefined;
128+
129+
// eslint-disable-next-line no-constant-condition
130+
while (true) {
131+
const params: any = { limit: 100 };
132+
if (pageId) params.page_id = pageId;
133+
134+
const response = await this.client.get<EventPage>(
135+
`/api/conversations/${this.conversationId}/events/search`,
136+
{ params }
137+
);
138+
139+
const data = response.data;
140+
remote.push(...data.items);
141+
142+
if (!data.next_page_id) break;
143+
pageId = data.next_page_id;
144+
}
145+
146+
// Merge: remote events first, then any cached events not yet on the server
147+
const remoteIds = new Set(remote.map((e) => e.id));
148+
const merged = [...remote, ...this.cachedEvents.filter((e) => !remoteIds.has(e.id))];
149+
150+
if (start === undefined && end === undefined) {
151+
return merged;
152+
}
153+
return merged.slice(start, end);
174154
}
175155

176156
async *[Symbol.asyncIterator](): AsyncIterableIterator<Event> {
@@ -180,35 +160,3 @@ export class RemoteEventsList {
180160
}
181161
}
182162
}
183-
184-
// Simple async lock implementation
185-
class AsyncLock {
186-
private locked = false;
187-
private queue: Array<() => void> = [];
188-
189-
async acquire<T>(fn: () => Promise<T> | T): Promise<T> {
190-
return new Promise((resolve, reject) => {
191-
const execute = async () => {
192-
try {
193-
const result = await fn();
194-
resolve(result);
195-
} catch (error) {
196-
reject(error);
197-
} finally {
198-
this.locked = false;
199-
const next = this.queue.shift();
200-
if (next) {
201-
next();
202-
}
203-
}
204-
};
205-
206-
if (this.locked) {
207-
this.queue.push(execute);
208-
} else {
209-
this.locked = true;
210-
execute();
211-
}
212-
});
213-
}
214-
}

0 commit comments

Comments
 (0)