Skip to content

Commit 00ff206

Browse files
committed
Fixed issue where two workers memory could go out-of-sync
1 parent 4e1cb79 commit 00ff206

4 files changed

Lines changed: 128 additions & 49 deletions

File tree

src/TransferableDataStructure.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ export default abstract class TransferableDataStructure {
77
private decoderBuffer: ArrayBuffer = new ArrayBuffer(TransferableDataStructure.DECODER_BUFFER_SIZE);
88
private currentDecoderBufferSize: number = TransferableDataStructure.DECODER_BUFFER_SIZE;
99

10-
protected allocateMemory(byteSize: number): SharedArrayBuffer | ArrayBuffer {
10+
protected allocateMemory(byteSize: number, maxByteLength?: number): SharedArrayBuffer | ArrayBuffer {
1111
try {
12-
return new SharedArrayBuffer(byteSize);
12+
return new SharedArrayBuffer(
13+
byteSize,
14+
maxByteLength !== undefined ? { maxByteLength } : undefined
15+
);
1316
} catch (err) {
1417
throw new Error(`Could not allocate memory. Tried to allocate ${byteSize} bytes.`);
1518
}

src/map/ShareableMap.ts

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
4747
*/
4848
private static readonly WRITE_LOCK_VALUE = -1;
4949

50+
// Default ceilings for the growable SharedArrayBuffers.
51+
// Virtual address space is reserved up-front but physical pages are committed lazily,
52+
// so these values are cheap to declare. Users can override maxDataBytes via ShareableMapOptions.
53+
private static readonly DEFAULT_MAX_DATA_BYTES = 256 * 1024 * 1024; // 256 MiB
54+
private static readonly DEFAULT_MAX_INDEX_BYTES = 64 * 1024 * 1024; // 64 MiB
55+
5056

5157
private indexMem!: SharedArrayBuffer | ArrayBuffer;
5258
private dataMem!: SharedArrayBuffer | ArrayBuffer;
@@ -582,73 +588,81 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
582588
}
583589

584590
/**
585-
* Allocate a new ArrayBuffer that's twice the size of the previous buffer and copy all contents from the old to the
586-
* new buffer. This method should be called when not enough free space is available for elements to be stored.
591+
* Grow the data buffer to accommodate more entries. Uses SharedArrayBuffer.grow() so all workers that
592+
* share the same buffer see the larger size automatically — no buffer re-sharing is required.
593+
* The write lock must be held by the caller.
587594
*/
588-
private doubleDataStorage() {
589-
let newDataMem: SharedArrayBuffer | ArrayBuffer;
590-
if (this.dataMem.byteLength > 512 * 1024 * 1024) {
591-
// Increase linearly (instead of doubling) with the size of the data array if this is larger than 512MB.
592-
newDataMem = this.allocateMemory(this.dataView.byteLength + 256 * 1024 * 1024);
593-
} else {
594-
newDataMem = this.allocateMemory(this.dataMem.byteLength * 2);
595-
}
596-
597-
// Copy the data from the old to the new buffer
598-
const newDataArray = new Uint8Array(newDataMem);
599-
newDataArray.set(new Uint8Array(this.dataMem));
600-
this.dataMem = newDataMem;
595+
private doubleDataStorage(): void {
596+
// Increase linearly (instead of doubling) if the size of the data array is more than 512MiB.
597+
const newSize = this.dataMem.byteLength > 512 * 1024 * 1024
598+
? this.dataMem.byteLength + 256 * 1024 * 1024
599+
: this.dataMem.byteLength * 2;
600+
601+
// Grow in-place. Existing data is preserved; new region is zero-initialised.
602+
// If newSize exceeds maxByteLength a RangeError is thrown — callers should
603+
// increase maxBytes in ShareableMapOptions to avoid this.
604+
(this.dataMem as SharedArrayBuffer).grow(newSize);
605+
606+
// Refresh this worker's DataView so its byteLength reflects the new size.
607+
// Other workers' auto-length DataViews over the same SAB update automatically.
601608
this.dataView = new DataView(this.dataMem);
602609
}
603610

604611
/**
605612
* Call this function if the effective load factor of the map is higher than the allowed load factor (default 0.75).
606-
* This method will double the amount of available buckets and make sure all pointers are placed in the correct
607-
* location.
613+
* This method will double the amount of available buckets and rehash all entries.
614+
*
615+
* Strategy: rehash into a private ArrayBuffer (invisible to other workers), then grow the shared index SAB
616+
* in-place and overwrite it with the rehashed layout. The lock word stays at the same address throughout,
617+
* so workers parked in Atomics.wait() are woken correctly when the write lock is released.
618+
* The write lock must be held by the caller.
608619
*/
609-
private doubleIndexStorage() {
610-
const oldBuckets = this.buckets;
611-
const newIndex = this.allocateMemory(ShareableMap.INT_SIZE * oldBuckets * 2);
612-
const newIndexView = new DataView(newIndex);
613-
const newBuckets = (newIndexView.byteLength - ShareableMap.INDEX_TABLE_OFFSET) / ShareableMap.INT_SIZE;
620+
private doubleIndexStorage(): void {
621+
const oldBuckets = this.buckets;
622+
const newTotalSize = ShareableMap.INDEX_TABLE_OFFSET + oldBuckets * 2 * ShareableMap.INT_SIZE;
623+
const newBuckets = oldBuckets * 2;
614624

615-
let bucketsInUse: number = 0;
625+
// Phase 1: rehash into a private temp buffer (not shared, no synchronisation needed).
626+
const tempBuf = new ArrayBuffer(newTotalSize);
627+
const tempView = new DataView(tempBuf);
628+
let bucketsInUse = 0;
616629

617-
// Now, we need to rehash all previous values and recompute the bucket pointers
618630
for (let bucket = 0; bucket < oldBuckets; bucket++) {
619-
let startPos = this.indexView.getUint32(ShareableMap.INDEX_TABLE_OFFSET + bucket * 4);
620-
631+
let startPos = this.indexView.getUint32(
632+
ShareableMap.INDEX_TABLE_OFFSET + bucket * ShareableMap.INT_SIZE
633+
);
621634
while (startPos !== 0) {
622-
// Rehash
623-
const hash: number = this.readHashFromDataObject(startPos);
635+
const hash = this.readHashFromDataObject(startPos);
624636
const newBucket = hash % newBuckets;
625-
626-
const newBucketContent = newIndexView.getUint32(ShareableMap.INDEX_TABLE_OFFSET + newBucket * 4);
627-
// Should we directly update the bucket content or follow the links and update those?
628-
if (newBucketContent === 0) {
637+
const existing = tempView.getUint32(
638+
ShareableMap.INDEX_TABLE_OFFSET + newBucket * ShareableMap.INT_SIZE
639+
);
640+
if (existing === 0) {
629641
bucketsInUse++;
630-
newIndexView.setUint32(ShareableMap.INDEX_TABLE_OFFSET + newBucket * 4, startPos);
642+
tempView.setUint32(
643+
ShareableMap.INDEX_TABLE_OFFSET + newBucket * ShareableMap.INT_SIZE,
644+
startPos
645+
);
631646
} else {
632-
// The bucket already exists, add the new object to the end of the chain.
633-
this.updateLinkedPointer(newBucketContent, startPos, this.dataView);
647+
this.updateLinkedPointer(existing, startPos, this.dataView);
634648
}
635-
636-
// Follow link in the chain and update its properties.
637-
const newStartPos = this.dataView.getUint32(startPos);
649+
const next = this.dataView.getUint32(startPos);
638650
this.dataView.setUint32(startPos, 0);
639-
startPos = newStartPos;
651+
startPos = next;
640652
}
641653
}
642654

643-
// Copy metadata between the old and new buffer
644-
for (let i = 0; i < ShareableMap.INDEX_TABLE_OFFSET; i += 4) {
645-
newIndexView.setUint32(i, this.indexView.getUint32(i));
655+
// Copy metadata prefix (size, usedBuckets, freeStart, lock word, usedSpace) from old index.
656+
for (let i = 0; i < ShareableMap.INDEX_TABLE_OFFSET; i += ShareableMap.INT_SIZE) {
657+
tempView.setUint32(i, this.indexView.getUint32(i));
646658
}
659+
tempView.setUint32(ShareableMap.INDEX_USED_BUCKETS_OFFSET, bucketsInUse);
647660

648-
this.indexMem = newIndex;
661+
// Phase 2: grow the shared index SAB in-place, then overwrite it with the rehashed layout.
662+
// The lock word at INDEX_LOCK_OFFSET remains in the same SAB — waiting workers are unaffected.
663+
(this.indexMem as SharedArrayBuffer).grow(newTotalSize);
649664
this.indexView = new DataView(this.indexMem);
650-
// The buckets that are currently in use is the only thing that did change for the new index table.
651-
this.indexView.setUint32(4, bucketsInUse);
665+
new Uint8Array(this.indexMem as SharedArrayBuffer).set(new Uint8Array(tempBuf));
652666
}
653667

654668
private getEncoder(value: V): [Serializable<any>, number] {
@@ -799,7 +813,10 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
799813
const buckets = Math.ceil(expectedSize / ShareableMap.LOAD_FACTOR)
800814
const indexSize = 5 * 4 + buckets * ShareableMap.INT_SIZE;
801815

802-
this.indexMem = this.allocateMemory(indexSize);
816+
const maxDataBytes = this.originalOptions.maxBytes ?? ShareableMap.DEFAULT_MAX_DATA_BYTES;
817+
const maxIndexBytes = ShareableMap.DEFAULT_MAX_INDEX_BYTES;
818+
819+
this.indexMem = this.allocateMemory(indexSize, maxIndexBytes);
803820
this.indexView = new DataView(this.indexMem);
804821

805822
// Free space starts from position 1 in the data array (instead of 0, which we use to indicate the end).
@@ -808,7 +825,7 @@ export class ShareableMap<K, V> extends TransferableDataStructure {
808825
// Size must be a multiple of 4
809826
const dataSize = averageBytesPerValue * expectedSize;
810827

811-
this.dataMem = this.allocateMemory(dataSize);
828+
this.dataMem = this.allocateMemory(dataSize, maxDataBytes);
812829
this.dataView = new DataView(this.dataMem);
813830
}
814831

src/map/ShareableMapOptions.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,12 @@ export default interface ShareableMapOptions<V> {
2323
* performance of the ShareableMap.
2424
*/
2525
serializer?: Serializable<V>;
26+
27+
/**
28+
* Maximum number of bytes the data buffer may grow to. Virtual address space is reserved at construction time,
29+
* but physical pages are committed lazily by the OS, so a large value is inexpensive to declare.
30+
*
31+
* Defaults to 256 MiB. This is a hard ceiling — inserting data beyond this limit will throw a RangeError.
32+
*/
33+
maxBytes?: number;
2634
};

src/map/__tests__/ShareableMap.spec.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,57 @@ describe("ShareableMap", () => {
205205
}
206206
});
207207

208+
it("should preserve all entries after doubleDataStorage and doubleIndexStorage are triggered", () => {
209+
// Small initial size (4 entries × 64 bytes = 256 byte data buffer, 4-bucket index) so both
210+
// growth paths are exercised within the first few hundred insertions.
211+
const map = new ShareableMap<string, string>({
212+
expectedSize: 4,
213+
averageBytesPerValue: 64,
214+
maxBytes: 256 * 1024 * 1024
215+
});
216+
217+
const inserted = new Map<string, string>();
218+
for (let i = 0; i < 500; i++) {
219+
const key = `key-${i}`;
220+
const value = `value-${i}`;
221+
map.set(key, value);
222+
inserted.set(key, value);
223+
}
224+
225+
expect(map.size).toBe(500);
226+
for (const [key, value] of inserted) {
227+
expect(map.get(key)).toBe(value);
228+
}
229+
});
230+
231+
it("should make grown data visible to a second instance sharing the same buffers", () => {
232+
// Simulate two workers sharing the same SharedArrayBuffers via toTransferableState /
233+
// fromTransferableState. After instance A forces a buffer grow, instance B must see
234+
// all entries without any additional buffer re-sharing.
235+
const mapA = new ShareableMap<string, string>({
236+
expectedSize: 4,
237+
averageBytesPerValue: 64,
238+
maxBytes: 256 * 1024 * 1024
239+
});
240+
241+
// Instance B shares the exact same SABs as A from the start.
242+
const mapB = ShareableMap.fromTransferableState<string, string>(mapA.toTransferableState());
243+
244+
const inserted = new Map<string, string>();
245+
for (let i = 0; i < 500; i++) {
246+
const key = `key-${i}`;
247+
const value = `value-${i}`;
248+
mapA.set(key, value);
249+
inserted.set(key, value);
250+
}
251+
252+
// B must see every entry written by A — no re-sharing of buffers performed.
253+
expect(mapB.size).toBe(500);
254+
for (const [key, value] of inserted) {
255+
expect(mapB.get(key)).toBe(value);
256+
}
257+
});
258+
208259
it("should correctly defragment the map if required", () => {
209260
const map = new ShareableMap<string, string>();
210261

0 commit comments

Comments
 (0)