diff --git a/src/apps/drive/fuse/callbacks/ReadCallback.ts b/src/apps/drive/fuse/callbacks/ReadCallback.ts index ff131b32ca..cc475bc91c 100644 --- a/src/apps/drive/fuse/callbacks/ReadCallback.ts +++ b/src/apps/drive/fuse/callbacks/ReadCallback.ts @@ -4,15 +4,15 @@ import { TemporalFileByPathFinder } from '../../../../context/storage/TemporalFi import { TemporalFileChunkReader } from '../../../../context/storage/TemporalFiles/application/read/TemporalFileChunkReader'; import { FirstsFileSearcher } from '../../../../context/virtual-drive/files/application/search/FirstsFileSearcher'; import { StorageFilesRepository } from '../../../../context/storage/StorageFiles/domain/StorageFilesRepository'; -import { StorageFileId } from '../../../../context/storage/StorageFiles/domain/StorageFileId'; import { StorageFile } from '../../../../context/storage/StorageFiles/domain/StorageFile'; -import { StorageFileDownloader } from '../../../../context/storage/StorageFiles/application/download/StorageFileDownloader/StorageFileDownloader'; import { DownloadProgressTracker } from '../../../../context/shared/domain/DownloadProgressTracker'; -import { type File } from '../../../../context/virtual-drive/files/domain/File'; import { handleReadCallback, type HandleReadCallbackDeps, } from '../../../../backend/features/fuse/on-read/handle-read-callback'; +import { buildNetworkClient } from '../../../../infra/environment/download-file/build-network-client'; +import { getCredentials } from '../../../main/auth/get-credentials'; +import { DependencyInjectionUserProvider } from '../../../shared/dependency-injection/DependencyInjectionUserProvider'; import Fuse from '@gcas/fuse'; @@ -21,15 +21,17 @@ export class ReadCallback { async execute( path: string, - _fd: any, + _fd: unknown, buf: Buffer, len: number, pos: number, - cb: (code: number, params?: any) => void, + cb: (code: number, params?: unknown) => void, ) { try { + const { mnemonic } = getCredentials(); + const user = DependencyInjectionUserProvider.get(); + const network = buildNetworkClient({ bridgeUser: user.bridgeUser, userId: user.userId }); const repo = this.container.get(StorageFilesRepository); - const downloader = this.container.get(StorageFileDownloader); const tracker = this.container.get(DownloadProgressTracker); const deps: HandleReadCallbackDeps = { @@ -39,30 +41,20 @@ export class ReadCallback { const result = await this.container.get(TemporalFileChunkReader).run(p, length, position); return result.isPresent() ? result.get() : undefined; }, - existsOnDisk: (contentsId: string) => repo.exists(new StorageFileId(contentsId)), - - startDownload: async (virtualFile: File) => { - const storage = StorageFile.from({ - id: virtualFile.contentsId, - virtualId: virtualFile.uuid, - size: virtualFile.size, + onDownloadProgress: (name, extension, bytesDownloaded, fileSize, elapsedTime) => { + tracker.downloadUpdate(name, extension, { + percentage: Math.min(bytesDownloaded / fileSize, 1), + elapsedTime, }); - tracker.downloadStarted(virtualFile.name, virtualFile.type); - const { stream, handler } = await downloader.run(storage, virtualFile); - return { stream, elapsedTime: () => handler.elapsedTime() }; - }, - onDownloadProgress: (name, extension, progress) => { - tracker.downloadUpdate(name, extension, progress); }, saveToRepository: async (contentsId, size, uuid, name, extension) => { - const storage = StorageFile.from({ - id: contentsId, - virtualId: uuid, - size, - }); + const storage = StorageFile.from({ id: contentsId, virtualId: uuid, size }); await repo.register(storage); tracker.downloadFinished(name, extension); }, + bucketId: user.bucket, + mnemonic, + network, }; const result = await handleReadCallback(deps, path, len, pos); diff --git a/src/apps/drive/index.ts b/src/apps/drive/index.ts index c48f372b68..2d951503b1 100644 --- a/src/apps/drive/index.ts +++ b/src/apps/drive/index.ts @@ -28,7 +28,14 @@ export async function startVirtualDrive() { fuseApp.on('mount-error', () => broadcastToWindows('virtual-drive-status-change', 'ERROR')); await hydrationApi.start({ debug: false, timeElapsed: false }); - + /** + * v2.5.4 + * Alexis Mora + * If a user abruptly quits the app, all the hydrated files will be orphaned. + * Hence why we clear the cache before starting up the virtual drive. + * To ensure that every time we get a fresh start. + */ + fuseApp.clearCache(); await fuseApp.start(); } diff --git a/src/apps/main/network/downloadv2.ts b/src/apps/main/network/downloadv2.ts index b928ccdd28..93f58c6b5c 100644 --- a/src/apps/main/network/downloadv2.ts +++ b/src/apps/main/network/downloadv2.ts @@ -95,6 +95,7 @@ const downloadOwnFile: DownloadOwnFileFunction = (params) => { const downloadFileV2: DownloadFileFunction = (params) => { if (params.token && params.encryptionKey) { + // This is de facto dead code as its never called with params.token return downloadSharedFile(params); } else if (params.creds && params.mnemonic) { return downloadOwnFile(params); diff --git a/src/backend/features/fuse/on-read/constants.ts b/src/backend/features/fuse/on-read/constants.ts new file mode 100644 index 0000000000..d0b25712aa --- /dev/null +++ b/src/backend/features/fuse/on-read/constants.ts @@ -0,0 +1 @@ +export const EMPTY = Buffer.alloc(0); diff --git a/src/backend/features/fuse/on-read/download-cache/allocate-file.ts b/src/backend/features/fuse/on-read/download-cache/allocate-file.ts new file mode 100644 index 0000000000..e8a1c1ae00 --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/allocate-file.ts @@ -0,0 +1,21 @@ +import fs from 'node:fs/promises'; + +/** + * Pre-allocates a file on disk to the full expected size before any ranges are downloaded. + * + * This is necessary for random-access writes: since FUSE reads can arrive in any order, + * we need the file to exist at its full size so we can write each range at its correct + * byte offset. Without pre-allocation, writing at offset 500MB would fail because the + * file doesn't exist yet. + * + * The file is filled with zeros initially, the {@link rangeRegistry} tracks which regions + * contain real downloaded bytes vs unfilled zeros. + */ +export async function allocateFile(filePath: string, size: number): Promise { + const handle = await fs.open(filePath, 'w'); + try { + await handle.truncate(size); + } finally { + await handle.close(); + } +} diff --git a/src/backend/features/fuse/on-read/download-cache/constants.ts b/src/backend/features/fuse/on-read/download-cache/constants.ts new file mode 100644 index 0000000000..1fc635e970 --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/constants.ts @@ -0,0 +1,7 @@ +/** + * 4MB blocks — matches the chunk size used by the legacy downloader, proven to work well + * for this codebase. Each block is downloaded in full on first access regardless of how + * small the FUSE read is, so subsequent reads within the same block are served from disk. + */ +export const BLOCK_SIZE = 4 * 1024 * 1024; +export const BITS_PER_BYTE = 8; diff --git a/src/backend/features/fuse/on-read/download-cache/download-and-save-block.ts b/src/backend/features/fuse/on-read/download-cache/download-and-save-block.ts new file mode 100644 index 0000000000..e799cbd5cb --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/download-and-save-block.ts @@ -0,0 +1,42 @@ +import { HandleReadCallbackDeps } from '../handle-read-callback'; +import { writeChunkToDisk } from '../read-chunk-from-disk'; +import { type FileHydrationState, markBlocksInRangeDownloaded, startBlockDownload } from './hydration-state'; +import { type File } from '../../../../../context/virtual-drive/files/domain/File'; +import { downloadFileRange } from '../../../../../infra/environment/download-file/download-file'; +import { getStopwatch } from './hydration-stopwatch'; + +/** + * Downloads a block range, writes it to disk at the correct offset, and marks it as downloaded. + */ +export async function downloadAndCacheBlock( + deps: HandleReadCallbackDeps, + virtualFile: File, + filePath: string, + state: FileHydrationState, + blockStart: number, + blockLength: number, +): Promise { + const resolve = startBlockDownload(state, { position: blockStart, length: blockLength }); + try { + const buffer = await downloadFileRange({ + fileId: virtualFile.contentsId, + bucketId: deps.bucketId, + mnemonic: deps.mnemonic, + network: deps.network, + range: { position: blockStart, length: blockLength }, + signal: new AbortController(), + }); + await writeChunkToDisk(filePath, buffer, blockStart); + markBlocksInRangeDownloaded(state, { position: blockStart, length: blockLength }); + const elapsedTime = getStopwatch(virtualFile.contentsId)?.elapsedTime() ?? 0; + deps.onDownloadProgress( + virtualFile.name, + virtualFile.type, + blockStart + blockLength, + virtualFile.size, + elapsedTime, + ); + } finally { + resolve(); + } +} diff --git a/src/backend/features/fuse/on-read/download-cache/expand-to-block-boundaries.ts b/src/backend/features/fuse/on-read/download-cache/expand-to-block-boundaries.ts new file mode 100644 index 0000000000..bd4cd5c41f --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/expand-to-block-boundaries.ts @@ -0,0 +1,17 @@ +import { BLOCK_SIZE } from './constants'; + +/** + * Given a position and length, rounds up to 4MB block boundaries so that every + * request downloads complete blocks. Ensuring correct bitmap tracking, prefetching, + * and preventing double downloads. + */ +export function expandToBlockBoundaries( + position: number, + length: number, + fileSize: number, +): { blockStart: number; blockLength: number } { + const blockStart = Math.floor(position / BLOCK_SIZE) * BLOCK_SIZE; + const end = position + length; + const blockEnd = Math.min(Math.ceil(end / BLOCK_SIZE) * BLOCK_SIZE, fileSize); + return { blockStart, blockLength: blockEnd - blockStart }; +} diff --git a/src/backend/features/fuse/on-read/download-cache/file-exists-on-disk.ts b/src/backend/features/fuse/on-read/download-cache/file-exists-on-disk.ts new file mode 100644 index 0000000000..a8237fbd3a --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/file-exists-on-disk.ts @@ -0,0 +1,7 @@ +import fs from 'node:fs/promises'; +export async function fileExistsOnDisk(filePath: string): Promise { + return fs + .stat(filePath) + .then(() => true) + .catch(() => false); +} diff --git a/src/backend/features/fuse/on-read/download-cache/hydration-state.ts b/src/backend/features/fuse/on-read/download-cache/hydration-state.ts new file mode 100644 index 0000000000..a97699537f --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/hydration-state.ts @@ -0,0 +1,169 @@ +import { BITS_PER_BYTE, BLOCK_SIZE } from './constants'; + +/** + * Tracks which byte ranges of a file have been downloaded and written to disk. + * + * Uses a bitmap where each bit represents one 4MB block of the file. + * A set bit means that block has been FULLY downloaded and written to disk. + * An unset bit means that block contains pre-allocation zeros — not real data. + * + * This is necessary because files are pre-allocated to their full size before any + * data is downloaded, making it impossible to distinguish real bytes from zeros + * by inspecting the file alone. + * + * A block is only marked after its full write to disk succeeds — never partially. + * A hard kill mid-write is handled by wiping the download cache on startup. + * + * Race conditions: concurrent reads for the same block are tracked via an in-flight + * set. The second caller waits for the first to finish rather than double-downloading. + */ + +export type FileHydrationState = { + bitmap: Buffer; + totalBlocks: number; + downloadedBlocks: number; + blocksBeingDownloaded: Map>; +}; +type Range = { + position: number; + length: number; +}; +const hydrationState = new Map(); + +export function getOrInitHydrationState(contentsId: string, fileSize: number): FileHydrationState { + const existing = hydrationState.get(contentsId); + if (existing) return existing; + + const totalBlocks = Math.ceil(fileSize / BLOCK_SIZE); + const size = Math.ceil(totalBlocks / BITS_PER_BYTE); + const state: FileHydrationState = { + bitmap: Buffer.alloc(size, 0), + totalBlocks, + downloadedBlocks: 0, + blocksBeingDownloaded: new Map(), + }; + hydrationState.set(contentsId, state); + return state; +} + +function blockIndexForByte(byte: number): number { + return Math.floor(byte / BLOCK_SIZE); +} + +/** + * Creates a bitmask: a number where exactly ONE bit is turned on. + * + * Think of a byte as 8 switches: + * [bit7][bit6][bit5][bit4][bit3][bit2][bit1][bit0] + * + * The mask selects exactly one of those switches. + * + * Examples: + * bitIndexInByte = 0 is 0b00000001 (selects bit 0) + * bitIndexInByte = 2 is 0b00000100 (selects bit 2) + * bitIndexInByte = 7 is 0b10000000 (selects bit 7) + * + * Why we need this: + * - AND (&) with the mask → checks if that bit is set + * - OR (|) with the mask → sets that bit + * + * Implementation: + * Start with 1 (0b00000001) and shift it left N times. + */ +function bitMask(bitIndexInByte: number): number { + return 1 << bitIndexInByte; +} + +function getBit(bitmap: Buffer, blockIndex: number): boolean { + const byteIndex = Math.floor(blockIndex / BITS_PER_BYTE); + const bitIndexInByte = blockIndex % BITS_PER_BYTE; + return (bitmap[byteIndex] & bitMask(bitIndexInByte)) !== 0; +} + +function setBit(bitmap: Buffer, blockIndex: number): void { + const byteIndex = Math.floor(blockIndex / BITS_PER_BYTE); + const bitIndexInByte = blockIndex % BITS_PER_BYTE; + bitmap[byteIndex] = bitmap[byteIndex] | bitMask(bitIndexInByte); +} + +export function isFileHydrated(state: FileHydrationState): boolean { + return state.downloadedBlocks === state.totalBlocks; +} + +function blocksWithinRange({ position, length }: Range): Array { + const first = blockIndexForByte(position); + const last = blockIndexForByte(position + length - 1); + const blocks: number[] = []; + for (let block = first; block <= last; block++) { + blocks.push(block); + } + return blocks; +} + +export function isRangeHydrated(state: FileHydrationState, { position, length }: Range): boolean { + return blocksWithinRange({ position, length }).every((block) => getBit(state.bitmap, block)); +} + +export function markBlocksInRangeDownloaded(state: FileHydrationState, { position, length }: Range): void { + for (const block of blocksWithinRange({ position, length })) { + if (!getBit(state.bitmap, block)) { + setBit(state.bitmap, block); + state.downloadedBlocks++; + } + } +} + +/** + * Returns block indices within the range that are neither cached nor currently being downloaded. + * Use this after waiting for in-flight blocks to find what still needs downloading. + */ +export function getMissingBlocks(state: FileHydrationState, { position, length }: Range): number[] { + return blocksWithinRange({ position, length }).filter( + (block) => !getBit(state.bitmap, block) && !state.blocksBeingDownloaded.has(block), + ); +} + +export function getBlocksBeingDownloaded( + state: FileHydrationState, + { position, length }: Range, +): Map> { + const blocksBeingDownloadedWithinRange = new Map>(); + for (const block of blocksWithinRange({ position, length })) { + const existing = state.blocksBeingDownloaded.get(block); + if (existing) blocksBeingDownloadedWithinRange.set(block, existing); + } + return blocksBeingDownloadedWithinRange; +} + +/** + * Marks blocks as being downloaded. Call before starting a download. + * Returns a resolve function to call it when the download + write completes. + */ +export function startBlockDownload(state: FileHydrationState, { position, length }: Range): () => void { + let resolve: () => void = () => undefined; + const promise = new Promise((res) => { + resolve = res; + }); + + for (const block of blocksWithinRange({ position, length })) { + state.blocksBeingDownloaded.set(block, promise); + } + + return () => { + for (const block of blocksWithinRange({ position, length })) { + state.blocksBeingDownloaded.delete(block); + } + resolve(); + }; +} + +/** + * Removes the bitmap for a file — call when the file is deleted or cache is cleared. + */ +export function deleteHydrationState(contentsId: string): void { + hydrationState.delete(contentsId); +} + +export function clearHydrationState(): void { + hydrationState.clear(); +} diff --git a/src/backend/features/fuse/on-read/download-cache/hydration-stopwatch.ts b/src/backend/features/fuse/on-read/download-cache/hydration-stopwatch.ts new file mode 100644 index 0000000000..08516a4952 --- /dev/null +++ b/src/backend/features/fuse/on-read/download-cache/hydration-stopwatch.ts @@ -0,0 +1,17 @@ +import { Stopwatch } from '../../../../../apps/shared/types/Stopwatch'; + +const stopwatches = new Map(); + +export function startStopwatch(contentsId: string): void { + const stopWatch = new Stopwatch(); + stopWatch.start(); + stopwatches.set(contentsId, stopWatch); +} + +export function getStopwatch(contentsId: string): Stopwatch | undefined { + return stopwatches.get(contentsId); +} + +export function deleteStopwatch(contentsId: string): void { + stopwatches.delete(contentsId); +} diff --git a/src/backend/features/fuse/on-read/handle-read-callback.ts b/src/backend/features/fuse/on-read/handle-read-callback.ts index ac69926395..e3c2eab1a4 100644 --- a/src/backend/features/fuse/on-read/handle-read-callback.ts +++ b/src/backend/features/fuse/on-read/handle-read-callback.ts @@ -1,63 +1,117 @@ import { logger } from '@internxt/drive-desktop-core/build/backend'; -import { type Readable } from 'stream'; import { type TemporalFile } from '../../../../context/storage/TemporalFiles/domain/TemporalFile'; import { type File } from '../../../../context/virtual-drive/files/domain/File'; import { left, right, type Either } from '../../../../context/shared/domain/Either'; import { type FuseError, FuseNoSuchFileOrDirectoryError } from '../../../../apps/drive/fuse/callbacks/FuseErrors'; -import { tryCatch } from '../../../../shared/try-catch'; -import { createDownloadToDisk } from './create-download-to-disk'; -import { deleteHydration, getHydration, HydrationEntry, setHydration } from './hydration-registry'; import { readChunkFromDisk } from './read-chunk-from-disk'; import { shouldDownload } from '../on-open/open-flags-tracker'; import nodePath from 'node:path'; import { PATHS } from '../../../../core/electron/paths'; import { formatBytes } from '../../../../shared/format-bytes'; +import { allocateFile } from './download-cache/allocate-file'; +import { expandToBlockBoundaries } from './download-cache/expand-to-block-boundaries'; +import { BLOCK_SIZE } from './download-cache/constants'; +import { + type FileHydrationState, + getOrInitHydrationState, + isRangeHydrated, + isFileHydrated, + getBlocksBeingDownloaded, + getMissingBlocks, +} from './download-cache/hydration-state'; +import { type Network } from '@internxt/sdk'; +import { startStopwatch, deleteStopwatch } from './download-cache/hydration-stopwatch'; +import { fileExistsOnDisk } from './download-cache/file-exists-on-disk'; +import { downloadAndCacheBlock } from './download-cache/download-and-save-block'; +import { EMPTY } from './constants'; export type HandleReadCallbackDeps = { findVirtualFile: (path: string) => Promise; findTemporalFile: (path: string) => Promise; readTemporalFileChunk: (path: string, length: number, position: number) => Promise; - existsOnDisk: (contentsId: string) => Promise; - startDownload: (virtualFile: File) => Promise<{ stream: Readable; elapsedTime: () => number }>; - onDownloadProgress: (name: string, extension: string, progress: { percentage: number; elapsedTime: number }) => void; + onDownloadProgress: ( + name: string, + extension: string, + bytesDownloaded: number, + fileSize: number, + elapsedTime: number, + ) => void; saveToRepository: (contentsId: string, size: number, uuid: string, name: string, extension: string) => Promise; + bucketId: string; + mnemonic: string; + network: Network.Network; }; -const EMPTY = Buffer.alloc(0); +async function readFromTemporalFile( + deps: HandleReadCallbackDeps, + path: string, + length: number, + position: number, +): Promise> { + const temporalFile = await deps.findTemporalFile(path); + + if (!temporalFile) { + logger.error({ msg: '[ReadCallback] File not found', path }); + return left(new FuseNoSuchFileOrDirectoryError(path)); + } + + const chunk = await deps.readTemporalFileChunk(temporalFile.path.value, length, position); + return right(chunk ?? EMPTY); +} + +async function ensureFileAllocated(filePath: string, virtualFile: File): Promise { + const allocated = await fileExistsOnDisk(filePath); + if (!allocated) { + await allocateFile(filePath, virtualFile.size); + startStopwatch(virtualFile.contentsId); + } + return getOrInitHydrationState(virtualFile.contentsId, virtualFile.size); +} -async function startHydration( +async function ensureRangeDownloaded( deps: HandleReadCallbackDeps, virtualFile: File, filePath: string, -): Promise { - const { stream, elapsedTime } = await deps.startDownload(virtualFile); - const writer = createDownloadToDisk(stream, filePath, { - onProgress: (bytesWritten) => { - deps.onDownloadProgress(virtualFile.name, virtualFile.type, { - percentage: Math.min(bytesWritten / virtualFile.size, 1), - elapsedTime: elapsedTime(), - }); - }, - onFinished: () => { - deleteHydration(virtualFile.contentsId); - deps.saveToRepository( - virtualFile.contentsId, - virtualFile.size, - virtualFile.uuid, - virtualFile.name, - virtualFile.type, - ); - }, - onError: (err) => { - logger.error({ msg: '[startHydration] onError', error: err }); - tryCatch(() => writer.destroy()); - deleteHydration(virtualFile.contentsId); - }, - }); + state: FileHydrationState, + position: number, + length: number, +): Promise { + const { blockStart, blockLength } = expandToBlockBoundaries(position, length, virtualFile.size); + + const blocksBeingDownloaded = getBlocksBeingDownloaded(state, { position: blockStart, length: blockLength }); + if (blocksBeingDownloaded.size > 0) { + logger.debug({ msg: '[ReadCallback] waiting for blocks being downloaded', file: virtualFile.nameWithExtension }); + await Promise.all(blocksBeingDownloaded.values()); + } + + const missingBlocks = getMissingBlocks(state, { position: blockStart, length: blockLength }); + if (missingBlocks.length > 0) { + logger.debug({ + msg: '[ReadCallback] downloading missing blocks', + file: virtualFile.nameWithExtension, + blocks: missingBlocks, + }); + await Promise.all( + missingBlocks.map((block) => { + const start = block * BLOCK_SIZE; + const end = Math.min(start + BLOCK_SIZE, virtualFile.size); + return downloadAndCacheBlock(deps, virtualFile, filePath, state, start, end - start); + }), + ); + } +} - setHydration(virtualFile.contentsId, { writer }); - return { writer }; +async function onFileFullyHydrated(deps: HandleReadCallbackDeps, virtualFile: File): Promise { + deleteStopwatch(virtualFile.contentsId); + await deps.saveToRepository( + virtualFile.contentsId, + virtualFile.size, + virtualFile.uuid, + virtualFile.name, + virtualFile.type, + ); } + export async function handleReadCallback( deps: HandleReadCallbackDeps, path: string, @@ -67,15 +121,7 @@ export async function handleReadCallback( const virtualFile = await deps.findVirtualFile(path); if (!virtualFile) { - const temporalFile = await deps.findTemporalFile(path); - - if (!temporalFile) { - logger.error({ msg: '[ReadCallback] File not found', path }); - return left(new FuseNoSuchFileOrDirectoryError(path)); - } - - const chunk = await deps.readTemporalFileChunk(temporalFile.path.value, length, position); - return right(chunk ?? EMPTY); + return readFromTemporalFile(deps, path, length, position); } if (!shouldDownload(path)) { @@ -83,46 +129,26 @@ export async function handleReadCallback( return right(EMPTY); } - const filePath = nodePath.join(PATHS.DOWNLOADED, virtualFile.contentsId); - logger.debug({ msg: '[ReadCallback] read request:', file: virtualFile.nameWithExtension, - position, - length, - targetByte: position + length, + position: formatBytes(position), + length: formatBytes(length), }); - if (await deps.existsOnDisk(virtualFile.contentsId)) { - const chunk = await readChunkFromDisk(filePath, length, position); - return right(chunk); - } - - const hydration = getHydration(virtualFile.contentsId) ?? (await startHydration(deps, virtualFile, filePath)); - const targetByte = position + length; - const bytesAvailable = hydration.writer.getBytesAvailable(); - const waitStart = Date.now(); + const filePath = nodePath.join(PATHS.DOWNLOADED, virtualFile.contentsId); + const state = await ensureFileAllocated(filePath, virtualFile); - if (bytesAvailable < targetByte) { - logger.debug({ - msg: '[ReadCallback] waiting for download to catch up', - file: virtualFile.nameWithExtension, - position: formatBytes(position), - targetByte: formatBytes(targetByte), - bytesAvailable: formatBytes(bytesAvailable), - bytesAhead: formatBytes(targetByte - bytesAvailable), - }); + if (isRangeHydrated(state, { position, length })) { + logger.debug({ msg: '[ReadCallback] serving from disk cache', file: virtualFile.nameWithExtension }); + return right(await readChunkFromDisk(filePath, length, position)); } - await hydration.writer.waitForBytes(position, length); + await ensureRangeDownloaded(deps, virtualFile, filePath, state, position, length); - logger.debug({ - msg: '[ReadCallback] wait resolved', - file: virtualFile.nameWithExtension, - position: formatBytes(position), - waitedMs: Date.now() - waitStart, - }); + if (isFileHydrated(state)) { + await onFileFullyHydrated(deps, virtualFile); + } - const chunk = await readChunkFromDisk(filePath, length, position); - return right(chunk); + return right(await readChunkFromDisk(filePath, length, position)); } diff --git a/src/backend/features/fuse/on-read/read-chunk-from-disk.ts b/src/backend/features/fuse/on-read/read-chunk-from-disk.ts index c8e41fd3b6..7a35678d7d 100644 --- a/src/backend/features/fuse/on-read/read-chunk-from-disk.ts +++ b/src/backend/features/fuse/on-read/read-chunk-from-disk.ts @@ -1,5 +1,14 @@ import fs from 'node:fs/promises'; +export async function writeChunkToDisk(filePath: string, buffer: Buffer, position: number): Promise { + const handle = await fs.open(filePath, 'r+'); + try { + await handle.write(new Uint8Array(buffer), 0, buffer.length, position); + } finally { + await handle.close(); + } +} + export async function readChunkFromDisk(filePath: string, length: number, position: number): Promise { const handle = await fs.open(filePath, 'r'); diff --git a/src/infra/environment/download-file/build-crypto-lib.ts b/src/infra/environment/download-file/build-crypto-lib.ts new file mode 100644 index 0000000000..2c7c262134 --- /dev/null +++ b/src/infra/environment/download-file/build-crypto-lib.ts @@ -0,0 +1,14 @@ +import { Network } from '@internxt/sdk'; +import { validateMnemonic } from 'bip39'; +import { Environment } from '@internxt/inxt-js'; +import { randomBytes } from 'node:crypto'; + +export function buildCryptoLib(): Network.Crypto { + return { + algorithm: Network.ALGORITHMS.AES256CTR, + validateMnemonic: (mnemonic: string) => validateMnemonic(mnemonic), + generateFileKey: (mnemonic, bucketId, index) => + Environment.utils.generateFileKey(mnemonic, bucketId, index as Buffer), + randomBytes, + }; +} diff --git a/src/infra/environment/download-file/build-network-client.ts b/src/infra/environment/download-file/build-network-client.ts new file mode 100644 index 0000000000..43414b2308 --- /dev/null +++ b/src/infra/environment/download-file/build-network-client.ts @@ -0,0 +1,23 @@ +import { Network } from '@internxt/sdk'; +import { createHash } from 'node:crypto'; +import { INTERNXT_CLIENT, INTERNXT_VERSION } from '../../../core/utils/utils'; + +export type NetworkClientCredentials = { + bridgeUser: string; + userId: string; +}; + +export function buildNetworkClient(credentials: NetworkClientCredentials): Network.Network { + return Network.Network.client( + process.env.BRIDGE_URL as string, + { + clientName: INTERNXT_CLIENT, + clientVersion: INTERNXT_VERSION, + desktopHeader: process.env.INTERNXT_DESKTOP_HEADER_KEY, + }, + { + bridgeUser: credentials.bridgeUser, + userId: createHash('sha256').update(credentials.userId).digest('hex'), + }, + ); +} diff --git a/src/infra/environment/download-file/decrypt-at-offset.ts b/src/infra/environment/download-file/decrypt-at-offset.ts new file mode 100644 index 0000000000..22028ea459 --- /dev/null +++ b/src/infra/environment/download-file/decrypt-at-offset.ts @@ -0,0 +1,38 @@ +import { createDecipheriv } from 'node:crypto'; + +/** + * Decrypts a byte range of an AES-256-CTR encrypted file starting at a given position. + * + * AES-CTR is a stream cipher that works by encrypting sequential counter blocks and XORing + * the result with the plaintext. This makes it seekable: to decrypt bytes starting at position N, + * you only need to know which counter block N falls in, rather than decrypting all preceding bytes. + * + * The counter block for position N is: originalIV + floor(N / 16) + * If N is mid-block (N % 16 !== 0), we advance the decipher by the partial block remainder + * before decrypting the actual bytes. + * + * @param encryptedBytes - The raw encrypted bytes for the requested range (fetched via HTTP Range header) + * @param key - The AES-256 file key + * @param iv - Initialization Vector: a random 16-byte value generated when the file was encrypted, + * stored in the file's network metadata index. Ensures that two files with the same key + * produce different ciphertext. Retrieved by the SDK as the first 16 bytes of the file index. + * @param position - The byte offset in the full file where this range starts + */ +export function decryptAtOffset(encryptedBytes: Buffer, key: Buffer, iv: Buffer, position: number): Buffer { + const AES_BLOCK_SIZE = 16; + const partialBlock = position % AES_BLOCK_SIZE; + const startBlockNumber = (position - partialBlock) / AES_BLOCK_SIZE; + + // Compute the IV for the starting block by adding the block number to the original IV + const ivForRange = (BigInt('0x' + iv.toString('hex')) + BigInt(startBlockNumber)).toString(16).padStart(32, '0'); + const offsetIv = Buffer.from(ivForRange, 'hex'); + + const decipher = createDecipheriv('aes-256-ctr', new Uint8Array(key), new Uint8Array(offsetIv)); + + // If position is mid-block, skip the leading partial block bytes + if (partialBlock > 0) { + decipher.update(new Uint8Array(partialBlock)); + } + + return decipher.update(new Uint8Array(encryptedBytes)); +} diff --git a/src/infra/environment/download-file/download-file.ts b/src/infra/environment/download-file/download-file.ts new file mode 100644 index 0000000000..65bed096e4 --- /dev/null +++ b/src/infra/environment/download-file/download-file.ts @@ -0,0 +1,72 @@ +import { DecryptFileFunction, DownloadFileFunction } from '@internxt/sdk/dist/network'; +import { downloadFile as sdkDownloadFile } from '@internxt/sdk/dist/network/download'; +import axios from 'axios'; +import { buildCryptoLib } from './build-crypto-lib'; +import { DownloadFileProps } from './types'; +import { DriveDesktopError } from '../../../context/shared/domain/errors/DriveDesktopError'; +import { decryptAtOffset } from './decrypt-at-offset'; + +async function fetchEncryptedRange(url: string, position: number, length: number): Promise { + const response = await axios.get(url, { + responseType: 'stream', + headers: { + range: `bytes=${position}-${position + length - 1}`, + }, + }); + + return new Promise((resolve, reject) => { + const chunks: Uint8Array[] = []; + response.data.on('data', (chunk: Uint8Array) => chunks.push(chunk)); + response.data.on('end', () => resolve(Buffer.concat(chunks))); + response.data.on('error', reject); + }); +} + +export async function downloadFileRange({ + signal, + fileId, + bucketId, + mnemonic, + network, + range, +}: DownloadFileProps): Promise { + let encryptedBytes: Buffer | undefined; + let decryptedBuffer: Buffer | undefined; + + const downloadFileCb: DownloadFileFunction = async (downloadables) => { + if (range && downloadables.length > 1) { + throw new Error('Multi-Part Download with Range-Requests is not implemented'); + } + for (const downloadable of downloadables) { + if (signal.signal.aborted) { + throw new DriveDesktopError('ABORTED'); + } + // eslint-disable-next-line no-await-in-loop + encryptedBytes = await fetchEncryptedRange(downloadable.url, range.position, range.length); + } + }; + + const decryptFileCb: DecryptFileFunction = async (_, key, iv) => { + if (!encryptedBytes) throw new Error('No encrypted bytes to decrypt'); + decryptedBuffer = decryptAtOffset( + encryptedBytes, + Buffer.from(key.toString('hex'), 'hex'), + Buffer.from(iv.toString('hex'), 'hex'), + range.position, + ); + }; + + await sdkDownloadFile( + fileId, + bucketId, + mnemonic, + network, + buildCryptoLib(), + Buffer.from, + downloadFileCb, + decryptFileCb, + ); + + if (!decryptedBuffer) throw new Error('Decryption did not produce a buffer'); + return decryptedBuffer; +} diff --git a/src/infra/environment/download-file/types.ts b/src/infra/environment/download-file/types.ts new file mode 100644 index 0000000000..5c6c681983 --- /dev/null +++ b/src/infra/environment/download-file/types.ts @@ -0,0 +1,13 @@ +import { Network } from '@internxt/sdk'; + +export type DownloadFileProps = { + signal: AbortController; + fileId: string; + bucketId: string; + mnemonic: string; + network: Network.Network; + range: { + position: number; + length: number; + }; +};