Skip to content

Commit b1a87d5

Browse files
Revert "improvement(db): add session statement/lock timeouts; simplify KB doc tx (#4593)" (#4599)
This reverts commit 4295a5c.
1 parent 8831def commit b1a87d5

4 files changed

Lines changed: 100 additions & 229 deletions

File tree

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 100 additions & 213 deletions
Original file line numberDiff line numberDiff line change
@@ -748,126 +748,6 @@ async function processDocumentsWithTrigger(
748748
}
749749
}
750750

751-
interface NewDocumentRow {
752-
id: string
753-
knowledgeBaseId: string
754-
filename: string
755-
fileUrl: string
756-
fileSize: number
757-
mimeType: string
758-
chunkCount: number
759-
tokenCount: number
760-
characterCount: number
761-
processingStatus: 'pending'
762-
enabled: boolean
763-
uploadedAt: Date
764-
tag1: string | null
765-
tag2: string | null
766-
tag3: string | null
767-
tag4: string | null
768-
tag5: string | null
769-
tag6: string | null
770-
tag7: string | null
771-
number1: number | null
772-
number2: number | null
773-
number3: number | null
774-
number4: number | null
775-
number5: number | null
776-
date1: Date | null
777-
date2: Date | null
778-
boolean1: boolean | null
779-
boolean2: boolean | null
780-
boolean3: boolean | null
781-
}
782-
783-
/**
784-
* Insert N document rows IF the parent knowledge base is still alive
785-
* (`deleted_at IS NULL`) at the statement's MVCC snapshot. Returns the
786-
* number of rows actually inserted.
787-
*
788-
* Knowledge bases are soft-deleted, so a normal FK can't catch a concurrent
789-
* delete — the KB row physically remains. We do the existence check and the
790-
* insert in a single statement via INSERT...SELECT...WHERE EXISTS, which
791-
* Postgres evaluates atomically. No transaction or row lock required, no
792-
* race window between check and insert.
793-
*
794-
* Returns 0 if the KB was soft-deleted; caller throws.
795-
*/
796-
async function insertDocumentsIfKbAlive(
797-
rows: NewDocumentRow[],
798-
knowledgeBaseId: string
799-
): Promise<number> {
800-
if (rows.length === 0) return 0
801-
802-
// jsonb_to_recordset declares the column types once, so we don't need to
803-
// cast every parameter individually to keep Postgres' type inference happy
804-
// when nullable columns end up all-NULL across the batch.
805-
const jsonRows = rows.map((d) => ({
806-
id: d.id,
807-
knowledge_base_id: d.knowledgeBaseId,
808-
filename: d.filename,
809-
file_url: d.fileUrl,
810-
file_size: d.fileSize,
811-
mime_type: d.mimeType,
812-
chunk_count: d.chunkCount,
813-
token_count: d.tokenCount,
814-
character_count: d.characterCount,
815-
processing_status: d.processingStatus,
816-
enabled: d.enabled,
817-
uploaded_at: d.uploadedAt.toISOString(),
818-
tag1: d.tag1,
819-
tag2: d.tag2,
820-
tag3: d.tag3,
821-
tag4: d.tag4,
822-
tag5: d.tag5,
823-
tag6: d.tag6,
824-
tag7: d.tag7,
825-
number1: d.number1,
826-
number2: d.number2,
827-
number3: d.number3,
828-
number4: d.number4,
829-
number5: d.number5,
830-
date1: d.date1?.toISOString() ?? null,
831-
date2: d.date2?.toISOString() ?? null,
832-
boolean1: d.boolean1,
833-
boolean2: d.boolean2,
834-
boolean3: d.boolean3,
835-
}))
836-
837-
const result = await db.execute(sql`
838-
INSERT INTO document (
839-
id, knowledge_base_id, filename, file_url, file_size, mime_type,
840-
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
841-
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
842-
number1, number2, number3, number4, number5,
843-
date1, date2,
844-
boolean1, boolean2, boolean3
845-
)
846-
SELECT
847-
id, knowledge_base_id, filename, file_url, file_size, mime_type,
848-
chunk_count, token_count, character_count, processing_status, enabled, uploaded_at,
849-
tag1, tag2, tag3, tag4, tag5, tag6, tag7,
850-
number1, number2, number3, number4, number5,
851-
date1, date2,
852-
boolean1, boolean2, boolean3
853-
FROM jsonb_to_recordset(${JSON.stringify(jsonRows)}::jsonb) AS x(
854-
id text, knowledge_base_id text, filename text, file_url text, file_size integer, mime_type text,
855-
chunk_count integer, token_count integer, character_count integer, processing_status text, enabled boolean, uploaded_at timestamp,
856-
tag1 text, tag2 text, tag3 text, tag4 text, tag5 text, tag6 text, tag7 text,
857-
number1 double precision, number2 double precision, number3 double precision, number4 double precision, number5 double precision,
858-
date1 timestamp, date2 timestamp,
859-
boolean1 boolean, boolean2 boolean, boolean3 boolean
860-
)
861-
WHERE EXISTS (
862-
SELECT 1 FROM knowledge_base
863-
WHERE id = ${knowledgeBaseId} AND deleted_at IS NULL
864-
)
865-
RETURNING id
866-
`)
867-
868-
return Array.from(result).length
869-
}
870-
871751
export async function createDocumentRecords(
872752
documents: Array<{
873753
filename: string
@@ -886,102 +766,99 @@ export async function createDocumentRecords(
886766
knowledgeBaseId: string,
887767
requestId: string
888768
): Promise<DocumentData[]> {
889-
// Cheap upfront existence check so the common KB-not-found path fails fast
890-
// before we burn CPU on tag processing. The atomic insert below is the
891-
// race-safe guard against a concurrent KB soft-delete in the small window
892-
// between this check and the insert.
893-
const kb = await db
894-
.select({ id: knowledgeBase.id })
895-
.from(knowledgeBase)
896-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
897-
.limit(1)
769+
return await db.transaction(async (tx) => {
770+
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
898771

899-
if (kb.length === 0) {
900-
throw new Error('Knowledge base not found')
901-
}
772+
const kb = await tx
773+
.select({ id: knowledgeBase.id })
774+
.from(knowledgeBase)
775+
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
776+
.limit(1)
902777

903-
const now = new Date()
904-
const documentRecords: NewDocumentRow[] = []
905-
const returnData: DocumentData[] = []
778+
if (kb.length === 0) {
779+
throw new Error('Knowledge base not found')
780+
}
906781

907-
for (const docData of documents) {
908-
const documentId = generateId()
782+
const now = new Date()
783+
const documentRecords = []
784+
const returnData: DocumentData[] = []
909785

910-
let processedTags: Partial<ProcessedDocumentTags> = {}
786+
for (const docData of documents) {
787+
const documentId = generateId()
911788

912-
if (docData.documentTagsData) {
913-
try {
914-
const tagData = JSON.parse(docData.documentTagsData)
915-
if (Array.isArray(tagData)) {
916-
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
917-
}
918-
} catch (error) {
919-
if (error instanceof SyntaxError) {
920-
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
921-
} else {
922-
throw error
789+
let processedTags: Partial<ProcessedDocumentTags> = {}
790+
791+
if (docData.documentTagsData) {
792+
try {
793+
const tagData = JSON.parse(docData.documentTagsData)
794+
if (Array.isArray(tagData)) {
795+
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
796+
}
797+
} catch (error) {
798+
if (error instanceof SyntaxError) {
799+
logger.warn(`[${requestId}] Failed to parse documentTagsData for bulk document:`, error)
800+
} else {
801+
throw error
802+
}
923803
}
924804
}
925-
}
926805

927-
const newDocument = {
928-
id: documentId,
929-
knowledgeBaseId,
930-
filename: docData.filename,
931-
fileUrl: docData.fileUrl,
932-
fileSize: docData.fileSize,
933-
mimeType: docData.mimeType,
934-
chunkCount: 0,
935-
tokenCount: 0,
936-
characterCount: 0,
937-
processingStatus: 'pending' as const,
938-
enabled: true,
939-
uploadedAt: now,
940-
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
941-
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
942-
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
943-
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
944-
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
945-
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
946-
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
947-
number1: processedTags.number1 ?? null,
948-
number2: processedTags.number2 ?? null,
949-
number3: processedTags.number3 ?? null,
950-
number4: processedTags.number4 ?? null,
951-
number5: processedTags.number5 ?? null,
952-
date1: processedTags.date1 ?? null,
953-
date2: processedTags.date2 ?? null,
954-
boolean1: processedTags.boolean1 ?? null,
955-
boolean2: processedTags.boolean2 ?? null,
956-
boolean3: processedTags.boolean3 ?? null,
806+
const newDocument = {
807+
id: documentId,
808+
knowledgeBaseId,
809+
filename: docData.filename,
810+
fileUrl: docData.fileUrl,
811+
fileSize: docData.fileSize,
812+
mimeType: docData.mimeType,
813+
chunkCount: 0,
814+
tokenCount: 0,
815+
characterCount: 0,
816+
processingStatus: 'pending' as const,
817+
enabled: true,
818+
uploadedAt: now,
819+
tag1: processedTags.tag1 ?? docData.tag1 ?? null,
820+
tag2: processedTags.tag2 ?? docData.tag2 ?? null,
821+
tag3: processedTags.tag3 ?? docData.tag3 ?? null,
822+
tag4: processedTags.tag4 ?? docData.tag4 ?? null,
823+
tag5: processedTags.tag5 ?? docData.tag5 ?? null,
824+
tag6: processedTags.tag6 ?? docData.tag6 ?? null,
825+
tag7: processedTags.tag7 ?? docData.tag7 ?? null,
826+
number1: processedTags.number1 ?? null,
827+
number2: processedTags.number2 ?? null,
828+
number3: processedTags.number3 ?? null,
829+
number4: processedTags.number4 ?? null,
830+
number5: processedTags.number5 ?? null,
831+
date1: processedTags.date1 ?? null,
832+
date2: processedTags.date2 ?? null,
833+
boolean1: processedTags.boolean1 ?? null,
834+
boolean2: processedTags.boolean2 ?? null,
835+
boolean3: processedTags.boolean3 ?? null,
836+
}
837+
838+
documentRecords.push(newDocument)
839+
returnData.push({
840+
documentId,
841+
filename: docData.filename,
842+
fileUrl: docData.fileUrl,
843+
fileSize: docData.fileSize,
844+
mimeType: docData.mimeType,
845+
})
957846
}
958847

959-
documentRecords.push(newDocument)
960-
returnData.push({
961-
documentId,
962-
filename: docData.filename,
963-
fileUrl: docData.fileUrl,
964-
fileSize: docData.fileSize,
965-
mimeType: docData.mimeType,
966-
})
967-
}
848+
if (documentRecords.length > 0) {
849+
await tx.insert(document).values(documentRecords)
850+
logger.info(
851+
`[${requestId}] Bulk created ${documentRecords.length} document records in knowledge base ${knowledgeBaseId}`
852+
)
968853

969-
if (documentRecords.length > 0) {
970-
const insertedCount = await insertDocumentsIfKbAlive(documentRecords, knowledgeBaseId)
971-
if (insertedCount === 0) {
972-
throw new Error('Knowledge base not found')
854+
await tx
855+
.update(knowledgeBase)
856+
.set({ updatedAt: now })
857+
.where(eq(knowledgeBase.id, knowledgeBaseId))
973858
}
974-
logger.info(
975-
`[${requestId}] Bulk created ${insertedCount} document records in knowledge base ${knowledgeBaseId}`
976-
)
977-
978-
await db
979-
.update(knowledgeBase)
980-
.set({ updatedAt: now })
981-
.where(eq(knowledgeBase.id, knowledgeBaseId))
982-
}
983859

984-
return returnData
860+
return returnData
861+
})
985862
}
986863

987864
export interface TagFilterCondition {
@@ -1420,7 +1297,7 @@ export async function createSingleDocument(
14201297
}
14211298
}
14221299

1423-
const newDocument: NewDocumentRow = {
1300+
const newDocument = {
14241301
id: documentId,
14251302
knowledgeBaseId,
14261303
filename: documentData.filename,
@@ -1430,21 +1307,31 @@ export async function createSingleDocument(
14301307
chunkCount: 0,
14311308
tokenCount: 0,
14321309
characterCount: 0,
1433-
processingStatus: 'pending',
14341310
enabled: true,
14351311
uploadedAt: now,
14361312
...processedTags,
14371313
}
14381314

1439-
const insertedCount = await insertDocumentsIfKbAlive([newDocument], knowledgeBaseId)
1440-
if (insertedCount === 0) {
1441-
throw new Error('Knowledge base not found')
1442-
}
1315+
await db.transaction(async (tx) => {
1316+
await tx.execute(sql`SELECT 1 FROM knowledge_base WHERE id = ${knowledgeBaseId} FOR UPDATE`)
14431317

1444-
await db
1445-
.update(knowledgeBase)
1446-
.set({ updatedAt: now })
1447-
.where(eq(knowledgeBase.id, knowledgeBaseId))
1318+
const kb = await tx
1319+
.select({ id: knowledgeBase.id })
1320+
.from(knowledgeBase)
1321+
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
1322+
.limit(1)
1323+
1324+
if (kb.length === 0) {
1325+
throw new Error('Knowledge base not found')
1326+
}
1327+
1328+
await tx.insert(document).values(newDocument)
1329+
1330+
await tx
1331+
.update(knowledgeBase)
1332+
.set({ updatedAt: now })
1333+
.where(eq(knowledgeBase.id, knowledgeBaseId))
1334+
})
14481335
logger.info(`[${requestId}] Document created: ${documentId} in knowledge base ${knowledgeBaseId}`)
14491336

14501337
return newDocument as {

apps/sim/lib/workspaces/lifecycle.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ describe('workspace lifecycle', () => {
5555
})
5656

5757
const tx = {
58-
execute: vi.fn().mockResolvedValue([]),
5958
select: vi.fn().mockReturnValue({
6059
from: vi.fn().mockReturnValue({
6160
where: vi.fn().mockResolvedValue([{ id: 'kb-1' }]),

apps/sim/lib/workspaces/lifecycle.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,6 @@ export async function archiveWorkspace(
4949
.where(eq(workflowMcpServer.workspaceId, workspaceId))
5050

5151
await db.transaction(async (tx) => {
52-
// Workspace archival is a rare admin/cleanup operation that touches every
53-
// child table; on large workspaces it can exceed the 30s session default.
54-
// Override per-tx with a generous ceiling — if it ever runs longer than
55-
// this something is genuinely wrong.
56-
await tx.execute(sql`SET LOCAL statement_timeout = '5min'`)
57-
await tx.execute(sql`SET LOCAL lock_timeout = '30s'`)
58-
5952
await tx
6053
.update(knowledgeBase)
6154
.set({

packages/db/db.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@ const postgresClient = postgres(connectionString, {
1313
connect_timeout: 30,
1414
max: 15,
1515
onnotice: () => {},
16-
// Server-side guards. lock_timeout cancels a query waiting on a row lock for
17-
// >5s (e.g. another tx holding `SELECT ... FOR UPDATE`). statement_timeout
18-
// cancels any query running >30s. Heavy paths that legitimately need longer
19-
// (table service bulk JSONB rewrites) override per-tx with `SET LOCAL`.
20-
connection: {
21-
lock_timeout: 5_000,
22-
statement_timeout: 30_000,
23-
},
2416
})
2517

2618
export const db = drizzle(postgresClient, { schema })

0 commit comments

Comments
 (0)