
xftp-server: support postgresql backend#1755

Open
shumvgolove wants to merge 37 commits into master from sh/xftp-pg

Conversation

@shumvgolove
Collaborator

No description provided.

When a file is concurrently deleted while addRecipient runs, the FK
constraint on recipients.sender_id raises ForeignKeyViolation. Previously
this propagated as INTERNAL; now it returns AUTH (file not found).

expireServerFiles unconditionally subtracted file_size from usedStorage
for every expired file, including files that were never uploaded (no
file_path). Since reserve only increments usedStorage during upload,
expiring never-uploaded files caused usedStorage to drift negative.

setFilePath result was discarded with void. If it failed (file deleted
concurrently, or a double upload where the file_path IS NULL guard rejected
the second write), the server still reported FROk, incremented stats,
and left usedStorage permanently inflated. Now the error is checked:
on failure, reserved storage is released and AUTH is returned.

The status field (e.g. "blocked,reason=spam,notice={...}") is quoted in
CSV for the COPY protocol, but embedded double quotes from the BlockingInfo
notice (JSON) were not escaped. This could break CSV parsing during
import. Now double quotes are escaped as "" per the CSV spec.
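
The escaping rule can be sketched as a small pure helper (escapeCsvField is an illustrative name, not the PR's actual function):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.Text as T

-- Quote a CSV field and double any embedded quotes, per RFC 4180.
escapeCsvField :: T.Text -> T.Text
escapeCsvField s = "\"" <> T.replace "\"" "\"\"" s <> "\""
```

A status value whose notice contains JSON double quotes then round-trips through COPY intact instead of terminating the quoted field early.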
In Postgres mode, getFile returns a snapshot TVar for fileStatus. If a
file is blocked between getFile and setFilePath, the stale status check
passes but the upload should be rejected. Added status = 'active' to
the UPDATE WHERE clause so blocked files cannot receive uploads.

Prevents negative or zero file_size values at the database level.
Without this, corrupted data from import or direct DB access could
cause incorrect storage accounting (getUsedStorage sums file_size,
and expiredFiles casts to Word32, which wraps negative values).

importFileStore now checks whether the target database already contains
files and aborts with an error. Previously, importing into a non-empty
database would fail mid-COPY on duplicate primary keys, leaving the
database in a partially imported state.

When setFilePath fails (file deleted or blocked concurrently, or
duplicate upload), the uploaded file was left orphaned on disk with
no DB record pointing to it. Now the file is removed on failure,
matching the cleanup in the receiveChunk error path.

The store action result (deleteFile/blockFile) was discarded with void.
If the DB row was already deleted by a concurrent operation, the
function still decremented usedStorage, causing drift. Now the error
propagates via ExceptT, skipping the usedStorage adjustment.

deleteFile result was discarded with void. If a concurrent delete
already removed the file, deleteFile returned AUTH but usedStorage
was still decremented — causing double-decrement drift. Now the
usedStorage adjustment and filesExpired stat only run on success.
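
A minimal sketch of the success-gated pattern these commits describe, assuming the delete action returns Either and the counters live in TVars (names are illustrative, not the PR's API):

```haskell
{-# LANGUAGE LambdaCase #-}

import Control.Concurrent.STM
import Data.Int (Int64)

-- Only adjust accounting when the store action actually succeeded;
-- a Left from a concurrent delete skips the decrement entirely.
expireFile :: IO (Either e ()) -> TVar Int64 -> TVar Int -> Int64 -> IO ()
expireFile delete usedStorage filesExpired size =
  delete >>= \case
    Right () -> atomically $ do
      modifyTVar' usedStorage (subtract size)
      modifyTVar' filesExpired (+ 1)
    Left _ -> pure () -- row already gone: no double-decrement
```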
- Move STMFileStore and its FileStoreClass instance from Store/STM.hs
  back into Store.hs — the separate file was unnecessary indirection
  for the always-present default implementation.

- Parameterize xftpFileTests over store backend using HSpec SpecWith
  pattern (following SMP's serverTests approach). The same 11 tests
  now run against both memory and PostgreSQL backends via a bracket
  parameter, eliminating all *Pg test duplicates.

- Extract shared run* functions (runTestFileChunkDeliveryAddRecipients,
  runTestWrongChunkSize, runTestFileChunkExpiration, runTestFileStorageQuota)
  from inlined test bodies.
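
The SpecWith parameterization described above can be sketched roughly like this (the bracket type body and backend brackets are simplified placeholders, not the PR's code):

```haskell
import Test.Hspec

-- The value threaded through SpecWith: how to run a server around a test
-- action, so the same specs run against any backend.
type XFTPTestBracket = IO () -> IO () -- simplified for this sketch

xftpFileTests :: SpecWith XFTPTestBracket
xftpFileTests = describe "XFTP file chunk delivery" $
  it "should create, upload and receive file chunk" $ \withServer ->
    withServer (pure ()) -- real tests run client actions here

spec :: Spec
spec = do
  describe "memory backend" $ before (pure memoryBracket) xftpFileTests
  describe "postgres backend" $ before (pure postgresBracket) xftpFileTests
  where
    memoryBracket, postgresBracket :: XFTPTestBracket
    memoryBracket = id -- placeholder brackets
    postgresBracket = id
```

Each `before` supplies a different bracket to the same spec tree, which is what eliminates the *Pg duplicates.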
@shumvgolove force-pushed the sh/xftp-pg branch 2 times, most recently from eace7ab to 00ed151 on April 7, 2026 13:03
- Remove internal helpers from Postgres.hs export list (withDB, withDB',
  handleDuplicate, assertUpdated, withLog are not imported externally)
- Replace local isNothing_ with Data.Maybe.isNothing in Env.hs
- Consolidate duplicate/unused imports in XFTPStoreTests.hs
- Add file_path IS NULL and status guards to STM setFilePath, matching
  the Postgres implementation semantics
Remove old non-parameterized test wrapper functions that were
superseded by the store-backend-parameterized test suites.
All test bodies (run* and _ functions) are preserved and called
from the parameterized specs. Clean up unused imports.
import Text.RawString.QQ (r)

xftpSchemaMigrations :: [(String, Text, Maybe Text)]
xftpSchemaMigrations =
Member

merge them into one, no need to have multiple before it's released

Member

once there is a second migration, there needs to be a test that generates a combined schema, but it's not needed now

Collaborator Author

Addressed in 6cac469

m20260402_file_size_check :: Text
m20260402_file_size_check =
[r|
ALTER TABLE files ADD CONSTRAINT check_file_size_positive CHECK (file_size > 0);
Member

why is it needed at all?

Member

we usually don't do it in the schema, it should be in code

Collaborator Author

  1. FNEW protocol (Server.hs:491) — validates size ∈ {64KB, 256KB, 1MB, 4MB}, rejects with SIZE otherwise. Cannot produce 0 or any other invalid value.
  2. Database import (Postgres.hs:245, via StoreLog.hs:62 parser) — parses size :: Word32 from the store log with strP and COPYs it straight to PostgreSQL. No validation at all — accepts any Word32 including 0 and any non-chunk-size value (e.g. 123).

The CHECK provides defense on the import path that the Haskell code doesn't cover.

>>= either handleDuplicate (pure . Right)
withLog "addRecipient" st $ \s -> logAddRecipients s senderId (pure $ FileRecipient rId rKey)

getFile st party fId = runExceptT $ case party of
Member

this needs refactoring similarly to how getQueue in SMP is done, so that the shared part is extracted

Collaborator Author

Addressed in 26bcc72

DB.execute db "DELETE FROM recipients WHERE recipient_id = ?" (Only rId)
withLog "ackFile" st $ \s -> logAckFile s rId

expiredFiles st old limit =
Member

how are records for expired files removed? If one by one, it's not good.

Collaborator Author

Current behaviour (one-by-one, confirmed):

expireServerFiles in Server.hs:659-673 calls expiredFiles st old 10000 to get a batch of up to 10,000 expired file records, then loops over them and calls deleteFile st sId for each one (line 667).

For the PG backend, every deleteFile call goes through withDB "deleteFile" which wraps a fresh withTransaction with a DELETE FROM files WHERE sender_id = ?. So a batch of 10,000 expired files = 10,000 PostgreSQL transactions = 10,000 commits + round trips. Each transaction also fires the ON DELETE CASCADE to recipients.

Options, in order of invasiveness:

  1. Add deleteFiles :: s -> [SenderId] -> IO (Either XFTPErrorType ()) to FileStoreClass (with a default impl that loops deleteFile for STM). PG impl would run a single DELETE FROM files WHERE sender_id = ANY(?) in one transaction, and one logDeleteFile burst to the store log. Expire loop in Server.hs:661 does the filesystem removals first, collects sIds, then calls deleteFiles once.
  2. Restructure expireServerFiles only: gather all sIds upfront (still via expiredFiles batch), do fs removals, then issue one batched delete. Same shape as option 1 but keeps deleteFile as-is.
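
Option 1's batched delete could look roughly like this with postgresql-simple (the deleteFiles name and Binary wrapping for bytea are assumptions, not the PR's code):

```haskell
import Data.ByteString (ByteString)
import Data.Int (Int64)
import Database.PostgreSQL.Simple (Connection, Only (..), execute)
import Database.PostgreSQL.Simple.Types (Binary (..), PGArray (..))

-- One statement in one transaction replaces up to 10,000 individual
-- commits; ON DELETE CASCADE still removes the recipients rows.
deleteFiles :: Connection -> [ByteString] -> IO Int64
deleteFiles db sIds =
  execute db "DELETE FROM files WHERE sender_id = ANY(?)"
    (Only (PGArray (map Binary sIds)))
```

The returned Int64 (rows affected) can be compared against the batch size to detect rows already deleted concurrently.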


getUsedStorage st =
withTransaction (dbStore st) $ \db -> do
[Only total] <- DB.query_ db "SELECT COALESCE(SUM(file_size::INT8), 0)::INT8 FROM files"
Member

why INT8 and not just INTEGER?

Collaborator Author

Because file_size is INT4 and a SUM over many INT4 values can overflow the 2^31-1 (~2.1 GB) INT4 range: summing just 512 × 4 MB chunks already exceeds it. Casting each row to INT8 before summing and the final SUM to INT8 gives a 63-bit accumulator, matching the Haskell Int64 return type.
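
The bound is easy to verify with exact arithmetic (a worked check, not code from the PR):

```haskell
-- 4 MiB chunks vs. the INT4 maximum
maxInt4 :: Integer
maxInt4 = 2 ^ 31 - 1 -- 2,147,483,647

fiveHundredTwelveChunks :: Integer
fiveHundredTwelveChunks = 512 * 4 * 1024 * 1024 -- 2,147,483,648 bytes

-- fiveHundredTwelveChunks > maxInt4, so the accumulator must be INT8
```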

[r|
CREATE TABLE files (
sender_id BYTEA NOT NULL PRIMARY KEY,
file_size INT4 NOT NULL,
Member

why int4 and not just integer?

Collaborator Author

They're the same: INT4 is PostgreSQL's internal name for the 4-byte integer type; INTEGER/INT are SQL-standard aliases for the exact same type. No semantic difference.

file_digest BYTEA NOT NULL,
sender_key BYTEA NOT NULL,
file_path TEXT,
created_at INT8 NOT NULL,
Member

INTEGER

Collaborator Author

created_at stores a Unix timestamp in seconds as Int64 on the Haskell side (RoundedFileTime = RoundedSystemTime 3600, backed by Int64). INTEGER/INT4 is 32-bit and overflows in year 2038 (2^31 seconds since epoch ≈ 2038-01-19). INT8 is 64-bit and has no practical overflow. This is the standard Y2038-safe timestamp encoding.

sl <- openWriteStoreLog False storeLogFilePath
putStrLn "Exporting files..."
-- Load all recipients into a map for lookup
rcpMap <- withTransaction (dbStore pgStore) $ \db ->
Member

I don't see how it can work. You are exporting recipients before you are exporting actual files. When the server imports files, it would just drop recipients for which it has no files (= all of them)

Collaborator Author

The rcpMap loaded upfront (Postgres.hs:277-285) is only an in-memory Data.Map used for O(1) lookup; it is not written to the store log.

The actual store log write order is per-file, inside the single fold at lines 287-305:

  1. logAddFile → FNEW sId (line 296)
  2. logAddRecipients → FADD sId (line 299)
  3. logPutFile → FPUT sId (line 300)

So each file's FADD is written immediately after its own FNEW. On import, replay sees FNEW before FADD for every sId and addRecipient succeeds.

M.empty
(\acc (rId, sId, rKeyBs :: ByteString) ->
case C.decodePubKey rKeyBs of
Right rKey -> pure $! M.insertWith (++) sId [FileRecipient rId rKey] acc
Member

I also don't think we need to construct it in memory - do we do it for SMP? I thought we just stream lines to a file. Maybe that's why it works, but it's unnecessarily wasteful.

Collaborator Author

SMP streams directly via foldQueueRecs (Main.hs:735), writing each QueueRec straight to the store log with no in-memory map. It can do this because each row is self-contained.

XFTP can't use that exact pattern because the log groups recipients per file (FADD sId [rcp,...]) and files/recipients are in two tables — so the current code preloads rcpMap to avoid N+1 lookups. Correct, but yes, wasteful — it materializes every recipient in memory.

Two streaming alternatives:

  1. Two folds, relaxed log order: fold files → write FNEW+FPUT; then fold recipients ORDER BY sender_id, buffering the current sender's list and flushing FADD on sender change. Replay works because all FNEWs are processed before any FADD.
  2. Single joined stream: SELECT f.*, r.recipient_id, r.recipient_key FROM files f LEFT JOIN recipients r USING (sender_id) ORDER BY f.sender_id, buffering per sender and flushing FNEW+FADD+FPUT on sender boundary. One query, constant memory, identical log layout to today.
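
Alternative 2 might be sketched with postgresql-simple's fold_, buffering only the current sender's rows (the row shape and writeEntry callback are simplifications, not the PR's API):

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.ByteString (ByteString)
import Database.PostgreSQL.Simple (Connection, fold_)

-- (sender_id, recipient_id) -- simplified; the real join carries all file columns
type Row = (ByteString, Maybe ByteString)

-- Stream joined rows ordered by sender_id; flush one file's FNEW/FADD/FPUT
-- block whenever the sender changes, so memory stays constant.
exportFiles :: Connection -> (ByteString -> [Row] -> IO ()) -> IO ()
exportFiles db writeEntry = fold_ db q Nothing step >>= flush
  where
    q = "SELECT f.sender_id, r.recipient_id FROM files f \
        \LEFT JOIN recipients r USING (sender_id) ORDER BY f.sender_id"
    step acc row@(sId, _) = case acc of
      Just (prev, rows) | prev == sId -> pure $ Just (prev, row : rows)
      _ -> flush acc >> pure (Just (sId, [row])) -- sender boundary
    flush Nothing = pure ()
    flush (Just (sId, rows)) = writeEntry sId rows
```

One query, constant memory, and each file's FADD still follows its own FNEW in the log.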

data XFTPStoreConfig s where
XSCMemory :: Maybe FilePath -> XFTPStoreConfig STMFileStore
#if defined(dbServerPostgres)
XSCDatabase :: PostgresFileStoreCfg -> XFTPStoreConfig PostgresFileStore
Member

what would happen if a server with DB in the INI is started with an exe compiled without postgres? will it correctly exit with an error?

Collaborator Author

Env.hs:183 handles it in the #else branch with error "Error: server binary is compiled without support for PostgreSQL database...". It exits non-zero with a clear message, so the check works.

#endif

-- | Import StoreLog to PostgreSQL database.
importToDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
Member

this belongs in Main.hs I think

Collaborator Author

The current placement is deliberate: the comment at Env.hs:165 says "CPP guards for Postgres are handled here so Main.hs stays CPP-free." Main.hs currently has zero CPP.

Moving importToDatabase/exportFromDatabase to Main.hs would drag #if defined(dbServerPostgres) ... #else ... #endif blocks plus the Postgres.Config/importFileStore imports into Main.hs, same as runWithStoreConfig and checkFileStoreMode.

#endif

-- | Export PostgreSQL database to StoreLog.
exportFromDatabase :: FilePath -> Ini -> MigrationConfirmation -> IO ()
Member

this too?

Collaborator Author

Same reason: exportFromDatabase sits alongside importToDatabase under the same CPP guard in Env.hs to concentrate all #if defined(dbServerPostgres) in one file so Main.hs stays CPP-free. The same tradeoff applies.

writeTVar recipientIds $! S.insert rId rIds
TM.insert rId (senderId, rKey) recipients
setFilePath st sId fPath = atomically $
withSTMFile st sId $ \FileRec {filePath, fileStatus} -> do
Member

withFile -> withSTMFile is a spurious rename; it increases the diff

Collaborator Author

Addressed in 3f81291

TM.delete rId recipients
modifyTVar' recipientIds $ S.delete rId

getFile :: FileStore -> SFileParty p -> XFTPFileId -> STM (Either XFTPErrorType (FileRec, C.APublicAuthKey))
Member

getFile moved, get it back here

Collaborator Author

Addressed in 1f8bd6b


runXFTPServer :: XFTPServerConfig -> IO ()
runXFTPServer cfg = do
runXFTPServer :: FileStoreClass s => XFTPStoreConfig s -> XFTPServerConfig -> IO ()
Member

I think XFTPServerConfig should be parameterized, and XFTPStoreConfig s should be inside it, same as with SMP

Collaborator Author

Addressed in fcbb13e

stopServer :: M ()
stopServer :: M s ()
stopServer = do
withFileLog closeStoreLog
Member

this likely should be inside closeFileStore

Collaborator Author

Addressed in e530463

verifyCmd party = do
st <- asks store
atomically $ verify =<< getFile st party fId
liftIO (getFile st party fId) >>= \case
Member

unnecessary inlining, restore old structure

Collaborator Author

Addressed in fcbb13e

#endif
. describe "agent XFTP API" $ do
it "should send and receive file" $ withXFTPServer testXFTPAgentSendReceive
. describe "agent XFTP API (memory)" $ do
Member

reduce diff, it's unnecessary

Collaborator Author

Addressed in b5055ad

xftpServerTests :: Spec
xftpServerTests =
before_ (createDirectoryIfMissing False xftpServerFiles) . after_ (removeDirectoryRecursive xftpServerFiles) $ do
describe "XFTP file chunk delivery" $ do
Member

get them back, too much diff

Collaborator Author

Addressed in b5055ad

import XFTPClient

-- Memory-only tests (store log persistence and SNI/CORS transport tests)
xftpServerTests :: Spec
Member

SpecWith XFTPTestBracket, and the type name is incorrect. A bracket is code; it should be named after what we pass.

Collaborator Author

Addressed in b5055ad and 50c387d

Embed XFTPStoreConfig s as serverStoreCfg field, matching SMP's
ServerConfig. runXFTPServer and newXFTPServerEnv now take a single
XFTPServerConfig s. Restore verifyCmd local helper structure.

Restore xftpServerTests and xftpAgentTests bodies to match master
byte-for-byte (only type signatures change for XFTPTestBracket
parameterization); inline the runTestXXX helpers that were split
on this branch.

Move STM store log close responsibility into closeFileStore to
match PostgresFileStore, removing the asymmetry where only PG's
close was self-contained.

STMFileStore holds the log in a TVar populated by newXFTPServerEnv
after readWriteFileStore; stopServer no longer needs the explicit
withFileLog closeStoreLog call. Writes still go through XFTPEnv.storeLog
via withFileLog (unchanged).