diff --git a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
index 0c5eaa4..1273948 100644
--- a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
+++ b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
@@ -17,6 +17,9 @@ internal sealed class S3UploadStream : Stream
{
private const long PartSize = 5 * 1024 * 1024;
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+ private const long MaxPartSize = 5L * 1024 * 1024 * 1024;
+
private readonly IAmazonS3 _client;
private readonly string _bucketName;
private readonly string _key;
@@ -146,16 +149,11 @@ public override void SetLength(long value) =>
 /// <inheritdoc />
public override void Flush()
{
- _stream.Flush();
- UploadPart();
}
 /// <inheritdoc />
- public override async Task FlushAsync(CancellationToken cancellationToken)
- {
- await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
- await UploadPartAsync(cancellationToken).ConfigureAwait(false);
- }
+ public override Task FlushAsync(CancellationToken cancellationToken) =>
+ Task.CompletedTask;
 /// <inheritdoc />
protected override void Dispose(bool disposing)
@@ -232,24 +230,28 @@ private async ValueTask UploadPartAsync(CancellationToken cancellationToken)
{
_stream.Position = 0;
- // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
- // The maximum allowed part size is 5 gigabytes.
-
- var request = new UploadPartRequest
+ do
{
- BucketName = _bucketName,
- Key = _key,
- UploadId = _uploadId,
- PartNumber = _partETags.Count + 1,
- InputStream = _stream,
- PartSize = _stream.Length
- };
-
- var response = await _client
- .UploadPartAsync(request, cancellationToken)
- .ConfigureAwait(false);
-
- _partETags.Add(new PartETag(response));
+ var remaining = _stream.Length - _stream.Position;
+ var partSize = Math.Min(remaining, MaxPartSize);
+
+ var request = new UploadPartRequest
+ {
+ BucketName = _bucketName,
+ Key = _key,
+ UploadId = _uploadId,
+ PartNumber = _partETags.Count + 1,
+ InputStream = _stream,
+ PartSize = partSize
+ };
+
+ var response = await _client
+ .UploadPartAsync(request, cancellationToken)
+ .ConfigureAwait(false);
+
+ _partETags.Add(new PartETag(response));
+ }
+ while (_stream.Position < _stream.Length);
_stream.Position = 0;
_stream.SetLength(0);
diff --git a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
index fc75b3b..706e889 100644
--- a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
+++ b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
@@ -60,10 +60,8 @@ public async Task File_OpenWrite_InternalBufferWriteError_DoesNotCreateFile()
var underlying = (FileStream)stream.GetType().GetField("_stream", BindingFlags.NonPublic | BindingFlags.Instance)!.GetValue(stream)!;
Assert.That(underlying, Is.Not.Null);
-        await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1024]));
-
- // Forces to upload buffer.
- await stream.FlushAsync();
+ // Write enough data to trigger automatic part upload (>= 5 MiB).
+        // Write enough data to trigger automatic part upload (>= 5 MiB).
+        await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[6 * 1024 * 1024]));
// Simulates an internal buffer write error.
await underlying.DisposeAsync();
@@ -196,6 +194,83 @@ await reader.ReadToEndAsync(),
}
+ [Test]
+ public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts()
+ {
+ using var fs = GetFileSystem();
+
+ var content = "Hello, World!";
+
+ {
+ await using var stream = await fs.OpenWriteAsync("/flush-test.txt");
+ await using var writer = new StreamWriter(stream);
+
+ // Write small data and flush multiple times.
+ // Flush should be a no-op and not upload undersized parts.
+ await writer.WriteAsync(content[..5]);
+ await writer.FlushAsync();
+ await writer.WriteAsync(content[5..]);
+ await writer.FlushAsync();
+ }
+
+ {
+ var file = fs.GetFile("/flush-test.txt");
+ Assert.That(await file.ExistsAsync(), Is.True);
+
+ // ReSharper disable once UseAwaitUsing
+ using var stream = await file.OpenReadAsync();
+ using var reader = new StreamReader(stream);
+ Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(content));
+
+ await file.DeleteAsync();
+ }
+ }
+
+ [Test]
+ public async Task File_OpenWrite_FlushWithMultipartUpload()
+ {
+ using var fs = GetFileSystem();
+
+ // Write more than 5 MiB to trigger multipart upload,
+ // with Flush calls between writes.
+ var chunk = new byte[2 * 1024 * 1024];
+ Random.Shared.NextBytes(chunk);
+
+ {
+ await using var stream = await fs.OpenWriteAsync("/flush-multipart-test.bin");
+
+ // Write 4 chunks (8 MiB total) with flushes in between.
+ // Without the fix, each flush would upload an undersized part
+ // and CompleteMultipartUpload would fail.
+ for (var i = 0; i < 4; i++)
+ {
+ await stream.WriteAsync(chunk);
+ await stream.FlushAsync();
+ }
+ }
+
+ var file = fs.GetFile("/flush-multipart-test.bin");
+ Assert.That(await file.ExistsAsync(), Is.True);
+ Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * 4));
+
+ await file.DeleteAsync();
+ }
+
+ [Test]
+ public async Task File_OpenWrite_EmptyFileWithFlush()
+ {
+ using var fs = GetFileSystem();
+
+ await using (var stream = await fs.OpenWriteAsync("/empty-flush-test.txt"))
+ await stream.FlushAsync();
+
+ var file = fs.GetFile("/empty-flush-test.txt");
+ Assert.That(await file.ExistsAsync(), Is.True);
+ Assert.That(await file.GetLengthAsync(), Is.EqualTo(0));
+
+ await file.DeleteAsync();
+ }
+
[Test]
public async Task Directory_BatchDeleting()
{