Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/dnf-active-conditions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/electric-db-collection': minor
---

feat: add DNF/active_conditions support for arbitrary boolean WHERE clauses

Support the new Electric server wire protocol (electric-sql/electric#3791). Tags now use `/` delimiter with empty segments for non-participating positions. Shapes with subquery dependencies send `active_conditions` headers and use DNF evaluation for row visibility. Simple shapes without subqueries retain existing empty-tag-set deletion behavior.
2 changes: 1 addition & 1 deletion packages/electric-db-collection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"src"
],
"dependencies": {
"@electric-sql/client": "^1.5.13",
"@electric-sql/client": "https://pkg.pr.new/@electric-sql/client@4043",
"@standard-schema/spec": "^1.1.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.9.2",
Expand Down
118 changes: 107 additions & 11 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@ import {
import { compileSQL } from './sql-compiler'
import {
addTagToIndex,
deriveDisjunctPositions,
findRowsMatchingPattern,
getTagLength,
isMoveInMessage,
isMoveOutMessage,
parseTag as parseTagString,
removeTagFromIndex,
rowVisible,
tagMatchesPattern,
} from './tag-index'
import type { ColumnEncoder } from './sql-compiler'
import type {
MoveOutPattern,
ActiveConditions,
DisjunctPositions,
MovePattern,
MoveTag,
ParsedMoveTag,
RowId,
Expand Down Expand Up @@ -981,16 +987,16 @@ function createElectricSync<T extends Row<unknown>>(

const tagCache = new Map<MoveTag, ParsedMoveTag>()

// Parses a tag string into a MoveTag.
// Parses a tag string into a ParsedMoveTag.
// It memoizes the result parsed tag such that future calls
// for the same tag string return the same MoveTag array.
// for the same tag string return the same ParsedMoveTag array.
const parseTag = (tag: MoveTag): ParsedMoveTag => {
const cachedTag = tagCache.get(tag)
if (cachedTag) {
return cachedTag
}

const parsedTag = tag.split(`|`)
const parsedTag = parseTagString(tag)
tagCache.set(tag, parsedTag)
return parsedTag
}
Expand All @@ -1000,6 +1006,11 @@ function createElectricSync<T extends Row<unknown>>(
const tagIndex: TagIndex = []
let tagLength: number | undefined = undefined

// DNF state: active_conditions are per-row, disjunct_positions are global
// (fixed by the shape's WHERE clause, derived once from the first tagged message).
const rowActiveConditions = new Map<RowId, ActiveConditions>()
let disjunctPositions: DisjunctPositions | undefined = undefined

/**
* Initialize the tag index with the correct length
*/
Expand Down Expand Up @@ -1074,6 +1085,7 @@ function createElectricSync<T extends Row<unknown>>(
tags: Array<MoveTag> | undefined,
removedTags: Array<MoveTag> | undefined,
rowId: RowId,
activeConditions?: ActiveConditions,
): Set<MoveTag> => {
// Initialize tag set for this row if it doesn't exist (needed for checking deletion)
if (!rowTagSets.has(rowId)) {
Expand All @@ -1084,13 +1096,24 @@ function createElectricSync<T extends Row<unknown>>(
// Add new tags
if (tags) {
addTagsToRow(tags, rowId, rowTagSet)

// Derive disjunct positions once — they are fixed by the shape's WHERE clause.
if (disjunctPositions === undefined) {
const parsedTags = tags.map(parseTag)
disjunctPositions = deriveDisjunctPositions(parsedTags)
}
}

// Remove tags
if (removedTags) {
removeTagsFromRow(removedTags, rowId, rowTagSet)
}

// Store active conditions if provided (overwrite on re-send)
if (activeConditions && activeConditions.length > 0) {
rowActiveConditions.set(rowId, [...activeConditions])
}

return rowTagSet
}

Expand All @@ -1101,6 +1124,8 @@ function createElectricSync<T extends Row<unknown>>(
rowTagSets.clear()
tagIndex.length = 0
tagLength = undefined
rowActiveConditions.clear()
disjunctPositions = undefined
}

/**
Expand Down Expand Up @@ -1129,22 +1154,45 @@ function createElectricSync<T extends Row<unknown>>(

// Remove the row from the tag sets map
rowTagSets.delete(rowId)
rowActiveConditions.delete(rowId)
}

/**
* Remove matching tags from a row based on a pattern
* Returns true if the row's tag set is now empty
* Returns true if the row should be deleted (no longer visible)
*/
const removeMatchingTagsFromRow = (
rowId: RowId,
pattern: MoveOutPattern,
pattern: MovePattern,
): boolean => {
const rowTagSet = rowTagSets.get(rowId)
if (!rowTagSet) {
return false
}

// Find tags that match this pattern and remove them
// DNF mode: check visibility using active conditions.
// Tag index entries are preserved so that move-in can re-activate positions.
const activeConditions = rowActiveConditions.get(rowId)
if (activeConditions && disjunctPositions) {
// Set the condition at this pattern's position to false
activeConditions[pattern.pos] = false

if (!rowVisible(activeConditions, disjunctPositions)) {
// Row is no longer visible — clean up all state including tag index
for (const tag of rowTagSet) {
const parsedTag = parseTag(tag)
removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!)
tagCache.delete(tag)
}
rowTagSets.delete(rowId)
rowActiveConditions.delete(rowId)
return true
}
return false
}

// Simple shape (no subquery dependencies — server sends no active_conditions):
// Remove matching tags and delete if tag set is empty
for (const tag of rowTagSet) {
const parsedTag = parseTag(tag)
if (tagMatchesPattern(parsedTag, pattern)) {
Expand All @@ -1153,7 +1201,6 @@ function createElectricSync<T extends Row<unknown>>(
}
}

// Check if row's tag set is now empty
if (rowTagSet.size === 0) {
rowTagSets.delete(rowId)
return true
Expand All @@ -1166,7 +1213,7 @@ function createElectricSync<T extends Row<unknown>>(
* Process move-out event: remove matching tags from rows and delete rows with empty tag sets
*/
const processMoveOutEvent = (
patterns: Array<MoveOutPattern>,
patterns: Array<MovePattern>,
begin: () => void,
write: (message: ChangeMessageOrDeleteKeyMessage<T>) => void,
transactionStarted: boolean,
Expand Down Expand Up @@ -1204,6 +1251,30 @@ function createElectricSync<T extends Row<unknown>>(
return txStarted
}

/**
* Process move-in event: re-activate conditions for rows matching the patterns.
* This is a silent operation — no messages are emitted to the collection.
*/
const processMoveInEvent = (patterns: Array<MovePattern>): void => {
if (tagLength === undefined) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Received move-in message but no tag length set yet, ignoring`,
)
return
}

for (const pattern of patterns) {
const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex)

for (const rowId of affectedRowIds) {
const activeConditions = rowActiveConditions.get(rowId)
if (activeConditions) {
activeConditions[pattern.pos] = true
}
}
}
}

/**
* Get the sync metadata for insert operations
* @returns Record containing relation information
Expand Down Expand Up @@ -1433,6 +1504,11 @@ function createElectricSync<T extends Row<unknown>>(
const removedTags = changeMessage.headers.removed_tags
const hasTags = tags || removedTags

// Extract active_conditions from headers (DNF support)
const activeConditions = changeMessage.headers.active_conditions as
| ActiveConditions
| undefined

const rowId = collection.getKeyFromItem(changeMessage.value)
const operation = changeMessage.headers.operation

Expand All @@ -1453,7 +1529,12 @@ function createElectricSync<T extends Row<unknown>>(
if (isDelete) {
clearTagsForRow(rowId)
} else if (hasTags) {
processTagsForChangeMessage(tags, removedTags, rowId)
processTagsForChangeMessage(
tags,
removedTags,
rowId,
activeConditions,
)
}

write({
Expand Down Expand Up @@ -1496,7 +1577,11 @@ function createElectricSync<T extends Row<unknown>>(

for (const message of messages) {
// Add message to current batch buffer (for race condition handling)
if (isChangeMessage(message) || isMoveOutMessage(message)) {
if (
isChangeMessage(message) ||
isMoveOutMessage(message) ||
isMoveInMessage(message)
) {
currentBatchMessages.setState((currentBuffer) => {
const newBuffer = [...currentBuffer, message]
// Limit buffer size for safety
Expand Down Expand Up @@ -1593,6 +1678,14 @@ function createElectricSync<T extends Row<unknown>>(
transactionStarted,
)
}
} else if (isMoveInMessage(message)) {
// Handle move-in event: re-activate conditions for matching rows.
// Buffer if buffering, otherwise process immediately.
if (isBufferingInitialSync() && !transactionStarted) {
bufferedMessages.push(message)
} else {
processMoveInEvent(message.headers.patterns)
}
} else if (isMustRefetchMessage(message)) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
Expand Down Expand Up @@ -1672,6 +1765,9 @@ function createElectricSync<T extends Row<unknown>>(
write,
transactionStarted,
)
} else if (isMoveInMessage(bufferedMsg)) {
// Process buffered move-in messages during atomic swap
processMoveInEvent(bufferedMsg.headers.patterns)
}
}

Expand Down
Loading
Loading