xftp-server: support postgresql backend #1755
Conversation
When a file is concurrently deleted while addRecipient runs, the FK constraint on recipients.sender_id raises ForeignKeyViolation. Previously this propagated as INTERNAL; now it returns AUTH (file not found).
expireServerFiles unconditionally subtracted file_size from usedStorage for every expired file, including files that were never uploaded (no file_path). Since reserve only increments usedStorage during upload, expiring never-uploaded files caused usedStorage to drift negative.
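The accounting invariant can be illustrated with a toy model (all names here are illustrative, not the server's actual code):

```haskell
-- Illustrative model of the drift: usedStorage is only incremented when a
-- file is actually uploaded (has a file_path), so an expiry pass must not
-- subtract file_size for never-uploaded files. Names are hypothetical.
data ExpiredFile = ExpiredFile {fileSize :: Int, wasUploaded :: Bool}

-- buggy behaviour: unconditional subtraction
expireBuggy :: Int -> ExpiredFile -> Int
expireBuggy used f = used - fileSize f

-- fixed behaviour: subtract only for files that were uploaded
expireFixed :: Int -> ExpiredFile -> Int
expireFixed used f = if wasUploaded f then used - fileSize f else used
```

Expiring a never-uploaded file under the buggy rule drives the counter below zero; under the fixed rule it leaves the counter unchanged.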
setFilePath result was discarded with void. If it failed (file deleted concurrently, or double-upload where file_path IS NULL guard rejected the second write), the server still reported FROk, incremented stats, and left usedStorage permanently inflated. Now the error is checked: on failure, reserved storage is released and AUTH is returned.
The status field (e.g. "blocked,reason=spam,notice={...}") is quoted in CSV for the COPY protocol, but embedded double quotes from the BlockingInfo notice (JSON) were not escaped. This could break CSV parsing during import. Now double quotes are escaped as "" per the CSV spec.
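The escaping rule can be sketched in a few lines of Haskell (escapeCsvField is a hypothetical helper for illustration, not the server's actual function):

```haskell
-- Minimal sketch of CSV field quoting per RFC 4180: wrap the field in
-- double quotes and double every embedded double quote.
-- escapeCsvField is a hypothetical name, not existing server code.
escapeCsvField :: String -> String
escapeCsvField s = '"' : concatMap esc s ++ "\""
  where
    esc '"' = "\"\""
    esc c = [c]
```

For the status value above, the embedded quotes around the JSON keys come out doubled, so the field survives a round trip through a CSV parser.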
In Postgres mode, getFile returns a snapshot TVar for fileStatus. If a file is blocked between getFile and setFilePath, the stale status check passes but the upload should be rejected. Added status = 'active' to the UPDATE WHERE clause so blocked files cannot receive uploads.
Prevents negative or zero file_size values at the database level. Without this, corrupted data from import or direct DB access could cause incorrect storage accounting (getUsedStorage sums file_size, and expiredFiles casts to Word32 which wraps negative values).
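A small demonstration of the Word32 wrap mentioned above (wrappedSize is an illustrative name):

```haskell
import Data.Int (Int32)
import Data.Word (Word32)

-- Why the CHECK matters: a negative size smuggled into the database and
-- then cast to Word32 does not fail, it wraps to a huge positive value.
wrappedSize :: Word32
wrappedSize = fromIntegral (-1 :: Int32)
```

A file_size of -1 would thus be accounted as 4294967295 bytes rather than rejected, so the constraint stops the corruption at the boundary.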
importFileStore now checks if the target database already contains files and aborts with an error. Previously, importing into a non-empty database would fail mid-COPY on duplicate primary keys, leaving the database in a partially imported state.
When setFilePath fails (file deleted or blocked concurrently, or duplicate upload), the uploaded file was left orphaned on disk with no DB record pointing to it. Now the file is removed on failure, matching the cleanup in the receiveChunk error path.
The store action result (deleteFile/blockFile) was discarded with void. If the DB row was already deleted by a concurrent operation, the function still decremented usedStorage, causing drift. Now the error propagates via ExceptT, skipping the usedStorage adjustment.
deleteFile result was discarded with void. If a concurrent delete already removed the file, deleteFile returned AUTH but usedStorage was still decremented — causing double-decrement drift. Now the usedStorage adjustment and filesExpired stat only run on success.
- Move STMFileStore and its FileStoreClass instance from Store/STM.hs back into Store.hs — the separate file was unnecessary indirection for the always-present default implementation.
- Parameterize xftpFileTests over store backend using the HSpec SpecWith pattern (following SMP's serverTests approach). The same 11 tests now run against both memory and PostgreSQL backends via a bracket parameter, eliminating all *Pg test duplicates.
- Extract shared run* functions (runTestFileChunkDeliveryAddRecipients, runTestWrongChunkSize, runTestFileChunkExpiration, runTestFileStorageQuota) from inlined test bodies.
- Remove internal helpers from Postgres.hs export list (withDB, withDB', handleDuplicate, assertUpdated, withLog are not imported externally)
- Replace local isNothing_ with Data.Maybe.isNothing in Env.hs
- Consolidate duplicate/unused imports in XFTPStoreTests.hs
- Add file_path IS NULL and status guards to STM setFilePath, matching the Postgres implementation semantics
Remove old non-parameterized test wrapper functions that were superseded by the store-backend-parameterized test suites. All test bodies (run* and _ functions) are preserved and called from the parameterized specs. Clean up unused imports.
| import Text.RawString.QQ (r)
|
| xftpSchemaMigrations :: [(String, Text, Maybe Text)]
| xftpSchemaMigrations =
merge them to one, no need to have before it's released
once there is the second, there needs to be the test that generates a combined schema, but not needed now
| m20260402_file_size_check :: Text
| m20260402_file_size_check =
|   [r|
| ALTER TABLE files ADD CONSTRAINT check_file_size_positive CHECK (file_size > 0);
we usually don't do it in schema, it should be in code
- FNEW protocol (Server.hs:491) — validates size ∈ {64KB, 256KB, 1MB, 4MB}, rejects with SIZE otherwise. Cannot produce 0 or any other invalid value.
- database import (Postgres.hs:245, via StoreLog.hs:62 parser) — parses size :: Word32 from the store log with strP and COPYs it straight to PostgreSQL. No validation at all — accepts any Word32 including 0 and any non-chunk-size value (e.g. 123).
The CHECK provides defense on the import path that the Haskell code doesn't cover.
| >>= either handleDuplicate (pure . Right)
| withLog "addRecipient" st $ \s -> logAddRecipients s senderId (pure $ FileRecipient rId rKey)
|
| getFile st party fId = runExceptT $ case party of
this needs refactoring similarly to how getQueue is done in SMP, so that the shared part is extracted
| DB.execute db "DELETE FROM recipients WHERE recipient_id = ?" (Only rId)
| withLog "ackFile" st $ \s -> logAckFile s rId
|
| expiredFiles st old limit =
how are records for expired files removed? If one by one, it's not good.
Current behaviour (one-by-one, confirmed):
expireServerFiles in Server.hs:659-673 calls expiredFiles st old 10000 to get a batch of up to 10,000 expired file records, then loops over them and calls deleteFile st sId for each one (line 667).
For the PG backend, every deleteFile call goes through withDB "deleteFile" which wraps a fresh withTransaction with a DELETE FROM files WHERE sender_id = ?. So a batch of 10,000 expired files = 10,000 PostgreSQL transactions = 10,000 commits + round trips. Each transaction also fires the ON DELETE CASCADE to recipients.
Options, in order of invasiveness:
- Add deleteFiles :: s -> [SenderId] -> IO (Either XFTPErrorType ()) to FileStoreClass (with a default impl that loops deleteFile for STM). PG impl would run a single DELETE FROM files WHERE sender_id = ANY(?) in one transaction, and one logDeleteFile burst to the store log. Expire loop in Server.hs:661 does the filesystem removals first, collects sIds, then calls deleteFiles once.
- Restructure expireServerFiles only: gather all sIds upfront (still via expiredFiles batch), do fs removals, then issue one batched delete. Same shape as option 1 but keeps deleteFile as-is.
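The batching shape behind either option can be sketched generically (batches is an illustrative helper, not existing server code):

```haskell
-- Hypothetical shape of a batched delete: instead of one transaction per
-- sender_id, chunk the ids and issue one statement per chunk, e.g.
--   DELETE FROM files WHERE sender_id = ANY(?)
-- `batches` is a generic helper for illustration only.
batches :: Int -> [a] -> [[a]]
batches n = go
  where
    go [] = []
    go xs = let (b, rest) = splitAt n xs in b : go rest
```

With 10,000 expired files and a chunk size of 10,000, this is one transaction and one cascade pass instead of 10,000.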
|
| getUsedStorage st =
|   withTransaction (dbStore st) $ \db -> do
|     [Only total] <- DB.query_ db "SELECT COALESCE(SUM(file_size::INT8), 0)::INT8 FROM files"
why INT8 and not just INTEGER?
Because file_size is INT4, a SUM over many INT4 values can overflow the 2^31 - 1 (~2.1 GB) INT4 range: summing just 512 × 4 MiB chunks already exceeds it. Casting each row to INT8 before summing (and the final SUM to INT8) gives a 63-bit accumulator, matching the Haskell Int64 return type.
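The overflow arithmetic can be checked directly (names here are illustrative):

```haskell
import Data.Int (Int32, Int64)

-- The arithmetic behind the INT8 cast: 4 MiB chunk sizes summed in a
-- 32-bit accumulator exceed maxBound :: Int32 after 512 chunks.
chunk4MiB :: Int64
chunk4MiB = 4 * 1024 * 1024

overflowsInt32 :: Int64 -> Bool
overflowsInt32 total = total > fromIntegral (maxBound :: Int32)
```

512 × 4 MiB is exactly 2^31, one past the largest INT4 value, while 511 chunks still fit.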
| [r|
| CREATE TABLE files (
|   sender_id BYTEA NOT NULL PRIMARY KEY,
|   file_size INT4 NOT NULL,
why int4 and not just integer?
They're the same - INT4 is PostgreSQL's internal name for the 4-byte integer type; INTEGER/INT are SQL standard aliases for the exact same type. No semantic difference.
|   file_digest BYTEA NOT NULL,
|   sender_key BYTEA NOT NULL,
|   file_path TEXT,
|   created_at INT8 NOT NULL,
created_at stores a Unix timestamp in seconds as Int64 on the Haskell side (RoundedFileTime = RoundedSystemTime 3600, backed by Int64). INTEGER/INT4 is 32-bit and overflows in year 2038 (2^31 seconds since epoch ≈ 2038-01-19). INT8 is 64-bit and has no practical overflow. This is the standard Y2038-safe timestamp encoding.
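The Y2038 arithmetic behind that choice (illustrative names):

```haskell
-- An INT4 Unix timestamp overflows 2^31 - 1 seconds after the 1970
-- epoch, i.e. roughly 68 years later, in January 2038.
int32MaxSeconds :: Integer
int32MaxSeconds = 2 ^ (31 :: Int) - 1

yearsUntilOverflow :: Double
yearsUntilOverflow = fromIntegral int32MaxSeconds / (365.25 * 24 * 3600)
```

An INT8 timestamp, by contrast, does not overflow for hundreds of billions of years.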
| sl <- openWriteStoreLog False storeLogFilePath
| putStrLn "Exporting files..."
| -- Load all recipients into a map for lookup
| rcpMap <- withTransaction (dbStore pgStore) $ \db ->
I don't see how it can work. You are exporting recipients before you are exporting the actual files. When the server imports files, it would just drop recipients for which it has no files (= all of them)
The rcpMap loaded upfront (Postgres.hs:277-285) is only an in-memory Data.Map used for O(1) lookup - it is not written to the store log.
The actual store log write order is per-file, inside the single fold at lines 287-305:
- logAddFile → FNEW sId (line 296)
- logAddRecipients → FADD sId (line 299)
- logPutFile → FPUT sId (line 300)
So each file's FADD is written immediately after its own FNEW. On import, replay sees FNEW before FADD for every sId and addRecipient succeeds.
| M.empty
| (\acc (rId, sId, rKeyBs :: ByteString) ->
|   case C.decodePubKey rKeyBs of
|     Right rKey -> pure $! M.insertWith (++) sId [FileRecipient rId rKey] acc
I also don't think we need to construct it in memory - do we do it for SMP? I thought we just stream lines to a file. Maybe that's why it works, but it's unnecessary wasteful.
SMP streams directly via foldQueueRecs (Main.hs:735), writing each QueueRec straight to the store log with no in-memory map. It can do this because each row is self-contained.
XFTP can't use that exact pattern because the log groups recipients per file (FADD sId [rcp,...]) and files/recipients are in two tables — so the current code preloads rcpMap to avoid N+1 lookups. Correct, but yes, wasteful — it materializes every recipient in memory.
Two streaming alternatives:
- Two folds, relaxed log order: fold files → write FNEW+FPUT; then fold recipients ORDER BY sender_id, buffering the current sender's list and flushing FADD on sender change. Replay works because all FNEWs are processed before any FADD.
- Single joined stream: SELECT f.*, r.recipient_id, r.recipient_key FROM files f LEFT JOIN recipients r USING (sender_id) ORDER BY f.sender_id, buffering per sender and flushing FNEW+FADD+FPUT on sender boundary. One query, constant memory, identical log layout to today.
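The per-sender grouping both alternatives rely on can be sketched as a pure function (groupBySender is an illustrative name, not existing code):

```haskell
import Data.Function (on)
import Data.List (groupBy)

-- Sketch of the streaming options above: rows arrive ORDER BY sender_id,
-- so grouping adjacent rows by the sender key yields one recipient batch
-- per file without a global in-memory map. Names are illustrative.
groupBySender :: Eq k => [(k, v)] -> [(k, [v])]
groupBySender rows =
  [(fst (head g), map snd g) | g <- groupBy ((==) `on` fst) rows]
```

In a real streaming fold the same grouping would be done incrementally, flushing one FADD per sender change, so memory stays bounded by the largest single recipient list.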
| data XFTPStoreConfig s where
|   XSCMemory :: Maybe FilePath -> XFTPStoreConfig STMFileStore
| #if defined(dbServerPostgres)
|   XSCDatabase :: PostgresFileStoreCfg -> XFTPStoreConfig PostgresFileStore
what would happen if a server with DB in the INI is started with an exe compiled without postgres? Will it correctly exit with an error?
Env.hs:183 handles it in the #else branch with error "Error: server binary is compiled without support for PostgreSQL database...". It exits non-zero with a clear message, so the check works.
| #endif
|
| -- | Import StoreLog to PostgreSQL database.
| importToDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
this belongs in Main.hs I think
The current placement is deliberate - comment at Env.hs:165 says "CPP guards for Postgres are handled here so Main.hs stays CPP-free." Main.hs currently has zero CPP.
Moving importToDatabase/exportFromDatabase to Main.hs would drag #if defined(dbServerPostgres) ... #else ... #endif blocks plus the Postgres.Config/importFileStore imports into Main.hs, same as runWithStoreConfig and checkFileStoreMode.
| #endif
|
| -- | Export PostgreSQL database to StoreLog.
| exportFromDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
Same reason. exportFromDatabase sits alongside importToDatabase under the same CPP guard in Env.hs for the same reason: concentrate all #if defined(dbServerPostgres) in one file so Main.hs stays CPP-free. Same tradeoff applies.
| writeTVar recipientIds $! S.insert rId rIds
| TM.insert rId (senderId, rKey) recipients
| setFilePath st sId fPath = atomically $
|   withSTMFile st sId $ \FileRec {filePath, fileStatus} -> do
withFile -> withSTMFile is a spurious rename, it increases the diff
| TM.delete rId recipients
| modifyTVar' recipientIds $ S.delete rId
|
| getFile :: FileStore -> SFileParty p -> XFTPFileId -> STM (Either XFTPErrorType (FileRec, C.APublicAuthKey))
getFile moved, get it back here
src/Simplex/FileTransfer/Server.hs
|
| runXFTPServer :: XFTPServerConfig -> IO ()
| runXFTPServer cfg = do
| runXFTPServer :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO ()
I think XFTPServerConfig should be parameterized, and XFTPStoreConfig s should be inside it, same as with SMP
src/Simplex/FileTransfer/Server.hs
| stopServer :: M ()
| stopServer :: M s ()
| stopServer = do
|   withFileLog closeStoreLog
this likely should be inside closeFileStore
src/Simplex/FileTransfer/Server.hs
| verifyCmd party = do
|   st <- asks store
|   atomically $ verify =<< getFile st party fId
|   liftIO (getFile st party fId) >>= \case
unnecessary inlining, restore old structure
tests/XFTPAgent.hs
| #endif
| . describe "agent XFTP API" $ do
|   it "should send and receive file" $ withXFTPServer testXFTPAgentSendReceive
| . describe "agent XFTP API (memory)" $ do
reduce diff, it's unnecessary
| xftpServerTests :: Spec
| xftpServerTests =
|   before_ (createDirectoryIfMissing False xftpServerFiles) . after_ (removeDirectoryRecursive xftpServerFiles) $ do
|     describe "XFTP file chunk delivery" $ do
get them back, too much diff
tests/XFTPServerTests.hs
| import XFTPClient
|
| -- Memory-only tests (store log persistence and SNI/CORS transport tests)
| xftpServerTests :: Spec
SpecWith XFTPTestBracket, and the type name is incorrect. A bracket is code; the type should be named after what we pass.
Embed XFTPStoreConfig s as serverStoreCfg field, matching SMP's ServerConfig. runXFTPServer and newXFTPServerEnv now take a single XFTPServerConfig s. Restore verifyCmd local helper structure.
Restore xftpServerTests and xftpAgentTests bodies to match master byte-for-byte (only type signatures change for XFTPTestBracket parameterization); inline the runTestXXX helpers that were split on this branch.
Move STM store log close responsibility into closeFileStore to match PostgresFileStore, removing the asymmetry where only PG's close was self-contained. STMFileStore holds the log in a TVar populated by newXFTPServerEnv after readWriteFileStore; stopServer no longer needs the explicit withFileLog closeStoreLog call. Writes still go through XFTPEnv.storeLog via withFileLog (unchanged).