From 379fbea855891905ce6d9b29f8fe34ea8bb05848 Mon Sep 17 00:00:00 2001 From: msvargas Date: Mon, 15 Jun 2026 16:25:41 -0500 Subject: [PATCH] fix(tanstack-react-query): use skipToken instead of overriding enabled option Replace the enabled: streamsHaveSynced pattern with TanStack's skipToken to preserve the user's own enabled option. Problems fixed: - User's enabled option was silently overridden by PowerSync's streamsHaveSynced - usePowerSyncQueries final useMemo was missing streamsHaveSynced in deps, returning stale values - Race condition: change listeners attached watching [] tables while resolveTables was pending, losing first-sync writes Changes: - useQuery: Conditionally pass skipToken when streams haven't synced; suspense queries always get the real queryFn - useQueries: Same skipToken approach per query entry; add streamsHaveSynced to deps - usePowerSyncQueries: Add tablesInitialized ref to rescue data lost during the []->[tables] transition; add streamsHaveSynced to return memo deps --- .../src/hooks/usePowerSyncQueries.ts | 20 ++- .../src/hooks/useQueries.ts | 8 +- .../src/hooks/useQuery.ts | 9 +- .../tests/enabled.test.tsx | 100 ++++++++++++ .../tests/usePowerSyncQueries.test.tsx | 143 ++++++++++++++++++ 5 files changed, 271 insertions(+), 9 deletions(-) create mode 100644 packages/tanstack-react-query/tests/enabled.test.tsx create mode 100644 packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx diff --git a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts index 9a4c7f877..c910b2358 100644 --- a/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts +++ b/packages/tanstack-react-query/src/hooks/usePowerSyncQueries.ts @@ -1,6 +1,6 @@ import { type CompilableQuery, parseQuery } from '@powersync/common'; import { QuerySyncStreamOptions, useAllSyncStreamsHaveSynced, usePowerSync } from '@powersync/react'; -import { useEffect, useState, useCallback, useMemo } from 'react'; +import { useEffect, useState, useCallback, useMemo, useRef } from 'react'; import * as Tanstack from '@tanstack/react-query'; export type UsePowerSyncQueriesInput = { @@ -111,7 +111,17 @@ export function usePowerSyncQueries( })) ); + // Tracks the []->[tables] transition to rescue data lost during pending resolveTables. + const tablesInitialized = useRef([]); + + if (tablesInitialized.current.length !== parsedQueries.length) { + tablesInitialized.current = parsedQueries.map(() => false); + } + useEffect(() => { + // Re-arm rescue tracking when queries change. + tablesInitialized.current = parsedQueries.map(() => false); + const listeners = parsedQueries.map((pq, idx) => { if (pq.parseError || !pq.query) { return null; @@ -154,6 +164,12 @@ export function usePowerSyncQueries( const abort = new AbortController(); + // Rescue data lost while resolveTables was pending (listener watched [] tables). + if (tablesArr[idx]?.length > 0 && !tablesInitialized.current[idx]) { + tablesInitialized.current[idx] = true; + queryClient.invalidateQueries({ queryKey: pq.queryKey }); + } + powerSync.onChangeWithCallback( { onChange: () => { @@ -203,5 +219,5 @@ export function usePowerSyncQueries( }), streamsHaveSynced }; - }, [parsedQueries, errorsArr, tablesArr, powerSync]); + }, [parsedQueries, errorsArr, tablesArr, powerSync, streamsHaveSynced]); } diff --git a/packages/tanstack-react-query/src/hooks/useQueries.ts b/packages/tanstack-react-query/src/hooks/useQueries.ts index 4798e568e..52e210877 100644 --- a/packages/tanstack-react-query/src/hooks/useQueries.ts +++ b/packages/tanstack-react-query/src/hooks/useQueries.ts @@ -151,15 +151,15 @@ export function useQueries( return queriesInput.map((queryOptions, idx) => { const { query, parameters, ...rest } = queryOptions; const state = states[idx]; + const queryFn = streamsHaveSynced ? (query ? state.queryFn : rest.queryFn) : Tanstack.skipToken; return { ...rest, - queryFn: query ? state.queryFn : rest.queryFn, - queryKey: rest.queryKey, - enabled: streamsHaveSynced + queryFn, + queryKey: rest.queryKey }; }); - }, [queriesInput, states]); + }, [queriesInput, states, streamsHaveSynced]); return Tanstack.useQueries( { diff --git a/packages/tanstack-react-query/src/hooks/useQuery.ts b/packages/tanstack-react-query/src/hooks/useQuery.ts index ace5ff63e..e1b9a38ec 100644 --- a/packages/tanstack-react-query/src/hooks/useQuery.ts +++ b/packages/tanstack-react-query/src/hooks/useQuery.ts @@ -135,7 +135,7 @@ function useQueryCore< const { query, parameters, queryKey, streams, ...resolvedOptions } = options; const { - queries: [{ queryFn }], + queries: [{ queryFn: powerSyncQueryFn }], streamsHaveSynced } = usePowerSyncQueries( [ @@ -149,12 +149,15 @@ function useQueryCore< queryClient ); + const isSuspense = useQueryFn === Tanstack.useSuspenseQuery; + const resolvedQueryFn = query ? powerSyncQueryFn : resolvedOptions.queryFn; + const queryFn = streamsHaveSynced || isSuspense ? resolvedQueryFn : Tanstack.skipToken; + return useQueryFn( { ...(resolvedOptions as TQueryOptions), queryKey, - queryFn: query ? queryFn : resolvedOptions.queryFn, - enabled: streamsHaveSynced + ...((typeof queryFn === 'function' || queryFn === Tanstack.skipToken) && { queryFn }) } as TQueryOptions, queryClient ); diff --git a/packages/tanstack-react-query/tests/enabled.test.tsx b/packages/tanstack-react-query/tests/enabled.test.tsx new file mode 100644 index 000000000..644a25e91 --- /dev/null +++ b/packages/tanstack-react-query/tests/enabled.test.tsx @@ -0,0 +1,100 @@ +import { cleanup, renderHook, waitFor } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import React, { Suspense } from 'react'; +import { AbstractPowerSyncDatabase } from '@powersync/common'; +import { PowerSyncContext } from '@powersync/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { openPowerSync } from './utils'; +import { useQuery } from '../src/hooks/useQuery'; +import { useQueries } from '../src/hooks/useQueries'; +import { useSuspenseQuery } from '../src/hooks/useQuery'; + +describe('user enabled option is respected', () => { + let db: AbstractPowerSyncDatabase; + let queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false + } + } + }); + + beforeEach(() => { + db = openPowerSync(); + queryClient.clear(); + vi.clearAllMocks(); + cleanup(); + }); + + const wrapper = ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); + + it('useQuery: does not run when user passes enabled: false (no streams)', async () => { + const getAllSpy = vi.spyOn(db, 'getAll'); + + const { result } = renderHook( + () => + useQuery({ + queryKey: ['lists'], + query: 'SELECT * FROM lists', + enabled: false + }), + { wrapper } + ); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(result.current.status).toBe('pending'); + expect(result.current.fetchStatus).toBe('idle'); + expect(result.current.data).toBeUndefined(); + expect(getAllSpy).not.toHaveBeenCalledWith('SELECT * FROM lists', expect.anything()); + }); + + it('useQueries: does not run an entry where user passes enabled: false (no streams)', async () => { + const getAllSpy = vi.spyOn(db, 'getAll'); + + const { result } = renderHook( + () => + useQueries({ + queries: [{ queryKey: ['lists'], query: 'SELECT * FROM lists', enabled: false }] + }), + { wrapper } + ); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + expect(result.current[0].status).toBe('pending'); + expect(result.current[0].fetchStatus).toBe('idle'); + expect(result.current[0].data).toBeUndefined(); + expect(getAllSpy).not.toHaveBeenCalledWith('SELECT * FROM lists', expect.anything()); + }); + + it('useSuspenseQuery: with an unsynced waitForStream stream does not error with skipToken and still resolves', async () => { + const consoleErrorSpy = vi.spyOn(console, 'error'); + + const suspenseWrapper = ({ children }: { children: React.ReactNode }) => ( + + + loading}>{children} + + + ); + + const { result } = renderHook( + () => + useSuspenseQuery({ + queryKey: ['suspense-lists'], + query: 'SELECT * FROM lists', + streams: [{ name: 'a', waitForStream: true }] + }), + { wrapper: suspenseWrapper } + ); + + await waitFor(() => expect(result.current.data).toBeDefined(), { timeout: 1000, interval: 100 }); + + expect(consoleErrorSpy).not.toHaveBeenCalledWith('skipToken is not allowed for useSuspenseQuery'); + }); +}); diff --git a/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx new file mode 100644 index 000000000..1d28ac0e5 --- /dev/null +++ b/packages/tanstack-react-query/tests/usePowerSyncQueries.test.tsx @@ -0,0 +1,143 @@ +import { cleanup, renderHook, waitFor } from '@testing-library/react'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import React from 'react'; +import { AbstractPowerSyncDatabase, SyncStatus } from '@powersync/common'; +import { PowerSyncContext } from '@powersync/react'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { openPowerSync } from './utils'; +import * as Tanstack from '@tanstack/react-query'; +import { usePowerSyncQueries } from '../src/hooks/usePowerSyncQueries'; + +describe('usePowerSyncQueries bug fixes', () => { + let db: AbstractPowerSyncDatabase; + let queryClient = new QueryClient({ + defaultOptions: { + queries: { + retry: false + } + } + }); + + beforeEach(() => { + db = openPowerSync(); + queryClient.clear(); + vi.clearAllMocks(); + cleanup(); + }); + + const wrapper = ({ children }: { children: React.ReactNode }) => ( + + {children} + + ); + + const syncedStatus = () => + new SyncStatus({ + dataFlow: { + internalStreamSubscriptions: [ + { + name: 'a', + parameters: null, + progress: { total: 0, downloaded: 0 }, + active: true, + is_default: false, + has_explicit_subscription: true, + expires_at: null, + last_synced_at: 1234, + priority: 1 + } + ] + } + }); + + describe('Bug 1: streamsHaveSynced in final useMemo deps', () => { + it('updated returned streamsHaveSynced once a waitForStream stream syncs', async () => { + const stableQueries = [ + { + query: 'SELECT name FROM lists', + queryKey: ['bug1'], + streams: [{ name: 'a', waitForStream: true }] + } + ]; + + const { result, unmount } = renderHook(() => usePowerSyncQueries(stableQueries, queryClient), { wrapper }); + + await waitFor(() => expect(result.current.streamsHaveSynced).toBe(false), { timeout: 1000, interval: 50 }); + + db.currentStatus = syncedStatus(); + db.iterateListeners((l) => l.statusChanged?.(syncedStatus())); + + await waitFor(() => expect(result.current.streamsHaveSynced).toBe(true), { timeout: 1000, interval: 50 }); + + await waitFor(() => expect(result.current.queries[0].tables).toContain('lists'), { + timeout: 1000, + interval: 50 + }); + + unmount(); + }); + }); + + describe('Bug 2: first table-resolution race loses first-sync data', () => { + it('reflects rows written while table resolution is still on the slow path', async () => { + let releaseResolve!: () => void; + const resolveGate = new Promise((resolve) => { + releaseResolve = resolve; + }); + + const realResolveTables = db.resolveTables.bind(db); + const resolveSpy = vi.spyOn(db, 'resolveTables').mockImplementation(async (sql, params) => { + await resolveGate; + return realResolveTables(sql, params); + }); + + const realRegisterListener = db.registerListener.bind(db); + const registerListenerSpy = vi + .spyOn(db, 'registerListener') + .mockImplementation((listener: Parameters[0]) => { + if (listener && (listener as { schemaChanged?: unknown }).schemaChanged) { + const { schemaChanged: _omit, ...rest } = listener as Record; + return realRegisterListener(rest as Parameters[0]); + } + return realRegisterListener(listener); + }); + + const stableQueries = [{ query: 'SELECT name FROM lists ORDER BY name', queryKey: ['bug2'] }]; + + const { result, unmount } = renderHook( + () => { + const { queries } = usePowerSyncQueries(stableQueries, queryClient); + return Tanstack.useQuery( + { + queryKey: ['bug2'], + queryFn: queries[0].queryFn as () => Promise<{ name: string }[]>, + staleTime: Infinity, + refetchOnMount: false, + refetchOnWindowFocus: false, + refetchOnReconnect: false + }, + queryClient + ); + }, + { wrapper } + ); + + await waitFor(() => expect(result.current.data).toEqual([]), { timeout: 2000, interval: 50 }); + + await db.execute('INSERT INTO lists (id, name) VALUES (uuid(), ?)', ['from-first-sync']); + + await new Promise((r) => setTimeout(r, 150)); + + releaseResolve(); + + await waitFor(() => expect(result.current.data).toEqual([{ name: 'from-first-sync' }]), { + timeout: 2000, + interval: 50 + }); + + resolveSpy.mockRestore(); + registerListenerSpy.mockRestore(); + unmount(); + }); + }); +});