Skip to content

Commit cfb19de

Browse files
committed
fix: fallback to multipart, test coverage
Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
1 parent 24951f8 commit cfb19de

5 files changed

Lines changed: 270 additions & 36 deletions

File tree

src/config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type StorageConfigType = {
6666
storageS3InternalTracesEnabled?: boolean
6767
storageS3MaxSockets: number
6868
storageS3DisableChecksum: boolean
69+
storageS3UploadQueueSize: number
6970
storageS3Bucket: string
7071
storageS3Endpoint?: string
7172
storageS3ForcePathStyle?: boolean
@@ -364,6 +365,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
364365
10
365366
),
366367
storageS3DisableChecksum: getOptionalConfigFromEnv('STORAGE_S3_DISABLE_CHECKSUM') === 'true',
368+
storageS3UploadQueueSize:
369+
envNumber(getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_QUEUE_SIZE')) ?? 2,
367370
storageS3InternalTracesEnabled:
368371
getOptionalConfigFromEnv('STORAGE_S3_ENABLED_METRICS') === 'true',
369372
storageS3Bucket: getOptionalConfigFromEnv('STORAGE_S3_BUCKET', 'GLOBAL_S3_BUCKET'),

src/storage/backend/s3/adapter.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ import {
3535
} from './../adapter'
3636

3737
const {
38+
storageS3UploadQueueSize,
3839
tracingFeatures,
3940
storageS3MaxSockets,
4041
tracingEnabled,
4142
storageS3DisableChecksum,
4243
} = getConfig()
4344

44-
const MAX_PUT_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 // 5GB
45+
export const MAX_PUT_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 // 5GB
4546

4647
export interface S3ClientOptions {
4748
endpoint?: string
@@ -152,13 +153,17 @@ export class S3Backend implements StorageBackendAdapter {
152153
throw ERRORS.Aborted('Upload was aborted')
153154
}
154155

155-
if (typeof contentLength !== 'number') {
156-
// If content length is unknown, use streaming multipart upload which does not require buffering the entire stream in memory.
157-
return this.bufferedMultipartUpload(bucketName, key, version, body, contentType, cacheControl, signal)
158-
}
159-
160-
if (contentLength > MAX_PUT_OBJECT_SIZE) {
161-
throw ERRORS.EntityTooLarge(undefined)
156+
if (typeof contentLength !== 'number' || contentLength > MAX_PUT_OBJECT_SIZE) {
157+
// Use multipart when the length is unknown or exceeds S3's 5GB single-request limit.
158+
return this.bufferedMultipartUpload(
159+
bucketName,
160+
key,
161+
version,
162+
body,
163+
contentType,
164+
cacheControl,
165+
signal
166+
)
162167
}
163168

164169
// Use PutObject directly when content-length is known and within S3's single-object limit (5GB).
@@ -232,7 +237,7 @@ export class S3Backend implements StorageBackendAdapter {
232237

233238
const upload = new Upload({
234239
client: this.client,
235-
queueSize: 1,
240+
queueSize: storageS3UploadQueueSize,
236241
params: {
237242
Bucket: bucketName,
238243
Key: withOptionalVersion(key, version),

src/storage/uploader.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,10 @@ export async function fileUploadFromRequest(
411411
}
412412

413413
fileContentLength = getKnownRequestContentLength(request)
414+
if (typeof fileContentLength === 'number' && fileContentLength > maxFileSize) {
415+
throw ERRORS.EntityTooLarge()
416+
}
417+
414418
isTruncated = () => {
415419
// @todo more secure to get this from the stream or from s3 in the next step
416420
return typeof fileContentLength === 'number' && fileContentLength > maxFileSize

src/test/s3-adapter.test.ts

Lines changed: 206 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
'use strict'
22

3-
import { S3Client } from '@aws-sdk/client-s3'
3+
import { HeadObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
4+
import { Upload } from '@aws-sdk/lib-storage'
45
import { Readable } from 'stream'
5-
import { S3Backend } from '../storage/backend/s3/adapter'
6+
import { getConfig } from '../config'
7+
import { ErrorCode, isStorageError } from '../internal/errors'
8+
import { MAX_PUT_OBJECT_SIZE, S3Backend } from '../storage/backend/s3/adapter'
69

710
jest.mock('@aws-sdk/client-s3', () => {
811
const originalModule = jest.requireActual('@aws-sdk/client-s3')
@@ -14,17 +17,76 @@ jest.mock('@aws-sdk/client-s3', () => {
1417
}
1518
})
1619

20+
jest.mock('@aws-sdk/lib-storage', () => {
21+
const originalModule = jest.requireActual('@aws-sdk/lib-storage')
22+
return {
23+
...originalModule,
24+
Upload: jest.fn(),
25+
}
26+
})
27+
28+
type MockUploadInstance = {
29+
options: any
30+
abort: jest.Mock
31+
done: jest.Mock<Promise<any>, []>
32+
on: jest.Mock
33+
off: jest.Mock
34+
emit: (event: string, payload: unknown) => void
35+
}
36+
1737
describe('S3Backend', () => {
1838
let mockSend: jest.Mock
39+
let mockUploadDone: jest.Mock<Promise<any>, [MockUploadInstance]>
40+
let uploadInstances: MockUploadInstance[]
1941

2042
beforeEach(() => {
2143
jest.clearAllMocks()
2244
mockSend = jest.fn()
45+
mockUploadDone = jest.fn().mockResolvedValue({
46+
ETag: '"multipart-etag"',
47+
$metadata: {
48+
httpStatusCode: 200,
49+
},
50+
})
51+
uploadInstances = []
52+
2353
;(S3Client as jest.Mock).mockImplementation(() => ({
2454
send: mockSend,
2555
}))
56+
57+
;(Upload as unknown as jest.Mock).mockImplementation((options) => {
58+
const handlers = new Map<string, Set<(payload: unknown) => void>>()
59+
const instance = {} as MockUploadInstance
60+
61+
instance.options = options
62+
instance.abort = jest.fn()
63+
instance.done = jest.fn(() => mockUploadDone(instance))
64+
instance.on = jest.fn((event: string, handler: (payload: unknown) => void) => {
65+
const eventHandlers = handlers.get(event) ?? new Set()
66+
eventHandlers.add(handler)
67+
handlers.set(event, eventHandlers)
68+
return instance
69+
})
70+
instance.off = jest.fn((event: string, handler: (payload: unknown) => void) => {
71+
handlers.get(event)?.delete(handler)
72+
return instance
73+
})
74+
instance.emit = (event: string, payload: unknown) => {
75+
handlers.get(event)?.forEach((handler) => handler(payload))
76+
}
77+
78+
uploadInstances.push(instance)
79+
return instance
80+
})
2681
})
2782

83+
function createBackend() {
84+
return new S3Backend({
85+
region: 'us-east-1',
86+
endpoint: 'http://localhost:9000',
87+
})
88+
}
89+
2890
describe('getObject', () => {
2991
test('should return correct default MIME type when S3 returns no ContentType', async () => {
3092
mockSend.mockResolvedValue({
@@ -38,10 +100,7 @@ describe('S3Backend', () => {
38100
},
39101
})
40102

41-
const backend = new S3Backend({
42-
region: 'us-east-1',
43-
endpoint: 'http://localhost:9000',
44-
})
103+
const backend = createBackend()
45104

46105
const result = await backend.getObject('test-bucket', 'test-key', undefined)
47106

@@ -64,14 +123,151 @@ describe('S3Backend', () => {
64123
},
65124
})
66125

67-
const backend = new S3Backend({
68-
region: 'us-east-1',
69-
endpoint: 'http://localhost:9000',
70-
})
126+
const backend = createBackend()
71127

72128
const result = await backend.getObject('test-bucket', 'test-key', undefined)
73129

74130
expect(result.metadata.mimetype).toBe('image/png')
75131
})
76132
})
133+
134+
describe('uploadObject', () => {
135+
test('uses PutObject for known-size uploads within the single-request limit', async () => {
136+
mockSend.mockResolvedValue({
137+
ETag: '"put-etag"',
138+
$metadata: {
139+
httpStatusCode: 200,
140+
},
141+
})
142+
143+
const backend = createBackend()
144+
const result = await backend.uploadObject(
145+
'test-bucket',
146+
'test-key',
147+
undefined,
148+
Readable.from(['hello']),
149+
'text/plain',
150+
'max-age=60',
151+
undefined,
152+
5
153+
)
154+
155+
expect(mockSend).toHaveBeenCalledTimes(1)
156+
expect(mockSend.mock.calls[0][0]).toBeInstanceOf(PutObjectCommand)
157+
expect(mockSend.mock.calls[0][0].input).toMatchObject({
158+
Bucket: 'test-bucket',
159+
Key: 'test-key',
160+
ContentType: 'text/plain',
161+
CacheControl: 'max-age=60',
162+
ContentLength: 5,
163+
})
164+
expect(Upload).not.toHaveBeenCalled()
165+
expect(result).toMatchObject({
166+
httpStatusCode: 200,
167+
cacheControl: 'max-age=60',
168+
eTag: '"put-etag"',
169+
mimetype: 'text/plain',
170+
contentLength: 5,
171+
size: 5,
172+
})
173+
})
174+
175+
test('falls back to multipart upload when content length exceeds the single-request limit', async () => {
176+
const overLimit = MAX_PUT_OBJECT_SIZE + 1
177+
const lastModified = new Date('2024-01-01T00:00:00.000Z')
178+
179+
mockUploadDone.mockImplementationOnce(async (instance) => {
180+
instance.emit('httpUploadProgress', { loaded: 1 })
181+
return {
182+
ETag: '"multipart-etag"',
183+
$metadata: {
184+
httpStatusCode: 200,
185+
},
186+
}
187+
})
188+
mockSend.mockResolvedValueOnce({
189+
CacheControl: 'max-age=60',
190+
ContentType: 'text/plain',
191+
ContentLength: overLimit,
192+
ETag: '"head-etag"',
193+
LastModified: lastModified,
194+
$metadata: {
195+
httpStatusCode: 200,
196+
},
197+
})
198+
199+
const backend = createBackend()
200+
const result = await backend.uploadObject(
201+
'test-bucket',
202+
'test-key',
203+
undefined,
204+
Readable.from(['hello']),
205+
'text/plain',
206+
'max-age=60',
207+
undefined,
208+
overLimit
209+
)
210+
211+
expect(Upload).toHaveBeenCalledTimes(1)
212+
expect(uploadInstances[0].options.queueSize).toBe(getConfig().storageS3UploadQueueSize)
213+
expect(mockSend).toHaveBeenCalledTimes(1)
214+
expect(mockSend.mock.calls[0][0]).toBeInstanceOf(HeadObjectCommand)
215+
expect(result).toMatchObject({
216+
httpStatusCode: 200,
217+
cacheControl: 'max-age=60',
218+
eTag: '"head-etag"',
219+
mimetype: 'text/plain',
220+
contentLength: overLimit,
221+
size: overLimit,
222+
lastModified,
223+
})
224+
})
225+
226+
test('uses multipart upload when content length is unknown', async () => {
227+
const backend = createBackend()
228+
const result = await backend.uploadObject(
229+
'test-bucket',
230+
'test-key',
231+
undefined,
232+
Readable.from(['hello']),
233+
'text/plain',
234+
'max-age=60'
235+
)
236+
237+
expect(Upload).toHaveBeenCalledTimes(1)
238+
expect(uploadInstances[0].options.queueSize).toBe(getConfig().storageS3UploadQueueSize)
239+
expect(mockSend).not.toHaveBeenCalled()
240+
expect(result).toMatchObject({
241+
httpStatusCode: 200,
242+
cacheControl: 'max-age=60',
243+
eTag: '"multipart-etag"',
244+
mimetype: 'text/plain',
245+
contentLength: 0,
246+
size: 0,
247+
})
248+
})
249+
250+
test('maps PutObject abort errors to AbortedTerminate', async () => {
251+
mockSend.mockRejectedValueOnce(Object.assign(new Error('aborted'), { name: 'AbortError' }))
252+
253+
const backend = createBackend()
254+
255+
try {
256+
await backend.uploadObject(
257+
'test-bucket',
258+
'test-key',
259+
undefined,
260+
Readable.from(['hello']),
261+
'text/plain',
262+
'max-age=60',
263+
undefined,
264+
5
265+
)
266+
throw new Error('Expected uploadObject to throw')
267+
} catch (error) {
268+
expect(isStorageError(ErrorCode.AbortedTerminate, error)).toBe(true)
269+
expect((error as Error).message).toBe('Upload was aborted')
270+
}
271+
})
272+
})
77273
})

0 commit comments

Comments (0)