fix(arrow/ipc): make Writer and FileWriter safe for concurrent use #853
Open
zeroshade wants to merge 3 commits into
The IPC stream Writer and FileWriter share mutable state across Write and Close -- the started/headerStarted flags, the lastWrittenDicts map, the dictionary mapper, and the underlying payload writer -- with no synchronization. Concurrent Write calls could therefore write the schema/header more than once, interleave and corrupt the output stream, or panic with "concurrent map writes" (apache#55). Add a sync.Mutex to each writer, held for the duration of the exported mutating methods (Write, Close); the unexported start/checkStarted helpers are only called with that lock already held, so they take no lock of their own to avoid self-deadlock. Records are still written in whatever order callers acquire the lock -- the guarantee is freedom from data races and corruption, matching how the standard library documents gob.Encoder and log.Logger. Adds -race regression tests (TestWriterConcurrentWrite, TestFileWriterConcurrentWrite) that report data races without the lock.
Now that Write and Close are serialized by the mutex, a Write that is ordered after Close must not write past the finalized output. Previously a FileWriter.Write losing the race to Close would still append a record batch after the file footer and return nil -- silently corrupting the file -- and the stream Writer would nil-deref its payload writer and surface a confusing recovered-panic error. Write now returns errClosedWriter when the stream writer's payload writer has been closed (pw == nil) or the FileWriter's footer has been written, and the godoc for both documents the rejection. Adds TestWriteAfterClose for both writers (the FileWriter case also asserts no bytes are appended after Close).
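The write-after-close rejection can be illustrated with a minimal sketch. The fileWriter type, its footerWritten flag, the error text, and the no-op second Close are all assumptions made for illustration; only the name errClosedWriter comes from the commit message, and none of this is the arrow-go implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosedWriter reuses the name from the commit message; the text is assumed.
var errClosedWriter = errors.New("write on closed writer")

// fileWriter is a hypothetical stand-in for the IPC FileWriter.
type fileWriter struct {
	mu            sync.Mutex
	footerWritten bool
	out           []string // stands in for the bytes written to the file
}

// Write rejects any call that is ordered after Close instead of
// appending a record batch behind the footer.
func (f *fileWriter) Write(rec string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.footerWritten {
		return errClosedWriter
	}
	f.out = append(f.out, rec)
	return nil
}

// Close writes the footer once; treating a second Close as a no-op is a
// choice made for this sketch only.
func (f *fileWriter) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.footerWritten {
		return nil
	}
	f.out = append(f.out, "footer")
	f.footerWritten = true
	return nil
}

// writeAfterClose reports whether a late Write was rejected and whether
// the output was left untouched by it.
func writeAfterClose() (rejected, untouched bool) {
	f := &fileWriter{}
	_ = f.Write("batch-0")
	_ = f.Close()
	before := len(f.out)
	err := f.Write("batch-1")
	return errors.Is(err, errClosedWriter), len(f.out) == before
}

func main() {
	rejected, untouched := writeAfterClose()
	fmt.Println(rejected, untouched) // prints: true true
}
```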
A Writer or FileWriter created without WithSchema and closed before the first Write reached start() with a nil schema and panicked in Mapper.ImportSchema (schema.NumFields() on a nil *arrow.Schema). With the new concurrent Write/Close contract this is reachable when Close wins the mutex before the first schema-inferring Write. start() now returns errNoSchema instead of panicking; Write is unaffected because it sets the schema from the record before calling start(). Adds TestCloseWithoutSchema covering both the stream Writer and FileWriter.
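A rough sketch of the nil-schema guard follows. The writer and schema types, the error text, and the assumption that Close surfaces the error from start() are hypothetical; locking is left out here to keep the focus on the guard itself.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoSchema reuses the name from the commit message; the text is assumed.
var errNoSchema = errors.New("no schema available")

// schema and writer are hypothetical stand-ins, not the arrow-go types.
type schema struct{ fields []string }

type writer struct {
	schema  *schema
	started bool
	out     []string
}

// start returns an error instead of dereferencing a nil schema.
func (w *writer) start() error {
	if w.started {
		return nil
	}
	if w.schema == nil {
		return errNoSchema
	}
	w.started = true
	w.out = append(w.out, fmt.Sprintf("schema(%d fields)", len(w.schema.fields)))
	return nil
}

// Write infers the schema from the record before calling start, so the
// guard never fires on this path.
func (w *writer) Write(recSchema *schema, rec string) error {
	if w.schema == nil {
		w.schema = recSchema
	}
	if err := w.start(); err != nil {
		return err
	}
	w.out = append(w.out, rec)
	return nil
}

// Close is assumed here to surface the error returned by start.
func (w *writer) Close() error {
	return w.start()
}

// closeWithoutSchema reports whether closing a fresh writer yields errNoSchema.
func closeWithoutSchema() bool {
	w := &writer{}
	return errors.Is(w.Close(), errNoSchema)
}

// writeThenClose reports whether the schema-inferring path succeeds.
func writeThenClose() bool {
	w := &writer{}
	if err := w.Write(&schema{fields: []string{"a"}}, "batch-0"); err != nil {
		return false
	}
	return w.Close() == nil
}

func main() {
	fmt.Println(closeWithoutSchema(), writeThenClose()) // prints: true true
}
```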
lidavidm (Member) reviewed on Jun 12, 2026 and left a comment:
Why do we want it to be concurrent though? Wouldn't a channel + goroutine make more sense?
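For comparison, the channel + goroutine alternative raised in this comment might look roughly like the sketch below. The sink type and the fanIn helper are hypothetical; the idea is that a single goroutine owns the single-threaded writer and all producers send records to it over a channel.

```go
package main

import (
	"fmt"
	"sync"
)

// sink is a hypothetical writer with no internal synchronization.
type sink struct{ out []string }

func (s *sink) Write(rec string) error {
	s.out = append(s.out, rec)
	return nil
}

// fanIn lets n producer goroutines send records over a channel while one
// goroutine owns the sink, so the sink itself needs no lock. It returns
// the number of records that reached the sink.
func fanIn(n int) int {
	s := &sink{}
	records := make(chan string)
	done := make(chan struct{})

	// The only goroutine that ever touches s until done is closed.
	go func() {
		defer close(done)
		for rec := range records {
			_ = s.Write(rec)
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			records <- fmt.Sprint(i)
		}(i)
	}
	wg.Wait()
	close(records) // no more producers; lets the consumer loop end
	<-done         // wait until the consumer has drained the channel
	return len(s.out)
}

func main() {
	fmt.Println(fanIn(16)) // prints: 16
}
```

This design keeps the writer single-threaded at the cost of making every caller route records through the owning goroutine.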
Rationale for this change
Closes #55.
The IPC stream Writer and FileWriter are not safe for concurrent use. Both share mutable state across Write/Close with no synchronization:
- started/headerStarted flags (checked-then-set in Write/Close → schema/header can be written more than once),
- lastWrittenDicts map (written from writeDictionaryPayloads → Go fatal error: concurrent map writes),
- mapper, the shared compressors slice, and the underlying PayloadWriter (concurrent WritePayload → interleaved/corrupted stream),
- Close setting pw = nil while a Write is in flight → nil-deref / use-after-close.
As reported, calling Write from multiple goroutines on one writer can write the schema multiple times and crash the sink. Running the new tests under -race against the old code reports many data races.
What changes are included in this PR?
- Add a sync.Mutex to Writer and FileWriter, held for the duration of the exported mutating methods Write and Close.
- The unexported start/checkStarted helpers are only ever called from Write/Close, so they intentionally stay unlocked to avoid self-deadlock.
- Document the concurrency contract in the godoc for Write/Close. Calls are serialized; records are written in the order in which callers acquire the lock. The guarantee is freedom from data races and stream corruption, not a particular record ordering, which matches how the standard library documents the mutex-protected gob.Encoder and log.Logger.
No public signatures change; this is additive (a new unexported field + locking + godoc).
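The locking pattern listed above can be sketched as follows. This is a minimal illustration with a hypothetical safeWriter type, not the arrow-go code: it models only the started flag and an in-memory sink. The exported method takes the lock, and the unexported helper relies on the caller already holding it.

```go
package main

import (
	"fmt"
	"sync"
)

// safeWriter is a hypothetical stand-in for the IPC stream Writer.
type safeWriter struct {
	mu      sync.Mutex
	started bool
	out     []string // stands in for the underlying payload writer
}

// Write is exported and holds the lock for its whole duration.
func (w *safeWriter) Write(rec string) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.start()
	w.out = append(w.out, "record:"+rec)
	return nil
}

// start is unexported and only called with w.mu already held. It must not
// lock again: sync.Mutex is not reentrant, so a second Lock would deadlock.
func (w *safeWriter) start() {
	if w.started {
		return
	}
	w.started = true
	w.out = append(w.out, "schema")
}

// concurrentWrites has n goroutines write to one writer and returns how
// many schema messages and record messages reached the sink.
func concurrentWrites(n int) (schemas, records int) {
	w := &safeWriter{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = w.Write(fmt.Sprint(i))
		}(i)
	}
	wg.Wait() // all writes happen before the read of w.out below
	for _, m := range w.out {
		if m == "schema" {
			schemas++
		} else {
			records++
		}
	}
	return schemas, records
}

func main() {
	s, r := concurrentWrites(16)
	// Record order may differ between runs; the counts do not.
	fmt.Println(s, r) // prints: 1 16
}
```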
Are these changes tested?
Yes. Two new -race regression tests in arrow/ipc/writer_test.go:
- TestWriterConcurrentWrite: 16 goroutines Write to one stream Writer, then the stream is read back and asserted to contain exactly one schema message + 16 record batches.
- TestFileWriterConcurrentWrite: same for FileWriter, asserting NumRecords() == 16 on read-back.
Verified that both fail with data-race warnings on the pre-change code and pass with the lock. The full arrow/ipc/... suite passes under -race (no deadlock/regression), and go vet is clean.
Are there any user-facing changes?
No breaking changes. Write/Close keep their signatures; concurrent callers now get defined, race-free behavior instead of corruption/panics, and single-threaded callers are unaffected (an uncontended mutex).
Design note
Apache Arrow C++ (IpcFormatWriter) and Java (ArrowWriter) treat IPC writers as single-threaded, and most Go stdlib stream writers (bufio, csv, json, gzip) leave synchronization to the caller. I went with the mutex (rather than only documenting non-safety) because the reported failure mode is a hard concurrent map writes crash, the lock is cheap when uncontended, and arrow-go already documents concurrency guarantees elsewhere (e.g. FileReader.RecordBatchAt/RecordAt). Happy to switch to a docs-only "caller must synchronize" approach if maintainers prefer matching C++/Java exactly.