Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 16 additions & 24 deletions src/apps/drive/fuse/callbacks/ReadCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import { TemporalFileByPathFinder } from '../../../../context/storage/TemporalFi
import { TemporalFileChunkReader } from '../../../../context/storage/TemporalFiles/application/read/TemporalFileChunkReader';
import { FirstsFileSearcher } from '../../../../context/virtual-drive/files/application/search/FirstsFileSearcher';
import { StorageFilesRepository } from '../../../../context/storage/StorageFiles/domain/StorageFilesRepository';
import { StorageFileId } from '../../../../context/storage/StorageFiles/domain/StorageFileId';
import { StorageFile } from '../../../../context/storage/StorageFiles/domain/StorageFile';
import { StorageFileDownloader } from '../../../../context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader';
import { DownloadProgressTracker } from '../../../../context/shared/domain/DownloadProgressTracker';
import { type File } from '../../../../context/virtual-drive/files/domain/File';
import {
handleReadCallback,
type HandleReadCallbackDeps,
} from '../../../../backend/features/fuse/on-read/handle-read-callback';
import { buildNetworkClient } from '../../../../infra/environment/download-file/build-network-client';
import { getCredentials } from '../../../main/auth/get-credentials';
import { DependencyInjectionUserProvider } from '../../../shared/dependency-injection/DependencyInjectionUserProvider';

import Fuse from '@gcas/fuse';

Expand All @@ -21,15 +21,17 @@ export class ReadCallback {

async execute(
path: string,
_fd: any,
_fd: unknown,
buf: Buffer,
len: number,
pos: number,
cb: (code: number, params?: any) => void,
cb: (code: number, params?: unknown) => void,
) {
try {
const { mnemonic } = getCredentials();
const user = DependencyInjectionUserProvider.get();
const network = buildNetworkClient({ bridgeUser: user.bridgeUser, userId: user.userId });
const repo = this.container.get(StorageFilesRepository);
const downloader = this.container.get(StorageFileDownloader);
const tracker = this.container.get(DownloadProgressTracker);

const deps: HandleReadCallbackDeps = {
Expand All @@ -39,30 +41,20 @@ export class ReadCallback {
const result = await this.container.get(TemporalFileChunkReader).run(p, length, position);
return result.isPresent() ? result.get() : undefined;
},
existsOnDisk: (contentsId: string) => repo.exists(new StorageFileId(contentsId)),

startDownload: async (virtualFile: File) => {
const storage = StorageFile.from({
id: virtualFile.contentsId,
virtualId: virtualFile.uuid,
size: virtualFile.size,
onDownloadProgress: (name, extension, bytesDownloaded, fileSize, elapsedTime) => {
tracker.downloadUpdate(name, extension, {
percentage: Math.min(bytesDownloaded / fileSize, 1),
elapsedTime,
});
tracker.downloadStarted(virtualFile.name, virtualFile.type);
const { stream, handler } = await downloader.run(storage, virtualFile);
return { stream, elapsedTime: () => handler.elapsedTime() };
},
onDownloadProgress: (name, extension, progress) => {
tracker.downloadUpdate(name, extension, progress);
},
saveToRepository: async (contentsId, size, uuid, name, extension) => {
const storage = StorageFile.from({
id: contentsId,
virtualId: uuid,
size,
});
const storage = StorageFile.from({ id: contentsId, virtualId: uuid, size });
await repo.register(storage);
tracker.downloadFinished(name, extension);
},
bucketId: user.bucket,
mnemonic,
network,
};

const result = await handleReadCallback(deps, path, len, pos);
Expand Down
9 changes: 8 additions & 1 deletion src/apps/drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ export async function startVirtualDrive() {
fuseApp.on('mount-error', () => broadcastToWindows('virtual-drive-status-change', 'ERROR'));

await hydrationApi.start({ debug: false, timeElapsed: false });

/**
* v2.5.4
* Alexis Mora
* If a user abruptly quits the app, all the hydrated files will be orphaned.
* Hence why we clear the cache before starting up the virtual drive.
* To ensure that every time we get a fresh start.
*/
fuseApp.clearCache();
await fuseApp.start();
}

Expand Down
1 change: 1 addition & 0 deletions src/apps/main/network/downloadv2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const downloadOwnFile: DownloadOwnFileFunction = (params) => {

const downloadFileV2: DownloadFileFunction = (params) => {
if (params.token && params.encryptionKey) {
// This is de facto dead code as its never called with params.token
return downloadSharedFile(params);
} else if (params.creds && params.mnemonic) {
return downloadOwnFile(params);
Expand Down
1 change: 1 addition & 0 deletions src/backend/features/fuse/on-read/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const EMPTY = Buffer.alloc(0);
21 changes: 21 additions & 0 deletions src/backend/features/fuse/on-read/download-cache/allocate-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import fs from 'node:fs/promises';

/**
* Pre-allocates a file on disk to the full expected size before any ranges are downloaded.
*
* This is necessary for random-access writes: since FUSE reads can arrive in any order,
* we need the file to exist at its full size so we can write each range at its correct
* byte offset. Without pre-allocation, writing at offset 500MB would fail because the
* file doesn't exist yet.
*
* The file is filled with zeros initially, the {@link rangeRegistry} tracks which regions
* contain real downloaded bytes vs unfilled zeros.
*/
export async function allocateFile(filePath: string, size: number): Promise<void> {
const handle = await fs.open(filePath, 'w');
try {
await handle.truncate(size);
} finally {
await handle.close();
}
}
7 changes: 7 additions & 0 deletions src/backend/features/fuse/on-read/download-cache/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* 4MB blocks — matches the chunk size used by the legacy downloader, proven to work well
* for this codebase. Each block is downloaded in full on first access regardless of how
* small the FUSE read is, so subsequent reads within the same block are served from disk.
*/
export const BLOCK_SIZE = 4 * 1024 * 1024;
export const BITS_PER_BYTE = 8;
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { HandleReadCallbackDeps } from '../handle-read-callback';
import { writeChunkToDisk } from '../read-chunk-from-disk';
import { type FileHydrationState, markBlocksInRangeDownloaded, startBlockDownload } from './hydration-state';
import { type File } from '../../../../../context/virtual-drive/files/domain/File';
import { downloadFileRange } from '../../../../../infra/environment/download-file/download-file';
import { getStopwatch } from './hydration-stopwatch';

/**
* Downloads a block range, writes it to disk at the correct offset, and marks it as downloaded.
*/
export async function downloadAndCacheBlock(
deps: HandleReadCallbackDeps,
virtualFile: File,
filePath: string,
state: FileHydrationState,
blockStart: number,
blockLength: number,
): Promise<void> {
const resolve = startBlockDownload(state, { position: blockStart, length: blockLength });
try {
const buffer = await downloadFileRange({
fileId: virtualFile.contentsId,
bucketId: deps.bucketId,
mnemonic: deps.mnemonic,
network: deps.network,
range: { position: blockStart, length: blockLength },
signal: new AbortController(),
});
await writeChunkToDisk(filePath, buffer, blockStart);
markBlocksInRangeDownloaded(state, { position: blockStart, length: blockLength });
const elapsedTime = getStopwatch(virtualFile.contentsId)?.elapsedTime() ?? 0;
deps.onDownloadProgress(
virtualFile.name,
virtualFile.type,
blockStart + blockLength,
virtualFile.size,
elapsedTime,
);
} finally {
resolve();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { BLOCK_SIZE } from './constants';

/**
* Given a position and length, rounds up to 4MB block boundaries so that every
* request downloads complete blocks. Ensuring correct bitmap tracking, prefetching,
* and preventing double downloads.
*/
export function expandToBlockBoundaries(
position: number,
length: number,
fileSize: number,
): { blockStart: number; blockLength: number } {
const blockStart = Math.floor(position / BLOCK_SIZE) * BLOCK_SIZE;
const end = position + length;
const blockEnd = Math.min(Math.ceil(end / BLOCK_SIZE) * BLOCK_SIZE, fileSize);
return { blockStart, blockLength: blockEnd - blockStart };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import fs from 'node:fs/promises';
export async function fileExistsOnDisk(filePath: string): Promise<boolean> {
return fs
.stat(filePath)
.then(() => true)
.catch(() => false);
}
169 changes: 169 additions & 0 deletions src/backend/features/fuse/on-read/download-cache/hydration-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { BITS_PER_BYTE, BLOCK_SIZE } from './constants';

/**
* Tracks which byte ranges of a file have been downloaded and written to disk.
*
* Uses a bitmap where each bit represents one 4MB block of the file.
* A set bit means that block has been FULLY downloaded and written to disk.
* An unset bit means that block contains pre-allocation zeros — not real data.
*
* This is necessary because files are pre-allocated to their full size before any
* data is downloaded, making it impossible to distinguish real bytes from zeros
* by inspecting the file alone.
*
* A block is only marked after its full write to disk succeeds — never partially.
* A hard kill mid-write is handled by wiping the download cache on startup.
*
* Race conditions: concurrent reads for the same block are tracked via an in-flight
* set. The second caller waits for the first to finish rather than double-downloading.
*/

export type FileHydrationState = {
bitmap: Buffer;
totalBlocks: number;
downloadedBlocks: number;
blocksBeingDownloaded: Map<number, Promise<void>>;
};
type Range = {
position: number;
length: number;
};
const hydrationState = new Map<string, FileHydrationState>();

export function getOrInitHydrationState(contentsId: string, fileSize: number): FileHydrationState {
const existing = hydrationState.get(contentsId);
if (existing) return existing;

const totalBlocks = Math.ceil(fileSize / BLOCK_SIZE);
const size = Math.ceil(totalBlocks / BITS_PER_BYTE);
const state: FileHydrationState = {
bitmap: Buffer.alloc(size, 0),
totalBlocks,
downloadedBlocks: 0,
blocksBeingDownloaded: new Map(),
};
hydrationState.set(contentsId, state);
return state;
}

function blockIndexForByte(byte: number): number {
return Math.floor(byte / BLOCK_SIZE);
}

/**
* Creates a bitmask: a number where exactly ONE bit is turned on.
*
* Think of a byte as 8 switches:
* [bit7][bit6][bit5][bit4][bit3][bit2][bit1][bit0]
*
* The mask selects exactly one of those switches.
*
* Examples:
* bitIndexInByte = 0 is 0b00000001 (selects bit 0)
* bitIndexInByte = 2 is 0b00000100 (selects bit 2)
* bitIndexInByte = 7 is 0b10000000 (selects bit 7)
*
* Why we need this:
* - AND (&) with the mask → checks if that bit is set
* - OR (|) with the mask → sets that bit
*
* Implementation:
* Start with 1 (0b00000001) and shift it left N times.
*/
function bitMask(bitIndexInByte: number): number {
return 1 << bitIndexInByte;
}

function getBit(bitmap: Buffer, blockIndex: number): boolean {
const byteIndex = Math.floor(blockIndex / BITS_PER_BYTE);
const bitIndexInByte = blockIndex % BITS_PER_BYTE;
return (bitmap[byteIndex] & bitMask(bitIndexInByte)) !== 0;
}

function setBit(bitmap: Buffer, blockIndex: number): void {
const byteIndex = Math.floor(blockIndex / BITS_PER_BYTE);
const bitIndexInByte = blockIndex % BITS_PER_BYTE;
bitmap[byteIndex] = bitmap[byteIndex] | bitMask(bitIndexInByte);
}

export function isFileHydrated(state: FileHydrationState): boolean {
return state.downloadedBlocks === state.totalBlocks;
}

function blocksWithinRange({ position, length }: Range): Array<number> {
const first = blockIndexForByte(position);
const last = blockIndexForByte(position + length - 1);
const blocks: number[] = [];
for (let block = first; block <= last; block++) {
blocks.push(block);
}
return blocks;
}

export function isRangeHydrated(state: FileHydrationState, { position, length }: Range): boolean {
return blocksWithinRange({ position, length }).every((block) => getBit(state.bitmap, block));
}

export function markBlocksInRangeDownloaded(state: FileHydrationState, { position, length }: Range): void {
for (const block of blocksWithinRange({ position, length })) {
if (!getBit(state.bitmap, block)) {
setBit(state.bitmap, block);
state.downloadedBlocks++;
}
}
}

/**
* Returns block indices within the range that are neither cached nor currently being downloaded.
* Use this after waiting for in-flight blocks to find what still needs downloading.
*/
export function getMissingBlocks(state: FileHydrationState, { position, length }: Range): number[] {
return blocksWithinRange({ position, length }).filter(
(block) => !getBit(state.bitmap, block) && !state.blocksBeingDownloaded.has(block),
);
}

export function getBlocksBeingDownloaded(
state: FileHydrationState,
{ position, length }: Range,
): Map<number, Promise<void>> {
const blocksBeingDownloadedWithinRange = new Map<number, Promise<void>>();
for (const block of blocksWithinRange({ position, length })) {
const existing = state.blocksBeingDownloaded.get(block);
if (existing) blocksBeingDownloadedWithinRange.set(block, existing);
}
return blocksBeingDownloadedWithinRange;
}

/**
* Marks blocks as being downloaded. Call before starting a download.
* Returns a resolve function to call it when the download + write completes.
*/
export function startBlockDownload(state: FileHydrationState, { position, length }: Range): () => void {
let resolve: () => void = () => undefined;
const promise = new Promise<void>((res) => {
resolve = res;
});

for (const block of blocksWithinRange({ position, length })) {
state.blocksBeingDownloaded.set(block, promise);
}

return () => {
for (const block of blocksWithinRange({ position, length })) {
state.blocksBeingDownloaded.delete(block);
}
resolve();
};
}

/**
* Removes the bitmap for a file — call when the file is deleted or cache is cleared.
*/
export function deleteHydrationState(contentsId: string): void {
hydrationState.delete(contentsId);
}

export function clearHydrationState(): void {
hydrationState.clear();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Stopwatch } from '../../../../../apps/shared/types/Stopwatch';

const stopwatches = new Map<string, Stopwatch>();

export function startStopwatch(contentsId: string): void {
const stopWatch = new Stopwatch();
stopWatch.start();
stopwatches.set(contentsId, stopWatch);
}

export function getStopwatch(contentsId: string): Stopwatch | undefined {
return stopwatches.get(contentsId);
}

export function deleteStopwatch(contentsId: string): void {
stopwatches.delete(contentsId);
}
Loading
Loading