Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b6a3962
Add telemetry infrastructure: CircuitBreaker and FeatureFlagCache
samikshya-db Jan 28, 2026
68652de
Add telemetry client management: TelemetryClient and Provider
samikshya-db Jan 28, 2026
97f2106
Add telemetry event emission and aggregation
samikshya-db Jan 28, 2026
44185e4
Add telemetry export: DatabricksTelemetryExporter
samikshya-db Jan 28, 2026
013f305
Add telemetry integration into driver components
samikshya-db Jan 28, 2026
e357588
Add authentication support for REST API calls
samikshya-db Jan 29, 2026
e8c2033
Update DatabricksTelemetryExporter to use authenticated export
samikshya-db Jan 29, 2026
a2dbfb1
Fix telemetry event listeners and add config options
samikshya-db Jan 29, 2026
62545d6
Match JDBC telemetry payload format
samikshya-db Jan 29, 2026
9ac0978
Fix lint errors
samikshya-db Jan 29, 2026
bfb8303
Add missing getAuthHeaders method to ClientContextStub
samikshya-db Jan 29, 2026
ce7723a
Fix prettier formatting
samikshya-db Jan 29, 2026
b8d20bf
Use nodejs-sql-driver as driver name in telemetry
samikshya-db Jan 30, 2026
43e404d
Add missing telemetry fields to match JDBC
samikshya-db Jan 30, 2026
c2daa4b
Populate all telemetry system configuration fields
samikshya-db Jan 30, 2026
228c2be
Fix telemetry aggregator cleanup in client close
samikshya-db Jan 30, 2026
4f6d5a6
Merge branch 'telemetry-5-export' into telemetry-6-integration
samikshya-db Jan 30, 2026
72372d2
Merge PR #328 fixes into PR #329
samikshya-db Feb 5, 2026
86d7b8d
Merge telemetry-5-export proxy fix into telemetry-6-integration
samikshya-db Feb 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 262 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import thrift from 'thrift';
import Int64 from 'node-int64';
import os from 'os';

import { EventEmitter } from 'events';
import { HeadersInit } from 'node-fetch';
Expand All @@ -24,6 +25,14 @@ import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import DBSQLLogger from './DBSQLLogger';
import CloseableCollection from './utils/CloseableCollection';
import IConnectionProvider from './connection/contracts/IConnectionProvider';
import FeatureFlagCache from './telemetry/FeatureFlagCache';
import TelemetryClientProvider from './telemetry/TelemetryClientProvider';
import TelemetryEventEmitter from './telemetry/TelemetryEventEmitter';
import MetricsAggregator from './telemetry/MetricsAggregator';
import DatabricksTelemetryExporter from './telemetry/DatabricksTelemetryExporter';
import { CircuitBreakerRegistry } from './telemetry/CircuitBreaker';
import { DriverConfiguration, DRIVER_NAME } from './telemetry/types';
import driverVersion from './version';

function prependSlash(str: string): string {
if (str.length > 0 && str.charAt(0) !== '/') {
Expand Down Expand Up @@ -68,6 +77,19 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

private readonly sessions = new CloseableCollection<DBSQLSession>();

// Telemetry components (instance-based, NOT singletons)
private host?: string;

private featureFlagCache?: FeatureFlagCache;

private telemetryClientProvider?: TelemetryClientProvider;

private telemetryEmitter?: TelemetryEventEmitter;

private telemetryAggregator?: MetricsAggregator;

private circuitBreakerRegistry?: CircuitBreakerRegistry;

private static getDefaultLogger(): IDBSQLLogger {
if (!this.defaultLogger) {
this.defaultLogger = new DBSQLLogger();
Expand All @@ -94,6 +116,15 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
cloudFetchSpeedThresholdMBps: 0.1,

useLZ4Compression: true,

// Telemetry defaults
telemetryEnabled: false, // Initially disabled for safe rollout
telemetryBatchSize: 100,
telemetryFlushIntervalMs: 5000,
telemetryMaxRetries: 3,
telemetryAuthenticatedExport: true,
telemetryCircuitBreakerThreshold: 5,
telemetryCircuitBreakerTimeout: 60000, // 1 minute
};
}

Expand Down Expand Up @@ -152,6 +183,178 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
return new HttpConnection(this.getConnectionOptions(options), this);
}

/**
* Extract workspace ID from hostname.
* @param host - The host string (e.g., "workspace-id.cloud.databricks.com")
* @returns Workspace ID or host if extraction fails
*/
private extractWorkspaceId(host: string): string {
// Extract workspace ID from hostname (first segment before first dot)
const parts = host.split('.');
return parts.length > 0 ? parts[0] : host;
}

/**
* Build driver configuration for telemetry reporting.
* @returns DriverConfiguration object with current driver settings
*/
private buildDriverConfiguration(): DriverConfiguration {
return {
driverVersion,
driverName: DRIVER_NAME,
nodeVersion: process.version,
platform: process.platform,
osVersion: os.release(),
osArch: os.arch(),
runtimeVendor: 'Node.js Foundation',
localeName: this.getLocaleName(),
charSetEncoding: 'UTF-8',
processName: this.getProcessName(),

// Feature flags
cloudFetchEnabled: this.config.useCloudFetch ?? false,
lz4Enabled: this.config.useLZ4Compression ?? false,
arrowEnabled: this.config.arrowEnabled ?? false,
directResultsEnabled: true, // Direct results always enabled

// Configuration values
socketTimeout: this.config.socketTimeout ?? 0,
retryMaxAttempts: this.config.retryMaxAttempts ?? 0,
cloudFetchConcurrentDownloads: this.config.cloudFetchConcurrentDownloads ?? 0,
};
}

/**
* Get locale name in format language_country (e.g., en_US).
* Matches JDBC format: user.language + '_' + user.country
*/
private getLocaleName(): string {
try {
// Try to get from environment variables
const lang = process.env.LANG || process.env.LC_ALL || process.env.LC_MESSAGES || '';
if (lang) {
// LANG format is typically "en_US.UTF-8", extract "en_US"
const match = lang.match(/^([a-z]{2}_[A-Z]{2})/);
if (match) {
return match[1];
}
}
// Fallback to en_US
return 'en_US';
} catch {
return 'en_US';
}
}

/**
* Get process name, similar to JDBC's ProcessNameUtil.
* Returns the script name or process title.
*/
private getProcessName(): string {
try {
// Try process.title first (can be set by application)
if (process.title && process.title !== 'node') {
return process.title;
}
// Try to get the main script name from argv[1]
if (process.argv && process.argv.length > 1) {
const scriptPath = process.argv[1];
// Extract filename without path
const filename = scriptPath.split('/').pop()?.split('\\').pop() || '';
// Remove extension
const nameWithoutExt = filename.replace(/\.[^.]*$/, '');
if (nameWithoutExt) {
return nameWithoutExt;
}
}
return 'node';
} catch {
return 'node';
}
}

/**
* Initialize telemetry components if enabled.
* CRITICAL: All errors swallowed and logged at LogLevel.debug ONLY.
* Driver NEVER throws exceptions due to telemetry.
*/
private async initializeTelemetry(): Promise<void> {
if (!this.host) {
return;
}

try {
// Create feature flag cache instance
this.featureFlagCache = new FeatureFlagCache(this);
this.featureFlagCache.getOrCreateContext(this.host);

// Check if telemetry enabled via feature flag
const enabled = await this.featureFlagCache.isTelemetryEnabled(this.host);
if (!enabled) {
this.logger.log(LogLevel.debug, 'Telemetry disabled via feature flag');
return;
}

// Create telemetry components (all instance-based)
this.telemetryClientProvider = new TelemetryClientProvider(this);
this.telemetryEmitter = new TelemetryEventEmitter(this);

// Get or create telemetry client for this host (increments refCount)
this.telemetryClientProvider.getOrCreateClient(this.host);

// Create circuit breaker registry and exporter
this.circuitBreakerRegistry = new CircuitBreakerRegistry(this);
const exporter = new DatabricksTelemetryExporter(this, this.host, this.circuitBreakerRegistry);
this.telemetryAggregator = new MetricsAggregator(this, exporter);

// Wire up event listeners
this.telemetryEmitter.on('connection.open', (event) => {
try {
this.telemetryAggregator?.processEvent(event);
} catch (error: any) {
this.logger.log(LogLevel.debug, `Error processing connection.open event: ${error.message}`);
}
});

this.telemetryEmitter.on('statement.start', (event) => {
try {
this.telemetryAggregator?.processEvent(event);
} catch (error: any) {
this.logger.log(LogLevel.debug, `Error processing statement.start event: ${error.message}`);
}
});

this.telemetryEmitter.on('statement.complete', (event) => {
try {
this.telemetryAggregator?.processEvent(event);
} catch (error: any) {
this.logger.log(LogLevel.debug, `Error processing statement.complete event: ${error.message}`);
}
});

this.telemetryEmitter.on('cloudfetch.chunk', (event) => {
try {
this.telemetryAggregator?.processEvent(event);
} catch (error: any) {
this.logger.log(LogLevel.debug, `Error processing cloudfetch.chunk event: ${error.message}`);
}
});

this.telemetryEmitter.on('error', (event) => {
try {
this.telemetryAggregator?.processEvent(event);
} catch (error: any) {
this.logger.log(LogLevel.debug, `Error processing error event: ${error.message}`);
}
});

this.logger.log(LogLevel.debug, 'Telemetry initialized successfully');
} catch (error: any) {
// Swallow all telemetry initialization errors
this.logger.log(LogLevel.debug, `Telemetry initialization failed: ${error.message}`);
}
}

/**
* Connects DBSQLClient to endpoint
* @public
Expand All @@ -173,11 +376,25 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
}
}

// Store host for telemetry
this.host = options.host;

// Store enableMetricViewMetadata configuration
if (options.enableMetricViewMetadata !== undefined) {
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
}

// Override telemetry config if provided in options
if (options.telemetryEnabled !== undefined) {
this.config.telemetryEnabled = options.telemetryEnabled;
}
if (options.telemetryBatchSize !== undefined) {
this.config.telemetryBatchSize = options.telemetryBatchSize;
}
if (options.telemetryAuthenticatedExport !== undefined) {
this.config.telemetryAuthenticatedExport = options.telemetryAuthenticatedExport;
}

this.authProvider = this.createAuthProvider(options, authProvider);

this.connectionProvider = this.createConnectionProvider(options);
Expand Down Expand Up @@ -211,6 +428,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.emit('timeout');
});

// Initialize telemetry if enabled
if (this.config.telemetryEnabled) {
await this.initializeTelemetry();
}

return this;
}

Expand Down Expand Up @@ -246,12 +468,52 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
serverProtocolVersion: response.serverProtocolVersion,
});
this.sessions.add(session);

// Emit connection.open telemetry event
if (this.telemetryEmitter && this.host) {
try {
const workspaceId = this.extractWorkspaceId(this.host);
const driverConfig = this.buildDriverConfiguration();
this.telemetryEmitter.emitConnectionOpen({
sessionId: session.id,
workspaceId,
driverConfig,
});
} catch (error: any) {
// CRITICAL: All telemetry exceptions swallowed
this.logger.log(LogLevel.debug, `Error emitting connection.open event: ${error.message}`);
}
}

return session;
}

public async close(): Promise<void> {
await this.sessions.closeAll();

// Cleanup telemetry
if (this.host) {
try {
// Step 1: Close aggregator (stops timer, completes statements, final flush)
if (this.telemetryAggregator) {
this.telemetryAggregator.close();
}

// Step 2: Release telemetry client (decrements ref count, closes if last)
if (this.telemetryClientProvider) {
await this.telemetryClientProvider.releaseClient(this.host);
}

// Step 3: Release feature flag context (decrements ref count)
if (this.featureFlagCache) {
this.featureFlagCache.releaseContext(this.host);
}
} catch (error: any) {
// Swallow all telemetry cleanup errors
this.logger.log(LogLevel.debug, `Telemetry cleanup error: ${error.message}`);
}
}

this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
Expand Down
Loading
Loading