diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 680b96d8a17..2c922ca1d8f 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -5,6 +5,13 @@ import { sleep } from '@sim/utils/helpers' import { generateId } from '@sim/utils/id' import { useQueryClient } from '@tanstack/react-query' import { usePathname, useRouter } from 'next/navigation' +import { requestJson } from '@/lib/api/client/request' +import { + addMothershipChatResourceContract, + removeMothershipChatResourceContract, + reorderMothershipChatResourcesContract, +} from '@/lib/api/contracts/mothership-tasks' +import { cancelWorkflowExecutionContract } from '@/lib/api/contracts/workflows' import { getMothershipAttachmentPreviewUrl } from '@/lib/copilot/chat/attachment-preview' import { toDisplayMessage } from '@/lib/copilot/chat/display-message' import { getLiveAssistantMessageId } from '@/lib/copilot/chat/effective-transcript' @@ -1536,6 +1543,9 @@ export function useChat( }, []) const resourcesRef = useRef(resources) resourcesRef.current = resources + const pendingPersistResourceKeysRef = useRef>(new Set()) + const inFlightResourceAddsRef = useRef>>(new Map()) + const reorderNeededAfterFlushRef = useRef(false) // Derive the effective active resource ID — auto-selects the last resource when the stored ID is // absent or no longer in the list, avoiding a separate Effect-based state correction loop. @@ -1962,6 +1972,9 @@ export function useChat( setTransportIdle() setResources([]) setActiveResourceId(null) + pendingPersistResourceKeysRef.current.clear() + inFlightResourceAddsRef.current.clear() + reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() setMessageQueue([]) clearQueueDispatchState() @@ -1974,6 +1987,44 @@ export function useChat( setTransportIdle, ]) + const flushPendingResources = useCallback(async (chatId: string) => { + const pendingKeys = pendingPersistResourceKeysRef.current + if (pendingKeys.size === 0) return + const flushPromises: Array> = [] + for (const resource of resourcesRef.current) { + if (resource.id === 'streaming-file') continue + const key = `${resource.type}:${resource.id}` + if (!pendingKeys.has(key)) continue + pendingKeys.delete(key) + const promise = requestJson(addMothershipChatResourceContract, { + body: { chatId, resource }, + }) + .catch((err) => { + pendingPersistResourceKeysRef.current.add(key) + logger.warn('Failed to flush pending resource; will retry on next hydration', err) + }) + .finally(() => { + inFlightResourceAddsRef.current.delete(key) + }) + inFlightResourceAddsRef.current.set(key, promise) + flushPromises.push(promise) + } + if (flushPromises.length === 0) return + await Promise.allSettled(flushPromises) + if (!reorderNeededAfterFlushRef.current) return + reorderNeededAfterFlushRef.current = false + const localOrder = resourcesRef.current.filter( + (r) => + r.id !== 'streaming-file' && !pendingPersistResourceKeysRef.current.has(`${r.type}:${r.id}`) + ) + if (localOrder.length === 0) return + requestJson(reorderMothershipChatResourcesContract, { + body: { chatId, resources: localOrder }, + }).catch((err) => { + logger.warn('Failed to sync resource order after flush', err) + }) + }, []) + const adoptResolvedChatId = useCallback( (chatId: string, options?: { replaceHomeHistory?: boolean; invalidateList?: boolean }) => { const selectedChatId = selectedChatIdRef.current @@ -1992,8 +2043,9 @@ export function useChat( if (options?.invalidateList) { queryClient.invalidateQueries({ queryKey: taskKeys.list(workspaceId) }) } + flushPendingResources(chatId) }, - [queryClient, workspaceId] + [flushPendingResources, queryClient, workspaceId] ) const { data: chatHistory } = useChatHistory(resolvedChatId) @@ -2018,15 +2070,21 @@ export function useChat( } const persistChatId = chatIdRef.current ?? selectedChatIdRef.current + const key = `${resource.type}:${resource.id}` if (persistChatId) { - // boundary-raw-fetch: fire-and-forget side-effect during stream lifecycle; intentionally avoids requestJson's response parsing/throw semantics so a failure here cannot interrupt the active turn - fetch('/api/mothership/chat/resources', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ chatId: persistChatId, resource }), - }).catch((err) => { - logger.warn('Failed to persist resource', err) + const promise = requestJson(addMothershipChatResourceContract, { + body: { chatId: persistChatId, resource }, }) + .catch((err) => { + pendingPersistResourceKeysRef.current.add(key) + logger.warn('Failed to persist resource; will retry on next hydration', err) + }) + .finally(() => { + inFlightResourceAddsRef.current.delete(key) + }) + inFlightResourceAddsRef.current.set(key, promise) + } else { + pendingPersistResourceKeysRef.current.add(key) } return true }, []) @@ -2035,21 +2093,67 @@ export function useChat( setResources((prev) => prev.filter((r) => !(r.type === resourceType && r.id === resourceId))) setActiveResourceId((prev) => (prev === resourceId ? null : prev)) + const key = `${resourceType}:${resourceId}` + const wasPending = pendingPersistResourceKeysRef.current.delete(key) + const inFlightAdd = inFlightResourceAddsRef.current.get(key) + if (wasPending && !inFlightAdd) return + const persistChatId = chatIdRef.current ?? selectedChatIdRef.current - if (persistChatId) { - // boundary-raw-fetch: fire-and-forget side-effect; intentionally avoids requestJson's response parsing/throw semantics so a transient failure cannot interrupt the caller - fetch('/api/mothership/chat/resources', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ chatId: persistChatId, resourceType, resourceId }), + if (!persistChatId) return + const fireDelete = () => { + requestJson(removeMothershipChatResourceContract, { + body: { chatId: persistChatId, resourceType, resourceId }, }).catch((err) => { logger.warn('Failed to persist resource removal', err) }) } + if (inFlightAdd) { + inFlightAdd.finally(fireDelete) + } else { + fireDelete() + } }, []) const reorderResources = useCallback((newOrder: MothershipResource[]) => { setResources(newOrder) + const persistChatId = chatIdRef.current ?? selectedChatIdRef.current + if (!persistChatId) return + const pendingKeys = pendingPersistResourceKeysRef.current + const inFlightAdds = inFlightResourceAddsRef.current + const hasUnsyncedAdds = newOrder.some((r) => { + const key = `${r.type}:${r.id}` + return pendingKeys.has(key) || inFlightAdds.has(key) + }) + if (hasUnsyncedAdds) { + reorderNeededAfterFlushRef.current = true + if (pendingKeys.size === 0 && inFlightAdds.size > 0) { + Promise.allSettled(Array.from(inFlightAdds.values())).then(() => { + if (!reorderNeededAfterFlushRef.current) return + reorderNeededAfterFlushRef.current = false + const chatId = chatIdRef.current ?? selectedChatIdRef.current + if (!chatId) return + const order = resourcesRef.current.filter( + (r) => + r.id !== 'streaming-file' && + !pendingPersistResourceKeysRef.current.has(`${r.type}:${r.id}`) + ) + if (order.length === 0) return + requestJson(reorderMothershipChatResourcesContract, { + body: { chatId, resources: order }, + }).catch((err) => { + logger.warn('Failed to sync resource order after in-flight ADDs', err) + }) + }) + } + return + } + const persistableResources = newOrder.filter((r) => r.id !== 'streaming-file') + if (persistableResources.length === 0) return + requestJson(reorderMothershipChatResourcesContract, { + body: { chatId: persistChatId, resources: persistableResources }, + }).catch((err) => { + logger.warn('Failed to persist resource reorder', err) + }) }, []) const ensureWorkflowToolResource = useCallback( @@ -2179,6 +2283,9 @@ export function useChat( setTransportIdle() setResources([]) setActiveResourceId(null) + pendingPersistResourceKeysRef.current.clear() + inFlightResourceAddsRef.current.clear() + reorderNeededAfterFlushRef.current = false resetEphemeralPreviewState() setMessageQueue([]) clearQueueDispatchState() @@ -2229,27 +2336,32 @@ export function useChat( const hasPersistedStreamingFile = chatHistory.resources.some((r) => r.id === 'streaming-file') if (hasPersistedStreamingFile) { - // boundary-raw-fetch: fire-and-forget cleanup during chat-history hydration; failures are silently swallowed to keep hydration non-blocking - fetch('/api/mothership/chat/resources', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ + requestJson(removeMothershipChatResourceContract, { + body: { chatId: chatHistory.id, resourceType: 'file', resourceId: 'streaming-file', - }), + }, }).catch(() => {}) } + flushPendingResources(chatHistory.id) + const persistedResources = chatHistory.resources.filter((r) => r.id !== 'streaming-file') - if (persistedResources.length > 0) { + const serverKeys = new Set(persistedResources.map((r) => `${r.type}:${r.id}`)) + const localOnly = resourcesRef.current.filter( + (r) => r.id !== 'streaming-file' && !serverKeys.has(`${r.type}:${r.id}`) + ) + const mergedResources = [...persistedResources, ...localOnly] + + if (mergedResources.length > 0) { const hydratedActiveResourceId = activeResourceIdRef.current && - persistedResources.some((resource) => resource.id === activeResourceIdRef.current) + mergedResources.some((resource) => resource.id === activeResourceIdRef.current) ? activeResourceIdRef.current - : persistedResources[persistedResources.length - 1].id + : mergedResources[mergedResources.length - 1].id activeResourceIdRef.current = hydratedActiveResourceId - setResources(persistedResources) + setResources(mergedResources) setActiveResourceId(hydratedActiveResourceId) for (const resource of persistedResources) { @@ -2373,6 +2485,7 @@ export function useChat( workspaceId, cancelActiveStreamReader, cancelActiveStreamRecovery, + flushPendingResources, queryClient, recoverPendingClientWorkflowTools, seedPreviewSessions, @@ -5003,9 +5116,8 @@ export function useChat( const executionId = execState.getCurrentExecutionId(workflowId) if (executionId) { execState.setCurrentExecutionId(workflowId, null) - // boundary-raw-fetch: fire-and-forget execution cancellation invoked from a stop-generation barrier; failures are silently swallowed so the stop teardown cannot stall on a contract-validation throw - fetch(`/api/workflows/${workflowId}/executions/${executionId}/cancel`, { - method: 'POST', + requestJson(cancelWorkflowExecutionContract, { + params: { id: workflowId, executionId }, }).catch(() => {}) }