23 changes: 23 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,29 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [4.0.0] — Conduit (2026-02-27)

### Breaking Changes
- **`CasService` no longer extends `EventEmitter`** — event subscriptions must use the new `ObservabilityPort` adapters instead of `service.on()`. The `EventEmitterObserver` adapter provides full backward compatibility for existing event-based code.
- **`observability` is a required constructor port** for `CasService`. The facade (`ContentAddressableStore`) defaults to `SilentObserver` when omitted.
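
For code migrating off `service.on()`, the bridge adapter works roughly like this — a minimal sketch of the idea, not the library source; the real `EventEmitterObserver` may differ in detail:

```javascript
import { EventEmitter } from 'node:events';

// Sketch: an ObservabilityPort adapter that forwards metric() calls to
// EventEmitter events, restoring the old `service.on()` ergonomics.
class EventEmitterObserverSketch {
  #emitter = new EventEmitter();
  metric(channel, data) { this.#emitter.emit(channel, data); }
  log(level, msg, meta) {} // no-op in this sketch
  span(name) { return { end() {} }; }
  on(event, fn) { this.#emitter.on(event, fn); return this; }
  removeListener(event, fn) { this.#emitter.removeListener(event, fn); return this; }
  listenerCount(event) { return this.#emitter.listenerCount(event); }
}

const observer = new EventEmitterObserverSketch();
observer.on('chunk:stored', (e) => console.log(`chunk ${e.index}: ${e.size} bytes`));
// The service calls observer.metric('chunk:stored', { index: 0, size: 1024 })
// instead of emitting on itself.
```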

### Added
- **ObservabilityPort** — new hexagonal port (`src/ports/ObservabilityPort.js`) with `metric(channel, data)`, `log(level, msg, meta?)`, and `span(name)` methods. Decouples the domain layer from Node's event infrastructure.
- **SilentObserver** — no-op adapter (default). Zero overhead when observability is not needed.
- **EventEmitterObserver** — bridges `metric()` calls to EventEmitter events (`chunk:stored`, `file:restored`, etc.) for backward-compatible progress tracking. Exposes `.on()`, `.removeListener()`, `.listenerCount()`.
- **StatsCollector** — accumulates metrics and exposes `summary()` with `chunksProcessed`, `bytesTotal`, `elapsed`, `throughput`, and `errors`.
- **`restoreStream()`** — new async generator on `CasService` and facade. Returns `AsyncIterable<Buffer>` for streaming restore with O(chunkSize) memory for unencrypted, uncompressed files. Encrypted/compressed files buffer internally but expose the same streaming API.
- **`restoreFile()` now uses streaming I/O** — writes via `createWriteStream` + `pipeline` instead of buffering the entire file with `writeFileSync`.
- **Parallel chunk I/O** — new `concurrency` option (default: 1). Store operations launch chunk writes through a counting semaphore, and streaming restore uses read-ahead for concurrent blob fetches. `concurrency: 1` preserves the previous sequential behavior.
- **Semaphore** — internal counting semaphore (`src/domain/services/Semaphore.js`) for concurrency control.
- 43 new unit tests (567 total).
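
The semaphore-gated pattern can be sketched as follows — an illustration under assumptions, not the actual `Semaphore.js` implementation:

```javascript
// Counting-semaphore sketch: at most `limit` chunk writes in flight at once.
class Semaphore {
  #free;
  #waiters = [];
  constructor(limit) { this.#free = limit; }
  async acquire() {
    if (this.#free > 0) { this.#free -= 1; return; }
    await new Promise((resolve) => this.#waiters.push(resolve));
  }
  release() {
    const next = this.#waiters.shift();
    if (next) next(); // hand the slot directly to a waiter
    else this.#free += 1;
  }
}

// Gate an async operation, always releasing on completion or error.
async function gated(sem, fn) {
  await sem.acquire();
  try { return await fn(); } finally { sem.release(); }
}
```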

### Changed
- CLI `store` and `restore` commands now create an `EventEmitterObserver` and pass it to the CAS instance, attaching progress tracking to the observer instead of the service.
- `restore()` reimplemented as a collector over `restoreStream()`.
- `_chunkAndStore()` refactored to use semaphore-gated parallel writes with `Promise.all`, sorting results by index after completion.
- Progress tracking example (`examples/progress-tracking.js`) updated to use `EventEmitterObserver` pattern.
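
The collector pattern behind the reimplemented `restore()` is roughly this — a sketch with a stand-in stream (`fakeRestoreStream` is hypothetical, not the library API):

```javascript
// Sketch of restore() as a collector over restoreStream(): drain the
// AsyncIterable<Buffer> and concatenate the chunks into one buffer.
async function collect(stream) {
  const parts = [];
  for await (const chunk of stream) {
    parts.push(chunk);
  }
  const buffer = Buffer.concat(parts);
  return { buffer, bytesWritten: buffer.length };
}

// Stand-in for restoreStream(): yields chunk buffers in order.
async function* fakeRestoreStream() {
  yield Buffer.from('hello, ');
  yield Buffer.from('world');
}
```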

## [3.1.0] — Bijou (2026-02-27)

### Added
10 changes: 4 additions & 6 deletions README.md
@@ -44,15 +44,13 @@ We use the object database.

See [CHANGELOG.md](./CHANGELOG.md) for the full list of changes.

## What's new in v3.1.0
## What's new in v4.0.0

**Interactive vault dashboard** — `git cas vault dashboard` launches a full TUI with split-pane layout, manifest detail view, keyboard navigation, and real-time filtering. Browse entries, inspect chunks, and view encryption status without memorizing CLI flags.
**ObservabilityPort** — `CasService` no longer extends `EventEmitter`. A new hexagonal `ObservabilityPort` decouples the domain from Node's event infrastructure. Three adapters ship out of the box: `SilentObserver` (no-op default), `EventEmitterObserver` (backward-compatible event bridge), and `StatsCollector` (metric accumulator).

**Progress bars** — `store` and `restore` now show animated progress with throughput reporting. Suppress with `--quiet`.
**Streaming restore** — `restoreStream()` returns an `AsyncIterable<Buffer>` with O(chunkSize) memory for unencrypted files. `restoreFile()` now writes via `createWriteStream` + `pipeline` instead of buffering.

**Pretty history** — `git cas vault history --pretty` renders a color-coded, paginated timeline of vault commits.

**Inspect command** — `git cas inspect <tree-oid>` renders manifest details with chunk table, encryption info, and compression badges.
**Parallel chunk I/O** — new `concurrency` option gates store writes and restore reads through a counting semaphore. `concurrency: 4` can significantly speed up large-file operations.
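
The read-ahead idea can be sketched like so — `fetchBlob` here is a hypothetical stand-in for a blob read, not the library's API:

```javascript
// Read-ahead sketch: keep up to `concurrency` blob fetches in flight,
// but yield results strictly in chunk order.
async function* readAhead(digests, fetchBlob, concurrency = 4) {
  const pending = [];
  let next = 0;
  while (next < digests.length || pending.length > 0) {
    while (next < digests.length && pending.length < concurrency) {
      pending.push(fetchBlob(digests[next]));
      next += 1;
    }
    yield await pending.shift(); // oldest fetch first preserves order
  }
}
```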

See [CHANGELOG.md](./CHANGELOG.md) for the full list of changes.

789 changes: 785 additions & 4 deletions ROADMAP.md

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions bin/git-cas.js
@@ -3,7 +3,7 @@
import { readFileSync } from 'node:fs';
import { program } from 'commander';
import GitPlumbing, { ShellRunnerFactory } from '@git-stunts/plumbing';
import ContentAddressableStore from '../index.js';
import ContentAddressableStore, { EventEmitterObserver } from '../index.js';
import Manifest from '../src/domain/value-objects/Manifest.js';
import { createStoreProgress, createRestoreProgress } from './ui/progress.js';
import { renderEncryptionCard } from './ui/encryption-card.js';
@@ -14,7 +14,7 @@ import { renderHeatmap } from './ui/heatmap.js';
program
.name('git-cas')
.description('Content Addressable Storage backed by Git')
.version('3.1.0')
.version('4.0.0')
.option('-q, --quiet', 'Suppress progress output');

/**
@@ -25,12 +25,12 @@ function readKeyFile(keyFilePath) {
}

/**
* Create a CAS instance for the given working directory.
* Create a CAS instance for the given working directory with an optional observability adapter.
*/
function createCas(cwd) {
function createCas(cwd, { observability } = {}) {
const runner = ShellRunnerFactory.create();
const plumbing = new GitPlumbing({ runner, cwd });
return new ContentAddressableStore({ plumbing });
return new ContentAddressableStore({ plumbing, observability });
}

/**
@@ -104,18 +104,18 @@ program
.option('--cwd <dir>', 'Git working directory', '.')
.action(async (file, opts) => {
try {
const cas = createCas(opts.cwd);
const observer = new EventEmitterObserver();
const cas = createCas(opts.cwd, { observability: observer });
const encryptionKey = await resolveEncryptionKey(cas, opts);
const storeOpts = { filePath: file, slug: opts.slug };
if (encryptionKey) {
storeOpts.encryptionKey = encryptionKey;
}

const service = await cas.getService();
const progress = createStoreProgress({
filePath: file, chunkSize: cas.chunkSize, quiet: program.opts().quiet,
});
progress.attach(service);
progress.attach(observer);
let manifest;
try {
manifest = await cas.storeFile(storeOpts);
@@ -202,7 +202,8 @@
.action(async (opts) => {
try {
validateRestoreFlags(opts);
const cas = createCas(opts.cwd);
const observer = new EventEmitterObserver();
const cas = createCas(opts.cwd, { observability: observer });
const treeOid = opts.oid || await cas.resolveVaultEntry({ slug: opts.slug });
const manifest = await cas.readManifest({ treeOid });

@@ -212,11 +213,10 @@
restoreOpts.encryptionKey = encryptionKey;
}

const service = await cas.getService();
const progress = createRestoreProgress({
totalChunks: manifest.chunks.length, quiet: program.opts().quiet,
});
progress.attach(service);
progress.attach(observer);
let bytesWritten;
try {
({ bytesWritten } = await cas.restoreFile({
4 changes: 2 additions & 2 deletions bin/ui/progress.js
@@ -30,7 +30,7 @@ function formatBytes(bytes) {
* @param {string} options.filePath - Path to the file being stored.
* @param {number} options.chunkSize - Chunk size in bytes.
* @param {boolean} [options.quiet] - Suppress all progress output.
* @returns {{ attach(service: EventEmitter): void, detach(): void }}
* @returns {{ attach(observer: { on(event: string, fn: Function): void, removeListener(event: string, fn: Function): void }): void, detach(): void }}
*/
export function createStoreProgress({ filePath, chunkSize, quiet, fileSize: providedSize }) {
if (quiet) {
@@ -58,7 +58,7 @@ export function createStoreProgress({ filePath, chunkSize, quiet, fileSize: prov
* @param {Object} options
* @param {number} options.totalChunks - Number of chunks to restore.
* @param {boolean} [options.quiet] - Suppress all progress output.
* @returns {{ attach(service: EventEmitter): void, detach(): void }}
* @returns {{ attach(observer: { on(event: string, fn: Function): void, removeListener(event: string, fn: Function): void }): void, detach(): void }}
*/
export function createRestoreProgress({ totalChunks, quiet }) {
if (quiet || totalChunks === 0) {
43 changes: 23 additions & 20 deletions examples/progress-tracking.js
@@ -1,9 +1,9 @@
#!/usr/bin/env node
/**
* Progress tracking demonstration using EventEmitter
* Progress tracking demonstration using EventEmitterObserver
*
* This example shows:
* 1. Accessing the CasService to attach event listeners
* 1. Creating an EventEmitterObserver for observability
* 2. Tracking chunk-by-chunk progress during store
* 3. Tracking chunk-by-chunk progress during restore
* 4. Building a real-time progress indicator
@@ -16,7 +16,7 @@ import { execSync } from 'node:child_process';
import path from 'node:path';
import os from 'node:os';
import GitPlumbing from '@git-stunts/plumbing';
import ContentAddressableStore from '@git-stunts/git-cas';
import ContentAddressableStore, { EventEmitterObserver } from '@git-stunts/git-cas';

console.log('=== Progress Tracking Example ===\n');

@@ -25,9 +25,8 @@ const repoDir = mkdtempSync(path.join(os.tmpdir(), 'cas-progress-'));
console.log(`Created temporary repository: ${repoDir}`);
execSync('git init --bare', { cwd: repoDir, stdio: 'ignore' });

// Initialize plumbing and CAS
const plumbing = GitPlumbing.createDefault({ cwd: repoDir });
const cas = ContentAddressableStore.createJson({ plumbing, chunkSize: 128 * 1024 }); // 128 KB chunks
// Initialize CAS with an EventEmitterObserver
const observer = new EventEmitterObserver();

// Create a larger test file to see multiple chunks
const testDir = mkdtempSync(path.join(os.tmpdir(), 'cas-test-'));
@@ -41,55 +40,52 @@ console.log(`File size: ${fileSize.toLocaleString()} bytes`);
console.log(`Chunk size: ${(128 * 1024).toLocaleString()} bytes`);
console.log(`Expected chunks: ~${Math.ceil(fileSize / (128 * 1024))}`);

// Get the CasService to attach event listeners
const service = await cas.getService();

// Progress tracker state
const progress = {
store: { chunks: 0, bytes: 0 },
restore: { chunks: 0, bytes: 0 }
};

// Event listeners for storage operations
// Event listeners for storage operations — subscribe on the observer
console.log('\n--- Setting up event listeners ---');

service.on('chunk:stored', (event) => {
observer.on('chunk:stored', (event) => {
progress.store.chunks++;
progress.store.bytes += event.size;
console.log(`[STORE] Chunk ${event.index} stored: ${event.size.toLocaleString()} bytes (digest: ${event.digest.substring(0, 8)}...)`);
});

service.on('file:stored', (event) => {
observer.on('file:stored', (event) => {
console.log(`[STORE] File complete: ${event.slug}`);
console.log(` Total size: ${event.size.toLocaleString()} bytes`);
console.log(` Total chunks: ${event.chunkCount}`);
console.log(` Encrypted: ${event.encrypted ? 'Yes' : 'No'}`);
});

service.on('chunk:restored', (event) => {
observer.on('chunk:restored', (event) => {
progress.restore.chunks++;
progress.restore.bytes += event.size;
console.log(`[RESTORE] Chunk ${event.index} restored: ${event.size.toLocaleString()} bytes (digest: ${event.digest.substring(0, 8)}...)`);
});

service.on('file:restored', (event) => {
observer.on('file:restored', (event) => {
console.log(`[RESTORE] File complete: ${event.slug}`);
console.log(` Total size: ${event.size.toLocaleString()} bytes`);
console.log(` Total chunks: ${event.chunkCount}`);
});

service.on('integrity:pass', (event) => {
observer.on('integrity:pass', (event) => {
console.log(`[INTEGRITY] Passed for: ${event.slug}`);
});

service.on('integrity:fail', (event) => {
observer.on('integrity:fail', (event) => {
console.error(`[INTEGRITY] FAILED for: ${event.slug}`);
console.error(` Chunk index: ${event.chunkIndex}`);
console.error(` Expected: ${event.expected}`);
console.error(` Actual: ${event.actual}`);
});

service.on('error', (event) => {
observer.on('error', (event) => {
console.error(`[ERROR] ${event.code}: ${event.message}`);
});

@@ -103,6 +99,12 @@ console.log(' - integrity:fail');
console.log(' - error');

// Step 1: Store the file with progress tracking
const cas = new ContentAddressableStore({
plumbing: GitPlumbing.createDefault({ cwd: repoDir }),
chunkSize: 128 * 1024,
observability: observer,
});

console.log('\n--- Step 1: Storing file (watch for chunk events) ---\n');
const startStore = Date.now();
const manifest = await cas.storeFile({
@@ -183,7 +185,7 @@ const progressListener = (event) => {
process.stdout.write(`\rProgress: [${progressBar}] ${percentage}% (${storeChunkCount}/${totalChunks} chunks)`);
};

service.on('chunk:stored', progressListener);
observer.on('chunk:stored', progressListener);

// Store another test file
const testFilePath2 = path.join(testDir, 'progress-demo.bin');
@@ -199,7 +201,7 @@ console.log('\n\nProgress tracking complete!');
console.log(`Final chunk count: ${storeChunkCount}`);

// Remove the progress listener to avoid cluttering output
service.removeListener('chunk:stored', progressListener);
observer.removeListener('chunk:stored', progressListener);

// Summary statistics
console.log('\n--- Performance Summary ---');
@@ -224,7 +226,8 @@ console.log('Temporary files removed');

console.log('\n=== Example completed successfully! ===');
console.log('\nKey takeaways:');
console.log('- Access CasService via cas.getService() for events');
console.log('- Create an EventEmitterObserver and pass it to ContentAddressableStore');
console.log('- Subscribe to events on the observer (not the service)');
console.log('- chunk:stored fires for each chunk during storage');
console.log('- chunk:restored fires for each chunk during restore');
console.log('- file:stored and file:restored fire when operations complete');
42 changes: 41 additions & 1 deletion index.d.ts
@@ -11,13 +11,14 @@ import type {
CryptoPort,
CodecPort,
GitPersistencePort,
ObservabilityPort,
CasServiceOptions,
DeriveKeyOptions,
DeriveKeyResult,
} from "./src/domain/services/CasService.js";

export { CasService, Manifest, Chunk };
export type { EncryptionMeta, ManifestData, CompressionMeta, KdfParams, SubManifestRef, CryptoPort, CodecPort, GitPersistencePort, CasServiceOptions, DeriveKeyOptions, DeriveKeyResult };
export type { EncryptionMeta, ManifestData, CompressionMeta, KdfParams, SubManifestRef, CryptoPort, CodecPort, GitPersistencePort, ObservabilityPort, CasServiceOptions, DeriveKeyOptions, DeriveKeyResult };

/** Abstract port for cryptographic operations. */
export declare class CryptoPortBase {
@@ -93,14 +94,47 @@ export declare class CborCodec extends CodecPortBase {
constructor();
}

/** No-op observability adapter. */
export declare class SilentObserver {
metric(channel: string, data: Record<string, unknown>): void;
log(level: string, msg: string, meta?: Record<string, unknown>): void;
span(name: string): { end(meta?: Record<string, unknown>): void };
}

/** EventEmitter-based observability adapter for backward compatibility. */
export declare class EventEmitterObserver {
metric(channel: string, data: Record<string, unknown>): void;
log(level: string, msg: string, meta?: Record<string, unknown>): void;
span(name: string): { end(meta?: Record<string, unknown>): void };
on(event: string, listener: (...args: unknown[]) => void): this;
removeListener(event: string, listener: (...args: unknown[]) => void): this;
listenerCount(event: string): number;
}

/** Stats-collecting observability adapter. */
export declare class StatsCollector {
metric(channel: string, data: Record<string, unknown>): void;
log(level: string, msg: string, meta?: Record<string, unknown>): void;
span(name: string): { end(meta?: Record<string, unknown>): void };
summary(): {
chunksProcessed: number;
bytesTotal: number;
elapsed: number;
throughput: number;
errors: number;
};
}

/** Constructor options for {@link ContentAddressableStore}. */
export interface ContentAddressableStoreOptions {
plumbing: unknown;
chunkSize?: number;
codec?: CodecPort;
crypto?: CryptoPort;
observability?: ObservabilityPort;
policy?: unknown;
merkleThreshold?: number;
concurrency?: number;
}

/** A single vault entry. */
@@ -257,6 +291,12 @@ export default class ContentAddressableStore {
passphrase?: string;
}): Promise<{ buffer: Buffer; bytesWritten: number }>;

restoreStream(options: {
manifest: Manifest;
encryptionKey?: Buffer;
passphrase?: string;
}): AsyncIterable<Buffer>;

createTree(options: { manifest: Manifest }): Promise<string>;

verifyIntegrity(manifest: Manifest): Promise<boolean>;