
Commit ae977e8

Add AWS S3 storage backend
1 parent f711a85 commit ae977e8

13 files changed

Lines changed: 1602 additions & 59 deletions

packages/backfill/README.md

Lines changed: 46 additions & 0 deletions

@@ -253,6 +253,52 @@ BACKFILL_CACHE_PROVIDER="azure-blob"
 BACKFILL_CACHE_PROVIDER_OPTIONS='{"connectionString":"...","container":"..."}'
 ```
 
+### AWS Simple Storage Service (S3)
+
+To cache to AWS S3 you need to supply the required aws-sdk libraries:
+
+    yarn add -D @aws-sdk/client-s3 @aws-sdk/lib-storage
+
+You will have to configure backfill and provide a bucket name. If you are configuring
+via `backfill.config.js`, you can use the following syntax:
+
+```js
+module.exports = {
+  cacheStorageConfig: {
+    provider: "s3",
+    options: {
+      bucket: "...",
+      maxSize: 12345,
+    },
+  },
+};
+```
+
+Via environment variables:
+
+```
+BACKFILL_CACHE_PROVIDER="s3"
+BACKFILL_CACHE_PROVIDER_OPTIONS='{"bucket":"...","prefix":"...","maxSize":50000000}'
+AWS_PROFILE=...
+AWS_REGION=...
+```
+
+#### Options
+
+<dl>
+<dt>bucket</dt>
+<dd>the name of the AWS S3 bucket to store files in</dd>
+
+<dt>prefix (<em>optional</em>)</dt>
+<dd>prefix to add to keys. For example: "build-cache/"</dd>
+
+<dt>maxSize (<em>optional</em>)</dt>
+<dd>
+maximum size of a single package cache, in bytes
+</dd>
+</dl>
+
+
 ### NPM package
 
 To cache to an NPM package you need to provide a package name and the registry
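
One detail worth noting about the environment-variable form: the options string is parsed as JSON (an assumption consistent with the quoting in the existing Azure example), so every key, including `maxSize`, must be double-quoted. A minimal TypeScript sketch of that assumption:

```ts
// Sketch only: assumes backfill reads BACKFILL_CACHE_PROVIDER_OPTIONS with
// JSON.parse; an unquoted key such as `maxSize:50000000` would make it throw.
const raw = process.env.BACKFILL_CACHE_PROVIDER_OPTIONS ?? "{}";
const options = JSON.parse(raw);
console.log(options.bucket, options.prefix, options.maxSize);
```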

packages/backfill/package.json

Lines changed: 12 additions & 0 deletions

@@ -44,6 +44,18 @@
     "ts-mockito": "^2.6.1",
     "typescript": "~4.7.0"
   },
+  "peerDependencies": {
+    "@aws-sdk/client-s3": "^3.804.0",
+    "@aws-sdk/lib-storage": "^3.804.0"
+  },
+  "peerDependenciesMeta": {
+    "@aws-sdk/client-s3": {
+      "optional": true
+    },
+    "@aws-sdk/lib-storage": {
+      "optional": true
+    }
+  },
   "engines": {
     "node": ">=14"
   }

packages/cache/package.json

Lines changed: 14 additions & 0 deletions

@@ -26,6 +26,8 @@
     "tar-fs": "^2.1.0"
   },
   "devDependencies": {
+    "@aws-sdk/client-s3": "^3.804.0",
+    "@aws-sdk/lib-storage": "^3.804.0",
     "@types/fs-extra": "^9.0.13",
     "@types/jest": "^30.0.0",
     "@types/node": "^14.18.36",
@@ -36,6 +38,18 @@
     "ts-jest": "^29.0.0",
     "typescript": "~4.7.0"
   },
+  "peerDependencies": {
+    "@aws-sdk/client-s3": "^3.804.0",
+    "@aws-sdk/lib-storage": "^3.804.0"
+  },
+  "peerDependenciesMeta": {
+    "@aws-sdk/client-s3": {
+      "optional": true
+    },
+    "@aws-sdk/lib-storage": {
+      "optional": true
+    }
+  },
   "engines": {
     "node": ">=14"
   }

packages/cache/src/AzureBlobCacheStorage.ts

Lines changed: 3 additions & 53 deletions

@@ -1,5 +1,5 @@
 import * as path from "path";
-import { Transform, TransformCallback, pipeline } from "stream";
+import { pipeline } from "stream";
 import tarFs from "tar-fs";
 
 import { Logger } from "backfill-logger";
@@ -8,62 +8,12 @@ import { AzureBlobCacheStorageOptions } from "backfill-config";
 import { stat } from "fs-extra";
 import { ContainerClient } from "@azure/storage-blob";
 import { CacheStorage } from "./CacheStorage";
+import { TimeoutStream } from "./TimeoutStream";
+import { SpongeStream } from "./SpongeStream";
 
 const ONE_MEGABYTE = 1024 * 1024;
 const FOUR_MEGABYTES = 4 * ONE_MEGABYTE;
 
-/*
- * Timeout stream, will emit an error event if the
- * input has not started providing data after a given time after
- * its creation.
- */
-class TimeoutStream extends Transform {
-  private timeout: NodeJS.Timeout;
-  constructor(timeout: number, message: string) {
-    super();
-    this.timeout = setTimeout(() => {
-      this.destroy(new Error(message));
-    }, timeout);
-  }
-  _transform(
-    chunk: any,
-    _encoding: BufferEncoding,
-    callback: TransformCallback
-  ): void {
-    clearTimeout(this.timeout);
-    this.push(chunk);
-    callback();
-  }
-}
-
-/*
- * Sponge stream, it will accumulate all the data it receives
- * and emit it only if and when the input stream sends the "end" event.
- */
-class SpongeStream extends Transform {
-  constructor() {
-    super({
-      // This stream should never receive more data than its readableHighWaterMark
-      // otherwise the stream will get into a deadlock
-      // 1 TB should give enough room :)
-      readableHighWaterMark: 1024 * 1024 * 1024 * 1024,
-    });
-  }
-  _transform(
-    chunk: any,
-    _encoding: BufferEncoding,
-    callback: TransformCallback
-  ): void {
-    this.pause();
-    this.push(chunk);
-    callback();
-  }
-  _flush(callback: TransformCallback): void {
-    this.resume();
-    callback();
-  }
-}
-
 const uploadOptions = {
   bufferSize: FOUR_MEGABYTES,
   maxBuffers: 5,

packages/cache/src/S3CacheStorage.ts

Lines changed: 122 additions & 0 deletions

@@ -0,0 +1,122 @@
+import path from "path";
+import { PassThrough, pipeline } from "stream";
+import tarFs from "tar-fs";
+import { Logger } from "backfill-logger";
+import { stat } from "fs-extra";
+import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
+import { Upload } from "@aws-sdk/lib-storage";
+import { S3CacheStorageOptions } from "backfill-config";
+import { CacheStorage } from "./CacheStorage";
+import { TimeoutStream } from "./TimeoutStream";
+import { SpongeStream } from "./SpongeStream";
+
+/**
+ * Implementation of backfill storage using AWS S3. To use it,
+ * set the cache storage provider to "s3" in the backfill configuration.
+ */
+export class S3CacheStorage extends CacheStorage {
+  private readonly s3Client: S3Client;
+
+  constructor(
+    private options: S3CacheStorageOptions,
+    logger: Logger,
+    cwd: string,
+    incrementalCaching = false
+  ) {
+    super(logger, cwd, incrementalCaching);
+    this.s3Client = new S3Client(options.clientConfig || {});
+  }
+
+  protected async _fetch(hash: string): Promise<boolean> {
+    try {
+      const command = new GetObjectCommand({
+        Bucket: this.options.bucket,
+        Key: (this.options.prefix ?? "") + hash,
+      });
+
+      const response = await this.s3Client.send(command);
+
+      if (
+        this.options.maxSize &&
+        response.ContentLength &&
+        response.ContentLength > this.options.maxSize
+      ) {
+        this.logger.verbose(
+          `Object is too large to be downloaded: ${hash}, size: ${response.ContentLength} bytes`
+        );
+        return false;
+      }
+
+      const objectStream = response.Body;
+      if (!objectStream) {
+        throw new Error("Unable to fetch object.");
+      }
+
+      const tarWritableStream = tarFs.extract(this.cwd);
+
+      const spongeStream = new SpongeStream();
+
+      const timeoutStream = new TimeoutStream(
+        10 * 60 * 1000,
+        `The fetch request to ${hash} seems to be hanging`
+      );
+
+      const extractionPipeline = new Promise<void>((resolve, reject) =>
+        pipeline(
+          objectStream as any,
+          spongeStream,
+          timeoutStream,
+          tarWritableStream,
+          (err) => {
+            if (err) {
+              reject(err);
+            } else {
+              resolve();
+            }
+          }
+        )
+      );
+
+      await extractionPipeline;
+      return true;
+    } catch (error) {
+      if (error && (error as any).name === "NoSuchKey") {
+        return false;
+      } else {
+        throw error;
+      }
+    }
+  }
+
+  protected async _put(hash: string, filesToCache: string[]): Promise<void> {
+    const tarStream = tarFs.pack(this.cwd, { entries: filesToCache });
+    // If there's a maxSize limit, first sum up the total size in bytes of all the outputGlobbed files
+    if (this.options.maxSize) {
+      let total = 0;
+      for (const file of filesToCache) {
+        total = total + (await stat(path.join(this.cwd, file))).size;
+      }
+
+      if (total > this.options.maxSize) {
+        this.logger.verbose(
+          `The output is too large to be uploaded: ${hash}, size: ${total} bytes`
+        );
+        return;
+      }
+    }
+
+    const pass = new PassThrough();
+    tarStream.pipe(pass);
+
+    const upload = new Upload({
+      client: this.s3Client,
+      params: {
+        Bucket: this.options.bucket,
+        ContentType: "application/x-tar",
+        Key: (this.options.prefix ?? "") + hash,
+        Body: pass,
+      },
+    });
+    await upload.done();
+  }
+}
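
One thing the README section above does not list: the constructor also reads an optional `clientConfig`, forwarded verbatim to `new S3Client(...)`. A hedged sketch of using it to pin a region (the bucket name and values are placeholders, not from the commit):

```js
// backfill.config.js — sketch only; `clientConfig` takes whatever the AWS SDK's
// S3ClientConfig accepts, since it is passed straight to `new S3Client(...)`.
module.exports = {
  cacheStorageConfig: {
    provider: "s3",
    options: {
      bucket: "my-build-cache", // placeholder
      prefix: "backfill/",
      clientConfig: { region: "us-east-1" },
    },
  },
};
```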

packages/cache/src/SpongeStream.ts

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+import { Transform, TransformCallback } from "stream";
+
+/*
+ * Sponge stream: accumulates all the data it receives and emits it
+ * only if and when the input stream sends the "end" event.
+ */
+export class SpongeStream extends Transform {
+  constructor() {
+    super({
+      // This stream should never receive more data than its readableHighWaterMark,
+      // otherwise the stream will get into a deadlock.
+      // 1 TB should give enough room :)
+      readableHighWaterMark: 1024 * 1024 * 1024 * 1024,
+    });
+  }
+  _transform(
+    chunk: any,
+    _encoding: BufferEncoding,
+    callback: TransformCallback
+  ): void {
+    this.pause();
+    this.push(chunk);
+    callback();
+  }
+  _flush(callback: TransformCallback): void {
+    this.resume();
+    callback();
+  }
+}
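
A standalone sketch (not part of the commit; the relative import assumes the file sits next to SpongeStream.ts) illustrating the sponge's behavior: the sink receives nothing until the source has fully ended.

```ts
import { pipeline, Readable, Writable } from "stream";
import { SpongeStream } from "./SpongeStream"; // internal module, not exported by the package

// The sponge pauses its readable side on the first chunk and only resumes in
// _flush, so everything below prints after the source has emitted "end".
const source = Readable.from(["part-1", "part-2", "part-3"]);
const sink = new Writable({
  write(chunk, _encoding, callback) {
    console.log("sink received:", chunk.toString());
    callback();
  },
});

pipeline(source, new SpongeStream(), sink, (err) => {
  console.log(err ? `failed: ${err.message}` : "done");
});
```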

packages/cache/src/TimeoutStream.ts

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+import { Transform, TransformCallback } from "stream";
+
+/*
+ * Timeout stream: emits an error if the input has not started
+ * providing data within a given time of the stream's creation.
+ */
+export class TimeoutStream extends Transform {
+  private timeout: NodeJS.Timeout;
+  constructor(timeout: number, message: string) {
+    super();
+    this.timeout = setTimeout(() => {
+      this.destroy(new Error(message));
+    }, timeout);
+  }
+  _transform(
+    chunk: any,
+    _encoding: BufferEncoding,
+    callback: TransformCallback
+  ): void {
+    clearTimeout(this.timeout);
+    this.push(chunk);
+    callback();
+  }
+}
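
A matching standalone sketch for the timeout path (again, not part of the commit): a source that never writes trips the timer, and `pipeline` surfaces the destroy error.

```ts
import { pipeline, PassThrough } from "stream";
import { TimeoutStream } from "./TimeoutStream"; // internal module, not exported by the package

// Nothing is ever written to `stalled`, so after 1000 ms the TimeoutStream
// destroys itself with the supplied message and the pipeline errors out.
const stalled = new PassThrough();

pipeline(
  stalled,
  new TimeoutStream(1000, "source never produced data"),
  new PassThrough(),
  (err) => {
    console.log(err?.message); // "source never produced data"
  }
);
```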

packages/cache/src/index.ts

Lines changed: 8 additions & 0 deletions

@@ -6,6 +6,7 @@ import { AzureBlobCacheStorage } from "./AzureBlobCacheStorage";
 import { LocalCacheStorage } from "./LocalCacheStorage";
 import { NpmCacheStorage } from "./NpmCacheStorage";
 import { LocalSkipCacheStorage } from "./LocalSkipCacheStorage";
+import { S3CacheStorage } from "./S3CacheStorage";
 export { ICacheStorage, CacheStorage } from "./CacheStorage";
 
 export function isCustomProvider(
@@ -54,6 +55,13 @@ export function getCacheStorageProvider(
       cwd,
       incrementalCaching
     );
+  } else if (cacheStorageConfig.provider === "s3") {
+    cacheStorage = new S3CacheStorage(
+      cacheStorageConfig.options,
+      logger,
+      cwd,
+      incrementalCaching
+    );
   } else if (cacheStorageConfig.provider === "local-skip") {
     cacheStorage = new LocalSkipCacheStorage(
       internalCacheFolder,

packages/config/src/cacheConfig.ts

Lines changed: 2 additions & 0 deletions

@@ -1,6 +1,7 @@
 import type { Logger } from "backfill-logger";
 import type { AzureBlobCacheStorageConfig } from "./azureBlobCacheConfig";
 import type { NpmCacheStorageConfig } from "./npmCacheConfig";
+import type { S3CacheStorageConfig } from "./s3CacheConfig";
 
 export interface ICacheStorage {
   fetch: (hash: string) => Promise<boolean>;
@@ -21,6 +22,7 @@ export type CacheStorageConfig =
     }
   | NpmCacheStorageConfig
   | AzureBlobCacheStorageConfig
+  | S3CacheStorageConfig
   | CustomStorageConfig;
 
 /**
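
The referenced `s3CacheConfig.ts` module is not shown in this diff view. A plausible reconstruction, inferred from the options `S3CacheStorage` reads and the `provider === "s3"` check in `index.ts`; the field list is an assumption, not the committed source:

```ts
import type { S3ClientConfig } from "@aws-sdk/client-s3";

// Hypothetical sketch of packages/config/src/s3CacheConfig.ts; names are
// inferred from how S3CacheStorage consumes its options.
export type S3CacheStorageOptions = {
  bucket: string; // name of the S3 bucket
  prefix?: string; // optional key prefix, e.g. "build-cache/"
  maxSize?: number; // optional cap on a single package cache, in bytes
  clientConfig?: S3ClientConfig; // forwarded to `new S3Client(...)`
};

export type S3CacheStorageConfig = {
  provider: "s3";
  options: S3CacheStorageOptions;
};
```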
