diff --git a/connection.go b/connection.go index b179245..08606dc 100644 --- a/connection.go +++ b/connection.go @@ -338,6 +338,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver select { default: + // Non-blocking check: continue if context not done case <-ctx.Done(): newCtx := driverctx.NewContextFromBackground(ctx) // in case context is done, we need to cancel the operation if necessary diff --git a/internal/config/overlay.go b/internal/config/overlay.go new file mode 100644 index 0000000..7868660 --- /dev/null +++ b/internal/config/overlay.go @@ -0,0 +1,160 @@ +package config + +import ( + "context" + "net/http" + "strconv" +) + +// ConfigValue represents a configuration value that can be set by client or resolved from server. +// This implements the config overlay pattern: client > server > default +// +// T is the type of the configuration value (bool, string, int, etc.) +// +// Example usage: +// +// type MyConfig struct { +// EnableFeature ConfigValue[bool] +// BatchSize ConfigValue[int] +// } +// +// // Client explicitly sets value (overrides server) +// config.EnableFeature = NewConfigValue(true) +// +// // Client doesn't set value (use server) +// config.EnableFeature = ConfigValue[bool]{} // nil/unset +// +// // Resolve value with overlay priority +// enabled := config.EnableFeature.Resolve(ctx, serverResolver, defaultValue) +type ConfigValue[T any] struct { + // value is the client-set configuration value + // nil = not set by client (use server config) + // non-nil = explicitly set by client (overrides server) + value *T +} + +// NewConfigValue creates a ConfigValue with a client-set value. +// The value will override any server-side configuration. +func NewConfigValue[T any](value T) ConfigValue[T] { + return ConfigValue[T]{value: &value} +} + +// IsSet returns true if the client explicitly set this configuration value. +func (cv ConfigValue[T]) IsSet() bool { + return cv.value != nil +} + +// Get returns the client-set value and whether it was set. +// If not set, returns zero value and false. +func (cv ConfigValue[T]) Get() (T, bool) { + if cv.value != nil { + return *cv.value, true + } + var zero T + return zero, false +} + +// ServerResolver defines how to fetch a configuration value from the server. +// Implementations should handle caching, retries, and error handling. +type ServerResolver[T any] interface { + // Resolve fetches the configuration value from the server. + // Returns the value and any error encountered. + // On error, the config overlay will fall back to the default value. + Resolve(ctx context.Context, host string, httpClient *http.Client) (T, error) +} + +// Resolve applies config overlay priority to determine the final value: +// +// Priority 1: Client Config - if explicitly set (overrides server) +// Priority 2: Server Config - resolved via serverResolver (when client doesn't set) +// Priority 3: Default Value - used when server unavailable/errors (fail-safe) +// +// Parameters: +// - ctx: Context for server requests +// - serverResolver: How to fetch from server (can be nil if no server config) +// - defaultValue: Fail-safe default when client unset and server unavailable +// +// Returns: The resolved configuration value following overlay priority +func (cv ConfigValue[T]) Resolve( + ctx context.Context, + serverResolver ServerResolver[T], + defaultValue T, +) T { + // Priority 1: Client explicitly set (overrides everything) + if cv.value != nil { + return *cv.value + } + + // Priority 2: Try server config (if resolver provided) + if serverResolver != nil { + // Note: We pass empty host/httpClient here. Actual resolver should have these injected + // This is a simplified interface - real usage would inject dependencies + if serverValue, err := serverResolver.Resolve(ctx, "", nil); err == nil { + return serverValue + } + } + + // Priority 3: Fail-safe default + return defaultValue +} + +// ResolveWithContext is a more flexible version that takes host and httpClient. +// This is the recommended method for production use. +func (cv ConfigValue[T]) ResolveWithContext( + ctx context.Context, + host string, + httpClient *http.Client, + serverResolver ServerResolver[T], + defaultValue T, +) T { + // Priority 1: Client explicitly set (overrides everything) + if cv.value != nil { + return *cv.value + } + + // Priority 2: Try server config (if resolver provided) + if serverResolver != nil { + if serverValue, err := serverResolver.Resolve(ctx, host, httpClient); err == nil { + return serverValue + } + } + + // Priority 3: Fail-safe default + return defaultValue +} + +// ParseBoolConfigValue parses a string value into a ConfigValue[bool]. +// Returns unset ConfigValue if the parameter is not present. +// +// Example: +// +// params := map[string]string{"enableFeature": "true"} +// value := ParseBoolConfigValue(params, "enableFeature") +// // value.IsSet() == true, value.Get() == (true, true) +func ParseBoolConfigValue(params map[string]string, key string) ConfigValue[bool] { + if v, ok := params[key]; ok { + enabled := (v == "true" || v == "1") + return NewConfigValue(enabled) + } + return ConfigValue[bool]{} // Unset +} + +// ParseStringConfigValue parses a string value into a ConfigValue[string]. +// Returns unset ConfigValue if the parameter is not present. +func ParseStringConfigValue(params map[string]string, key string) ConfigValue[string] { + if v, ok := params[key]; ok { + return NewConfigValue(v) + } + return ConfigValue[string]{} // Unset +} + +// ParseIntConfigValue parses a string value into a ConfigValue[int]. +// Returns unset ConfigValue if the parameter is not present or invalid. +func ParseIntConfigValue(params map[string]string, key string) ConfigValue[int] { + if v, ok := params[key]; ok { + if i, err := strconv.Atoi(v); err == nil { + return NewConfigValue(i) + } + } + return ConfigValue[int]{} // Unset +} diff --git a/internal/config/overlay_test.go b/internal/config/overlay_test.go new file mode 100644 index 0000000..5863428 --- /dev/null +++ b/internal/config/overlay_test.go @@ -0,0 +1,268 @@ +package config + +import ( + "context" + "errors" + "net/http" + "testing" +) + +// mockServerResolver is a test helper for ServerResolver +type mockServerResolver[T any] struct { + value T + err error +} + +func (m *mockServerResolver[T]) Resolve(ctx context.Context, host string, httpClient *http.Client) (T, error) { + return m.value, m.err +} + +func TestConfigValue_IsSet(t *testing.T) { + t.Run("unset value", func(t *testing.T) { + var cv ConfigValue[bool] + if cv.IsSet() { + t.Error("Expected IsSet to return false for unset value") + } + }) + + t.Run("set value", func(t *testing.T) { + cv := NewConfigValue(true) + if !cv.IsSet() { + t.Error("Expected IsSet to return true for set value") + } + }) +} + +func TestConfigValue_Get(t *testing.T) { + t.Run("unset value", func(t *testing.T) { + var cv ConfigValue[bool] + val, ok := cv.Get() + if ok { + t.Error("Expected Get to return false for unset value") + } + if val != false { + t.Error("Expected Get to return zero value for unset value") + } + }) + + t.Run("set value", func(t *testing.T) { + cv := NewConfigValue(true) + val, ok := cv.Get() + if !ok { + t.Error("Expected Get to return true for set value") + } + if val != true { + t.Errorf("Expected Get to return true, got %v", val) + } + }) +} + +func TestConfigValue_ResolveWithContext_Priority1_ClientOverride(t *testing.T) { + t.Run("client true overrides server false", func(t *testing.T) { + cv := NewConfigValue(true) + resolver := &mockServerResolver[bool]{value: false, err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, false) + + if result != true { + t.Error("Expected client value (true) to override server value (false)") + } + }) + + t.Run("client false overrides server true", func(t *testing.T) { + cv := NewConfigValue(false) + resolver := &mockServerResolver[bool]{value: true, err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, true) + + if result != false { + t.Error("Expected client value (false) to override server value (true)") + } + }) + + t.Run("client set overrides server error", func(t *testing.T) { + cv := NewConfigValue(true) + resolver := &mockServerResolver[bool]{value: false, err: errors.New("server error")} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, false) + + if result != true { + t.Error("Expected client value (true) to override server error") + } + }) +} + +func TestConfigValue_ResolveWithContext_Priority2_ServerConfig(t *testing.T) { + t.Run("use server when client unset - server true", func(t *testing.T) { + var cv ConfigValue[bool] // Unset + resolver := &mockServerResolver[bool]{value: true, err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, false) + + if result != true { + t.Error("Expected server value (true) when client unset") + } + }) + + t.Run("use server when client unset - server false", func(t *testing.T) { + var cv ConfigValue[bool] // Unset + resolver := &mockServerResolver[bool]{value: false, err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, true) + + if result != false { + t.Error("Expected server value (false) when client unset") + } + }) +} + +func TestConfigValue_ResolveWithContext_Priority3_Default(t *testing.T) { + t.Run("use default when client unset and server errors", func(t *testing.T) { + var cv ConfigValue[bool] // Unset + resolver := &mockServerResolver[bool]{value: false, err: errors.New("server error")} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, true) + + if result != true { + t.Error("Expected default value (true) when client unset and server errors") + } + }) + + t.Run("use default when client unset and no resolver", func(t *testing.T) { + var cv ConfigValue[bool] // Unset + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, nil, true) + + if result != true { + t.Error("Expected default value (true) when client unset and no resolver") + } + }) +} + +func TestConfigValue_DifferentTypes(t *testing.T) { + t.Run("string type", func(t *testing.T) { + cv := NewConfigValue("client-value") + resolver := &mockServerResolver[string]{value: "server-value", err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, "default-value") + + if result != "client-value" { + t.Errorf("Expected 'client-value', got %s", result) + } + }) + + t.Run("int type", func(t *testing.T) { + cv := NewConfigValue(100) + resolver := &mockServerResolver[int]{value: 200, err: nil} + ctx := context.Background() + + result := cv.ResolveWithContext(ctx, "host", &http.Client{}, resolver, 300) + + if result != 100 { + t.Errorf("Expected 100, got %d", result) + } + }) +} + +func TestParseBoolConfigValue(t *testing.T) { + t.Run("parse true", func(t *testing.T) { + params := map[string]string{"enableFeature": "true"} + cv := ParseBoolConfigValue(params, "enableFeature") + + if !cv.IsSet() { + t.Error("Expected value to be set") + } + val, _ := cv.Get() + if val != true { + t.Error("Expected value to be true") + } + }) + + t.Run("parse 1", func(t *testing.T) { + params := map[string]string{"enableFeature": "1"} + cv := ParseBoolConfigValue(params, "enableFeature") + + val, _ := cv.Get() + if val != true { + t.Error("Expected value to be true") + } + }) + + t.Run("parse false", func(t *testing.T) { + params := map[string]string{"enableFeature": "false"} + cv := ParseBoolConfigValue(params, "enableFeature") + + val, _ := cv.Get() + if val != false { + t.Error("Expected value to be false") + } + }) + + t.Run("missing key", func(t *testing.T) { + params := map[string]string{} + cv := ParseBoolConfigValue(params, "enableFeature") + + if cv.IsSet() { + t.Error("Expected value to be unset when key missing") + } + }) +} + +func TestParseStringConfigValue(t *testing.T) { + t.Run("parse value", func(t *testing.T) { + params := map[string]string{"endpoint": "https://example.com"} + cv := ParseStringConfigValue(params, "endpoint") + + val, _ := cv.Get() + if val != "https://example.com" { + t.Errorf("Expected 'https://example.com', got %s", val) + } + }) + + t.Run("missing key", func(t *testing.T) { + params := map[string]string{} + cv := ParseStringConfigValue(params, "endpoint") + + if cv.IsSet() { + t.Error("Expected value to be unset when key missing") + } + }) +} + +func TestParseIntConfigValue(t *testing.T) { + t.Run("parse valid int", func(t *testing.T) { + params := map[string]string{"batchSize": "100"} + cv := ParseIntConfigValue(params, "batchSize") + + val, _ := cv.Get() + if val != 100 { + t.Errorf("Expected 100, got %d", val) + } + }) + + t.Run("parse invalid int", func(t *testing.T) { + params := map[string]string{"batchSize": "invalid"} + cv := ParseIntConfigValue(params, "batchSize") + + if cv.IsSet() { + t.Error("Expected value to be unset when int invalid") + } + }) + + t.Run("missing key", func(t *testing.T) { + params := map[string]string{} + cv := ParseIntConfigValue(params, "batchSize") + + if cv.IsSet() { + t.Error("Expected value to be unset when key missing") + } + }) +} diff --git a/telemetry/ADDING_FEATURE_FLAGS.md b/telemetry/ADDING_FEATURE_FLAGS.md new file mode 100644 index 0000000..9579087 --- /dev/null +++ b/telemetry/ADDING_FEATURE_FLAGS.md @@ -0,0 +1,115 @@ +# Adding New Feature Flags + +The feature flag system is designed to be easily extensible. Follow these steps to add a new feature flag: + +## Step 1: Add Flag Constant + +In `featureflag.go`, add your new flag constant: + +```go +const ( + // ... existing constants ... + + // flagEnableTelemetry controls whether telemetry is enabled for the Go driver + flagEnableTelemetry = "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver" + + // YOUR NEW FLAG - Add it here + flagEnableNewFeature = "databricks.partnerplatform.clientConfigsFeatureFlags.enableNewFeatureForGoDriver" +) +``` + +## Step 2: Register Flag for Fetching + +In `featureflag.go`, add your flag to `getAllFeatureFlags()`: + +```go +func getAllFeatureFlags() []string { + return []string{ + flagEnableTelemetry, + flagEnableNewFeature, // Add your new flag here + } +} +``` + +## Step 3: Add Public Method + +In `featureflag.go`, add a public method to check your flag: + +```go +// isNewFeatureEnabled checks if the new feature is enabled for the host. +// Uses cached value if available and not expired. +func (c *featureFlagCache) isNewFeatureEnabled(ctx context.Context, host string, httpClient *http.Client) (bool, error) { + return c.getFeatureFlag(ctx, host, httpClient, flagEnableNewFeature) +} +``` + +## Step 4: Use It + +```go +// Example usage in your code: +flagCache := getFeatureFlagCache() +enabled, err := flagCache.isNewFeatureEnabled(ctx, host, httpClient) +if err != nil { + // Handle error (falls back to false on error with no cache) +} + +if enabled { + // Feature is enabled - use new behavior +} else { + // Feature is disabled - use old behavior +} +``` + +## How It Works + +### Single Request for All Flags +All flags are fetched together in a single HTTP request: +``` +GET /api/2.0/feature-flags?flags=flagOne,flagTwo,flagThree +``` + +### 15-Minute Cache +Flags are cached for 15 minutes per host to minimize API calls. + +### Graceful Degradation +- If fetch fails but cache exists → returns stale cache (no error) +- If fetch fails and no cache → returns error (caller defaults to false) + +### Thread-Safe +Multiple goroutines can safely call feature flag methods concurrently. + +## Example: Adding Circuit Breaker Flag + +```go +// Step 1: Add constant +const ( + flagEnableTelemetry = "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver" + flagEnableCircuitBreaker = "databricks.partnerplatform.clientConfigsFeatureFlags.enableCircuitBreakerForGoDriver" +) + +// Step 2: Register for fetching +func getAllFeatureFlags() []string { + return []string{ + flagEnableTelemetry, + flagEnableCircuitBreaker, + } +} + +// Step 3: Add public method +func (c *featureFlagCache) isCircuitBreakerEnabled(ctx context.Context, host string, httpClient *http.Client) (bool, error) { + return c.getFeatureFlag(ctx, host, httpClient, flagEnableCircuitBreaker) +} + +// Step 4: Use it +if enabled, _ := flagCache.isCircuitBreakerEnabled(ctx, host, httpClient); enabled { + // Use circuit breaker +} +``` + +## Benefits + +✅ **Single HTTP request** - All flags fetched at once +✅ **15-minute caching** - Minimal API calls +✅ **Graceful degradation** - Uses stale cache on errors +✅ **Thread-safe** - Safe for concurrent access +✅ **Easy to extend** - Just 3 simple steps diff --git a/telemetry/DESIGN.md b/telemetry/DESIGN.md index 157a16a..1d443d9 100644 --- a/telemetry/DESIGN.md +++ b/telemetry/DESIGN.md @@ -165,9 +165,12 @@ sequenceDiagram #### Rationale - **Per-host caching**: Feature flags cached by host to prevent rate limiting +- **Multi-flag support**: Fetches all flags in a single request for efficiency - **Reference counting**: Tracks number of connections per host for proper cleanup - **Automatic expiration**: Refreshes cached flags after TTL expires (15 minutes) - **Thread-safe**: Uses sync.RWMutex for concurrent access +- **Synchronous fetch**: Blocks on cache miss (see Section 6.3 for behavior details) +- **Thundering herd protection**: Only one fetch per host at a time #### Interface @@ -1604,37 +1607,59 @@ func isInRollout(workspaceID string, rolloutPercentage int) bool { } ``` -### 6.4 Opt-In Control & Priority +#### Synchronous Fetch Behavior -The telemetry system supports multiple layers of control for gradual rollout with clear priority order: +**Feature flag fetching is synchronous** and may block driver initialization. -**Opt-In Priority (highest to lowest):** -1. **forceEnableTelemetry=true** - Bypasses all server-side feature flag checks, always enables -2. **enableTelemetry=false** - Explicit opt-out, always disables telemetry -3. **enableTelemetry=true + Server Feature Flag** - User wants telemetry, respects server decision -4. **Server-Side Feature Flag Only** - Databricks-controlled when user hasn't specified preference -5. **Default** - Disabled (`false`) +**Key Characteristics:** +- 10-second HTTP timeout per request +- Uses RetryableClient (4 retries, exponential backoff 1s-30s) +- 15-minute cache minimizes fetch frequency +- Thundering herd protection (only one fetch per host at a time) + +**When It Blocks:** +- First connection to host: blocks for HTTP fetch (up to ~70s with retries) +- Cache expiry (every 15 min): first caller blocks, others return stale cache +- Concurrent callers: only first blocks, others return stale cache immediately + +**Why synchronous:** Simple, deterministic, 99% cache hit rate, matches JDBC driver. + +### 6.4 Config Overlay Pattern + +**UPDATE (Phase 4-5):** The telemetry system now uses a **config overlay pattern** that provides a consistent, clear priority model. This pattern is designed to be reusable across all driver configurations. + +#### Config Overlay Priority (highest to lowest): + +1. **Client Config** - Explicitly set by user (overrides server) +2. **Server Config** - Feature flag controls when client doesn't specify +3. **Fail-Safe Default** - Disabled when server unavailable + +This approach eliminates the need for special bypass flags like `forceEnableTelemetry` because client config naturally has priority. + +#### Implementation: ```go -// isTelemetryEnabled checks if telemetry should be enabled for this connection. -// Implements the priority-based decision tree for telemetry enablement. -func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, httpClient *http.Client) bool { - // Priority 1: Force enable bypasses all server checks - if cfg.ForceEnableTelemetry { - return true - } +// EnableTelemetry is a pointer to distinguish three states: +// - nil: not set by client (use server feature flag) +// - true: client wants enabled (overrides server) +// - false: client wants disabled (overrides server) +type Config struct { + EnableTelemetry *bool + // ... other fields +} - // Priority 2: Explicit opt-out always disables - if !cfg.EnableTelemetry && cfg.EnableTelemetry != nil { - // User explicitly set to false - return false +// isTelemetryEnabled implements config overlay +func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, httpClient *http.Client) bool { + // Priority 1: Client explicitly set (overrides everything) + if cfg.EnableTelemetry != nil { + return *cfg.EnableTelemetry } - // Priority 3 & 4: Check server-side feature flag + // Priority 2: Check server-side feature flag flagCache := getFeatureFlagCache() serverEnabled, err := flagCache.isTelemetryEnabled(ctx, host, httpClient) if err != nil { - // On error, respect default (disabled) + // Priority 3: Fail-safe default (disabled) return false } @@ -1642,16 +1667,51 @@ func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, httpClien } ``` -**Note**: Rollout percentage and gradual enablement can be added in a future phase after basic opt-in is validated. +#### Configuration Behavior Matrix: + +| Client Sets | Server Returns | Result | Explanation | +|-------------|----------------|--------|-------------| +| `true` | `false` | **`true`** | Client overrides server | +| `false` | `true` | **`false`** | Client overrides server | +| `true` | error | **`true`** | Client overrides server error | +| unset | `true` | **`true`** | Use server config | +| unset | `false` | **`false`** | Use server config | +| unset | error | **`false`** | Fail-safe default | + +#### Configuration Parameter Summary: + +| Parameter | Value | Behavior | Use Case | +|-----------|-------|----------|----------| +| `enableTelemetry=true` | Client forces enabled | Always send telemetry (overrides server) | Testing, debugging, opt-in users | +| `enableTelemetry=false` | Client forces disabled | Never send telemetry (overrides server) | Privacy-conscious users, opt-out | +| *(not set)* | Use server flag | Server controls via feature flag | Default behavior - Databricks-controlled rollout | + +#### Benefits of Config Overlay: + +- ✅ **Simpler**: Client > Server > Default (3 clear layers) +- ✅ **Consistent**: Same pattern can be used for all driver configs +- ✅ **No bypass flags**: Client config naturally has priority +- ✅ **Reusable**: General `ConfigValue[T]` type in `internal/config/overlay.go` +- ✅ **Type-safe**: Uses Go generics for any config type + +#### General Config Overlay System: -**Configuration Flag Summary:** +A reusable config overlay system is available in `internal/config/overlay.go`: + +```go +// Generic config value that supports overlay pattern +type ConfigValue[T any] struct { + value *T // nil = unset, non-nil = client set +} + +// Parse from connection params +cv := ParseBoolConfigValue(params, "enableFeature") + +// Resolve with overlay priority +result := cv.ResolveWithContext(ctx, host, httpClient, serverResolver, defaultValue) +``` -| Flag | Behavior | Use Case | -|------|----------|----------| -| `forceEnableTelemetry=true` | Bypass server flags, always enable | Testing, internal users, debugging | -| `enableTelemetry=true` | Enable if server allows | User opt-in during beta phase | -| `enableTelemetry=false` | Always disable telemetry | User wants to opt-out | -| *(no flags set)* | Respect server feature flag | Default behavior | +**Note**: A general `ConfigValue[T]` implementation is available in `internal/config/overlay.go` for extending this pattern to other driver configurations. --- @@ -2010,63 +2070,64 @@ func BenchmarkInterceptor_Disabled(b *testing.B) { - [x] Shutdown scenarios (empty, with active refs, multiple hosts) - [x] Race detector tests passing -### Phase 3: Circuit Breaker (PECOBLR-1143) -- [ ] Implement `circuitbreaker.go` with state machine - - [ ] Implement circuit breaker states (Closed, Open, Half-Open) - - [ ] Implement circuitBreakerManager singleton per host - - [ ] Add configurable thresholds and timeout - - [ ] Implement execute() method with state transitions - - [ ] Implement failure/success tracking -- [ ] Add comprehensive unit tests - - [ ] Test state transitions (Closed → Open → Half-Open → Closed) - - [ ] Test failure/success counting - - [ ] Test timeout and retry logic - - [ ] Test per-host circuit breaker isolation - - [ ] Test concurrent access - -### Phase 4: Export Infrastructure (PECOBLR-1143) -- [ ] Implement `exporter.go` with retry logic - - [ ] Implement HTTP POST to telemetry endpoint (/api/2.0/telemetry-ext) - - [ ] Implement retry logic with exponential backoff - - [ ] Implement tag filtering for export (shouldExportToDatabricks) - - [ ] Integrate with circuit breaker - - [ ] Add error swallowing - - [ ] Implement toExportedMetric() conversion - - [ ] Implement telemetryPayload JSON structure -- [ ] Add unit tests for export logic - - [ ] Test HTTP request construction - - [ ] Test retry logic (with mock HTTP responses) - - [ ] Test circuit breaker integration - - [ ] Test tag filtering - - [ ] Test error swallowing -- [ ] Add integration tests with mock HTTP server - - [ ] Test successful export - - [ ] Test error scenarios (4xx, 5xx) - - [ ] Test retry behavior - - [ ] Test circuit breaker opening/closing - -### Phase 5: Opt-In Configuration Integration (PECOBLR-1143) -- [ ] Implement `isTelemetryEnabled()` with priority-based logic in config.go - - [ ] Priority 1: ForceEnableTelemetry=true bypasses all checks → return true - - [ ] Priority 2: EnableTelemetry=false explicit opt-out → return false - - [ ] Priority 3: EnableTelemetry=true + check server feature flag - - [ ] Priority 4: Server-side feature flag only (default behavior) - - [ ] Priority 5: Default disabled if no flags set and server check fails -- [ ] Integrate feature flag cache with opt-in logic - - [ ] Wire up isTelemetryEnabled() to call featureFlagCache.isTelemetryEnabled() - - [ ] Implement fallback behavior on errors (return cached value or false) - - [ ] Add proper error handling and logging -- [ ] Add unit tests for opt-in priority logic - - [ ] Test forceEnableTelemetry=true (always enabled, bypasses server) - - [ ] Test enableTelemetry=false (always disabled, explicit opt-out) - - [ ] Test enableTelemetry=true with server flag enabled - - [ ] Test enableTelemetry=true with server flag disabled - - [ ] Test default behavior (server flag controls) - - [ ] Test error scenarios (server unreachable, use cached value) -- [ ] Add integration tests with mock feature flag server - - [ ] Test opt-in priority with mock server - - [ ] Test cache expiration and refresh - - [ ] Test concurrent connections with shared cache +### Phase 3: Circuit Breaker ✅ COMPLETED +- [x] Implement `circuitbreaker.go` with state machine + - [x] Implement circuit breaker states (Closed, Open, Half-Open) + - [x] Implement circuitBreakerManager singleton per host + - [x] Add configurable thresholds and timeout + - [x] Implement execute() method with state transitions + - [x] Implement failure/success tracking with sliding window algorithm +- [x] Add comprehensive unit tests + - [x] Test state transitions (Closed → Open → Half-Open → Closed) + - [x] Test failure/success counting + - [x] Test timeout and retry logic + - [x] Test per-host circuit breaker isolation + - [x] Test concurrent access + +### Phase 4: Export Infrastructure ✅ COMPLETED +- [x] Implement `exporter.go` with retry logic + - [x] Implement HTTP POST to telemetry endpoint (/api/2.0/telemetry-ext) + - [x] Implement retry logic with exponential backoff + - [x] Implement tag filtering for export (shouldExportToDatabricks) + - [x] Integrate with circuit breaker + - [x] Add error swallowing + - [x] Implement toExportedMetric() conversion + - [x] Implement telemetryPayload JSON structure +- [x] Add unit tests for export logic + - [x] Test HTTP request construction + - [x] Test retry logic (with mock HTTP responses) + - [x] Test circuit breaker integration + - [x] Test tag filtering + - [x] Test error swallowing +- [x] Add integration tests with mock HTTP server + - [x] Test successful export + - [x] Test error scenarios (4xx, 5xx) + - [x] Test retry behavior (exponential backoff) + - [x] Test circuit breaker opening/closing + - [x] Test context cancellation + +### Phase 5: Opt-In Configuration Integration ✅ COMPLETED +- [x] Implement `isTelemetryEnabled()` with priority-based logic in config.go + - [x] Priority 1: ForceEnableTelemetry=true bypasses all checks → return true + - [x] Priority 2: EnableTelemetry=false explicit opt-out → return false + - [x] Priority 3: EnableTelemetry=true + check server feature flag + - [x] Priority 4: Server-side feature flag only (default behavior) + - [x] Priority 5: Default disabled if no flags set and server check fails +- [x] Integrate feature flag cache with opt-in logic + - [x] Wire up isTelemetryEnabled() to call featureFlagCache.isTelemetryEnabled() + - [x] Implement fallback behavior on errors (return cached value or false) + - [x] Add proper error handling +- [x] Add unit tests for opt-in priority logic + - [x] Test forceEnableTelemetry=true (always enabled, bypasses server) + - [x] Test enableTelemetry=false (always disabled, explicit opt-out) + - [x] Test enableTelemetry=true with server flag enabled + - [x] Test enableTelemetry=true with server flag disabled + - [x] Test default behavior (server flag controls) + - [x] Test error scenarios (server unreachable, use cached value) +- [x] Add integration tests with mock feature flag server + - [x] Test opt-in priority with mock server + - [x] Test server error handling + - [x] Test unreachable server scenarios ### Phase 6: Collection & Aggregation (PECOBLR-1381) - [ ] Implement `interceptor.go` for metric collection diff --git a/telemetry/config.go b/telemetry/config.go index c7474b0..7bc76d0 100644 --- a/telemetry/config.go +++ b/telemetry/config.go @@ -1,8 +1,12 @@ package telemetry import ( + "context" + "net/http" "strconv" "time" + + "github.com/databricks/databricks-sql-go/internal/config" ) // Config holds telemetry configuration. @@ -10,13 +14,12 @@ type Config struct { // Enabled controls whether telemetry is active Enabled bool - // ForceEnableTelemetry bypasses server-side feature flag checks - // When true, telemetry is always enabled regardless of server flags - ForceEnableTelemetry bool - - // EnableTelemetry indicates user wants telemetry enabled if server allows - // Respects server-side feature flags and rollout percentage - EnableTelemetry bool + // EnableTelemetry is the client-side telemetry preference. + // Uses config overlay pattern: client > server > default + // - Unset: use server feature flag (default behavior) + // - Set to true: client wants telemetry enabled (overrides server) + // - Set to false: client wants telemetry disabled (overrides server) + EnableTelemetry config.ConfigValue[bool] // BatchSize is the number of metrics to batch before flushing BatchSize int @@ -41,12 +44,12 @@ type Config struct { } // DefaultConfig returns default telemetry configuration. -// Note: Telemetry is disabled by default and requires explicit opt-in. +// Note: Telemetry uses config overlay - controlled by server feature flags by default. +// Clients can override by explicitly setting enableTelemetry=true/false. func DefaultConfig() *Config { return &Config{ - Enabled: false, // Disabled by default, requires explicit opt-in - ForceEnableTelemetry: false, - EnableTelemetry: false, + Enabled: false, // Will be set based on overlay logic + EnableTelemetry: config.ConfigValue[bool]{}, // Unset = use server feature flag BatchSize: 100, FlushInterval: 5 * time.Second, MaxRetries: 3, @@ -61,22 +64,12 @@ func DefaultConfig() *Config { func ParseTelemetryConfig(params map[string]string) *Config { cfg := DefaultConfig() - // Check for forceEnableTelemetry flag (bypasses server feature flags) - if v, ok := params["forceEnableTelemetry"]; ok { - if v == "true" || v == "1" { - cfg.ForceEnableTelemetry = true - cfg.Enabled = true // Also set Enabled for backward compatibility - } - } - - // Check for enableTelemetry flag (respects server feature flags) - if v, ok := params["enableTelemetry"]; ok { - if v == "true" || v == "1" { - cfg.EnableTelemetry = true - } else if v == "false" || v == "0" { - cfg.EnableTelemetry = false - } - } + // Config overlay approach: client setting overrides server feature flag + // Priority: + // 1. Client explicit setting (enableTelemetry=true/false) - overrides server + // 2. Server feature flag (when client doesn't set) - server controls + // 3. Default disabled (when server flag unavailable) - fail-safe + cfg.EnableTelemetry = config.ParseBoolConfigValue(params, "enableTelemetry") if v, ok := params["telemetry_batch_size"]; ok { if size, err := strconv.Atoi(v); err == nil && size > 0 { @@ -92,3 +85,37 @@ func ParseTelemetryConfig(params map[string]string) *Config { return cfg } + +// isTelemetryEnabled checks if telemetry should be enabled for this connection. +// Implements config overlay approach with clear priority order. +// +// Config Overlay Priority (highest to lowest): +// 1. Client Config - enableTelemetry explicitly set (true/false) - overrides server +// 2. Server Config - feature flag controls when client doesn't specify +// 3. Fail-Safe Default - disabled when server flag unavailable/errors +// +// Parameters: +// - ctx: Context for the request +// - cfg: Telemetry configuration +// - host: Databricks host to check feature flags against +// - httpClient: HTTP client for making feature flag requests +// +// Returns: +// - bool: true if telemetry should be enabled, false otherwise +func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, httpClient *http.Client) bool { + // Priority 1: Client explicitly set (overrides server) + if cfg.EnableTelemetry.IsSet() { + val, _ := cfg.EnableTelemetry.Get() + return val + } + + // Priority 2: Check server-side feature flag + flagCache := getFeatureFlagCache() + serverEnabled, err := flagCache.isTelemetryEnabled(ctx, host, httpClient) + if err != nil { + // Priority 3: Fail-safe default (disabled) + return false + } + + return serverEnabled +} diff --git a/telemetry/config_test.go b/telemetry/config_test.go index a696a10..d5ecdc2 100644 --- a/telemetry/config_test.go +++ b/telemetry/config_test.go @@ -1,16 +1,27 @@ package telemetry import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" "testing" "time" + + "github.com/databricks/databricks-sql-go/internal/config" ) func TestDefaultConfig(t *testing.T) { cfg := DefaultConfig() - // Verify telemetry is disabled by default + // Verify telemetry uses config overlay (nil = use server flag) if cfg.Enabled { - t.Error("Expected telemetry to be disabled by default, got enabled") + t.Error("Expected Enabled to be false by default") + } + + // Verify EnableTelemetry is unset (config overlay - use server flag) + if cfg.EnableTelemetry.IsSet() { + t.Error("Expected EnableTelemetry to be unset (use server flag), got set") } // Verify other defaults @@ -47,9 +58,9 @@ func TestParseTelemetryConfig_EmptyParams(t *testing.T) { params := map[string]string{} cfg := ParseTelemetryConfig(params) - // Should return defaults - if cfg.Enabled { - t.Error("Expected telemetry to be disabled by default") + // Should return defaults - EnableTelemetry unset means use server flag + if cfg.EnableTelemetry.IsSet() { + t.Error("Expected EnableTelemetry to be unset (use server flag) when no params provided") } if cfg.BatchSize != 100 { @@ -63,7 +74,8 @@ func TestParseTelemetryConfig_EnabledTrue(t *testing.T) { } cfg := ParseTelemetryConfig(params) - if !cfg.EnableTelemetry { + val, ok := cfg.EnableTelemetry.Get() + if !ok || !val { t.Error("Expected EnableTelemetry to be true when set to 'true'") } } @@ -74,7 +86,8 @@ func TestParseTelemetryConfig_Enabled1(t *testing.T) { } cfg := ParseTelemetryConfig(params) - if !cfg.EnableTelemetry { + val, ok := cfg.EnableTelemetry.Get() + if !ok || !val { t.Error("Expected EnableTelemetry to be true when set to '1'") } } @@ -85,7 +98,8 @@ func TestParseTelemetryConfig_EnabledFalse(t *testing.T) { } cfg := ParseTelemetryConfig(params) - if cfg.EnableTelemetry { + val, ok := cfg.EnableTelemetry.Get() + if !ok || val { t.Error("Expected EnableTelemetry to be false when set to 'false'") } } @@ -168,7 +182,8 @@ func TestParseTelemetryConfig_MultipleParams(t *testing.T) { } cfg := ParseTelemetryConfig(params) - if !cfg.EnableTelemetry { + val, ok := cfg.EnableTelemetry.Get() + if !ok || !val { t.Error("Expected EnableTelemetry to be true") } @@ -185,3 +200,228 @@ func TestParseTelemetryConfig_MultipleParams(t *testing.T) { t.Errorf("Expected MaxRetries to remain default 3, got %d", cfg.MaxRetries) } } + +// TestIsTelemetryEnabled_ClientOverrideEnabled tests Priority 1: client explicitly enables (overrides server) +func TestIsTelemetryEnabled_ClientOverrideEnabled(t *testing.T) { + // Setup: Create a server that returns disabled + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Server says disabled, but client override should win + resp := map[string]interface{}{ + "flags": map[string]bool{ + "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver": false, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.NewConfigValue(true), // Priority 1: Client explicitly enables + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + // Client override should bypass server check + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + if !result { + t.Error("Expected telemetry to be enabled when client explicitly sets enableTelemetry=true, got disabled") + } +} + +// TestIsTelemetryEnabled_ClientOverrideDisabled tests Priority 1: client explicitly disables (overrides server) +func TestIsTelemetryEnabled_ClientOverrideDisabled(t *testing.T) { + // Setup: Create a server that returns enabled + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Server says enabled, but client override should win + resp := map[string]interface{}{ + "flags": map[string]bool{ + "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver": true, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.NewConfigValue(false), // Priority 1: Client explicitly disables + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + if result { + t.Error("Expected telemetry to be disabled when client explicitly sets enableTelemetry=false, got enabled") + } +} + +// TestIsTelemetryEnabled_ServerEnabled tests Priority 2: server flag enables (client didn't set) +func TestIsTelemetryEnabled_ServerEnabled(t *testing.T) { + // Setup: Create a server that returns enabled + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := map[string]interface{}{ + "flags": map[string]bool{ + "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver": true, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.ConfigValue[bool]{}, // Client didn't set - use server flag + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + if !result { + t.Error("Expected telemetry to be enabled when server flag is true, got disabled") + } +} + +// TestIsTelemetryEnabled_ServerDisabled tests Priority 2: server flag disables (client didn't set) +func TestIsTelemetryEnabled_ServerDisabled(t *testing.T) { + // Setup: Create a server that returns disabled + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + resp := map[string]interface{}{ + "flags": map[string]bool{ + "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver": false, + }, + } + _ = json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.ConfigValue[bool]{}, // Client didn't set - use server flag + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + if result { + t.Error("Expected telemetry to be disabled when server flag is false, got enabled") + } +} + +// TestIsTelemetryEnabled_FailSafeDefault tests Priority 3: default disabled when server unavailable +func TestIsTelemetryEnabled_FailSafeDefault(t *testing.T) { + cfg := DefaultConfig() + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // No server available, should default to disabled (fail-safe) + result := isTelemetryEnabled(ctx, cfg, "nonexistent-host", httpClient) + + if result { + t.Error("Expected telemetry to be disabled when server unavailable (fail-safe), got enabled") + } +} + +// TestIsTelemetryEnabled_ServerError tests Priority 3: fail-safe default on server error +func TestIsTelemetryEnabled_ServerError(t *testing.T) { + // Setup: Create a server that returns error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.ConfigValue[bool]{}, // Client didn't set - should use server, but server errors + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + // On error, should default to disabled (fail-safe) + if result { + t.Error("Expected telemetry to be disabled on server error (fail-safe), got enabled") + } +} + +// TestIsTelemetryEnabled_ServerUnreachable tests Priority 3: fail-safe default on unreachable server +func TestIsTelemetryEnabled_ServerUnreachable(t *testing.T) { + cfg := &Config{ + EnableTelemetry: config.ConfigValue[bool]{}, // Client didn't set - should use server, but server unreachable + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 1 * time.Second} + + // Setup feature flag cache context with unreachable host + flagCache := getFeatureFlagCache() + unreachableHost := "http://localhost:9999" + flagCache.getOrCreateContext(unreachableHost) + defer flagCache.releaseContext(unreachableHost) + + result := isTelemetryEnabled(ctx, cfg, unreachableHost, httpClient) + + // On error, should default to disabled (fail-safe) + if result { + t.Error("Expected telemetry to be disabled when server unreachable (fail-safe), got enabled") + } +} + +// TestIsTelemetryEnabled_ClientOverridesServerError tests Priority 1 > Priority 3 +func TestIsTelemetryEnabled_ClientOverridesServerError(t *testing.T) { + // Setup: Create a server that returns error + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := &Config{ + EnableTelemetry: config.NewConfigValue(true), // Client explicitly enables - should override server error + } + + ctx := context.Background() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Setup feature flag cache context + flagCache := getFeatureFlagCache() + flagCache.getOrCreateContext(server.URL) + defer flagCache.releaseContext(server.URL) + + result := isTelemetryEnabled(ctx, cfg, server.URL, httpClient) + + // Client override should work even when server errors + if !result { + t.Error("Expected telemetry to be enabled when client explicitly sets true, even with server error, got disabled") + } +} diff --git a/telemetry/exporter.go b/telemetry/exporter.go new file mode 100644 index 0000000..51a83c3 --- /dev/null +++ b/telemetry/exporter.go @@ -0,0 +1,193 @@ +package telemetry + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +// telemetryExporter exports metrics to Databricks telemetry service. +type telemetryExporter struct { + host string + httpClient *http.Client + circuitBreaker *circuitBreaker + cfg *Config +} + +// telemetryMetric represents a metric to export. +type telemetryMetric struct { + metricType string + timestamp time.Time + workspaceID string + sessionID string + statementID string + latencyMs int64 + errorType string + tags map[string]interface{} +} + +// telemetryPayload is the JSON structure sent to Databricks. +type telemetryPayload struct { + Metrics []*exportedMetric `json:"metrics"` +} + +// exportedMetric is a single metric in the payload. +type exportedMetric struct { + MetricType string `json:"metric_type"` + Timestamp string `json:"timestamp"` // RFC3339 + WorkspaceID string `json:"workspace_id,omitempty"` + SessionID string `json:"session_id,omitempty"` + StatementID string `json:"statement_id,omitempty"` + LatencyMs int64 `json:"latency_ms,omitempty"` + ErrorType string `json:"error_type,omitempty"` + Tags map[string]interface{} `json:"tags,omitempty"` +} + +// newTelemetryExporter creates a new exporter. +func newTelemetryExporter(host string, httpClient *http.Client, cfg *Config) *telemetryExporter { + return &telemetryExporter{ + host: host, + httpClient: httpClient, + circuitBreaker: getCircuitBreakerManager().getCircuitBreaker(host), + cfg: cfg, + } +} + +// export exports metrics to Databricks service. +// All errors are swallowed to ensure telemetry never impacts driver operation. +func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetric) { + // Swallow all errors and panics + defer func() { + if r := recover(); r != nil { + // Log at trace level only + // logger.Trace().Msgf("telemetry: export panic: %v", r) + } + }() + + // Check circuit breaker + err := e.circuitBreaker.execute(ctx, func() error { + return e.doExport(ctx, metrics) + }) + + if err == ErrCircuitOpen { + // Drop metrics silently when circuit is open + return + } + + if err != nil { + // Log at trace level only + // logger.Trace().Msgf("telemetry: export error: %v", err) + } +} + +// doExport performs the actual export with retries and exponential backoff. +func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMetric) error { + // Convert metrics to exported format with tag filtering + exportedMetrics := make([]*exportedMetric, 0, len(metrics)) + for _, m := range metrics { + exportedMetrics = append(exportedMetrics, m.toExportedMetric()) + } + + // Create payload + payload := &telemetryPayload{ + Metrics: exportedMetrics, + } + + // Serialize metrics + data, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal metrics: %w", err) + } + + // Determine endpoint + // Support both plain hosts and full URLs (for testing) + var endpoint string + if strings.HasPrefix(e.host, "http://") || strings.HasPrefix(e.host, "https://") { + endpoint = fmt.Sprintf("%s/api/2.0/telemetry-ext", e.host) + } else { + endpoint = fmt.Sprintf("https://%s/api/2.0/telemetry-ext", e.host) + } + + // Retry logic with exponential backoff + maxRetries := e.cfg.MaxRetries + for attempt := 0; attempt <= maxRetries; attempt++ { + // Exponential backoff (except for first attempt) + if attempt > 0 { + backoff := time.Duration(1<= 200 && resp.StatusCode < 300 { + return nil // Success + } + + // Check if retryable + if !isRetryableStatus(resp.StatusCode) { + return fmt.Errorf("non-retryable status: %d", resp.StatusCode) + } + + if attempt == maxRetries { + return fmt.Errorf("failed after %d retries: status %d", maxRetries, resp.StatusCode) + } + } + + return nil +} + +// toExportedMetric converts internal metric to exported format with tag filtering. +func (m *telemetryMetric) toExportedMetric() *exportedMetric { + // Filter tags based on export scope + filteredTags := make(map[string]interface{}) + for k, v := range m.tags { + if shouldExportToDatabricks(m.metricType, k) { + filteredTags[k] = v + } + } + + return &exportedMetric{ + MetricType: m.metricType, + Timestamp: m.timestamp.Format(time.RFC3339), + WorkspaceID: m.workspaceID, + SessionID: m.sessionID, + StatementID: m.statementID, + LatencyMs: m.latencyMs, + ErrorType: m.errorType, + Tags: filteredTags, + } +} + +// isRetryableStatus returns true if HTTP status is retryable. +// Retryable statuses: 429 (Too Many Requests), 503 (Service Unavailable), 5xx (Server Errors) +func isRetryableStatus(status int) bool { + return status == 429 || status == 503 || status >= 500 +} diff --git a/telemetry/exporter_test.go b/telemetry/exporter_test.go new file mode 100644 index 0000000..7549e04 --- /dev/null +++ b/telemetry/exporter_test.go @@ -0,0 +1,448 @@ +package telemetry + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +func TestNewTelemetryExporter(t *testing.T) { + cfg := DefaultConfig() + httpClient := &http.Client{Timeout: 5 * time.Second} + host := "test-host" + + exporter := newTelemetryExporter(host, httpClient, cfg) + + if exporter.host != host { + t.Errorf("Expected host %s, got %s", host, exporter.host) + } + + if exporter.httpClient != httpClient { + t.Error("Expected httpClient to be set") + } + + if exporter.circuitBreaker == nil { + t.Error("Expected circuitBreaker to be initialized") + } + + if exporter.cfg != cfg { + t.Error("Expected cfg to be set") + } +} + +func TestExport_Success(t *testing.T) { + requestReceived := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestReceived = true + + // Verify request method and path + if r.Method != "POST" { + t.Errorf("Expected POST, got %s", r.Method) + } + + if r.URL.Path != "/api/2.0/telemetry-ext" { + t.Errorf("Expected path /api/2.0/telemetry-ext, got %s", r.URL.Path) + } + + // Verify content type + if r.Header.Get("Content-Type") != "application/json" { + t.Errorf("Expected Content-Type application/json, got %s", r.Header.Get("Content-Type")) + } + + // Verify payload structure + body, _ := io.ReadAll(r.Body) + var payload telemetryPayload + if err := json.Unmarshal(body, &payload); err != nil { + t.Errorf("Failed to unmarshal payload: %v", err) + } + + if len(payload.Metrics) != 1 { + t.Errorf("Expected 1 metric, got %d", len(payload.Metrics)) + } + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := DefaultConfig() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + workspaceID: "test-workspace", + sessionID: "test-session", + tags: map[string]interface{}{"driver.version": "1.0.0"}, + }, + } + + ctx := context.Background() + exporter.export(ctx, metrics) + + if !requestReceived { + t.Error("Expected request to be sent to server") + } +} + +func TestExport_RetryOn5xx(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count := atomic.AddInt32(&attemptCount, 1) + if count < 3 { + // Fail first 2 attempts + w.WriteHeader(http.StatusInternalServerError) + } else { + // Succeed on 3rd attempt + w.WriteHeader(http.StatusOK) + } + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 3 + cfg.RetryDelay = 10 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + ctx := context.Background() + exporter.export(ctx, metrics) + + // Should have retried and succeeded + if atomic.LoadInt32(&attemptCount) != 3 { + t.Errorf("Expected 3 attempts, got %d", attemptCount) + } +} + +func TestExport_NonRetryable4xx(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attemptCount, 1) + w.WriteHeader(http.StatusBadRequest) // 400 is not retryable + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 3 + cfg.RetryDelay = 10 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + ctx := context.Background() + exporter.export(ctx, metrics) + + // Should only try once (no retries for 4xx) + if atomic.LoadInt32(&attemptCount) != 1 { + t.Errorf("Expected 1 attempt, got %d", attemptCount) + } +} + +func TestExport_Retry429(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + count := atomic.AddInt32(&attemptCount, 1) + if count < 2 { + w.WriteHeader(http.StatusTooManyRequests) // 429 is retryable + } else { + w.WriteHeader(http.StatusOK) + } + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 3 + cfg.RetryDelay = 10 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + ctx := context.Background() + exporter.export(ctx, metrics) + + // Should have retried and succeeded + if atomic.LoadInt32(&attemptCount) != 2 { + t.Errorf("Expected 2 attempts, got %d", attemptCount) + } +} + +func TestExport_CircuitBreakerOpen(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attemptCount, 1) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := DefaultConfig() + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + // Open the circuit breaker by recording failures + cb := exporter.circuitBreaker + ctx := context.Background() + + // Record enough failures to open circuit (50% failure rate with 20+ calls) + for i := 0; i < 25; i++ { + cb.recordCall(callFailure) + } + + // Verify circuit is open + if cb.getState() != stateOpen { + t.Error("Expected circuit to be open") + } + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + // Export should be dropped due to open circuit + exporter.export(ctx, metrics) + + // No request should have been made + if atomic.LoadInt32(&attemptCount) != 0 { + t.Errorf("Expected 0 attempts with open circuit, got %d", attemptCount) + } +} + +func TestToExportedMetric_TagFiltering(t *testing.T) { + metric := &telemetryMetric{ + metricType: "connection", + timestamp: time.Date(2026, 1, 30, 10, 0, 0, 0, time.UTC), + workspaceID: "test-workspace", + sessionID: "test-session", + statementID: "test-statement", + latencyMs: 100, + errorType: "test-error", + tags: map[string]interface{}{ + "workspace.id": "ws-123", // Should be exported + "driver.version": "1.0.0", // Should be exported + "server.address": "localhost:8080", // Should NOT be exported (local only) + "unknown.tag": "value", // Should NOT be exported + }, + } + + exported := metric.toExportedMetric() + + // Verify basic fields + if exported.MetricType != "connection" { + t.Errorf("Expected MetricType 'connection', got %s", exported.MetricType) + } + + if exported.WorkspaceID != "test-workspace" { + t.Errorf("Expected WorkspaceID 'test-workspace', got %s", exported.WorkspaceID) + } + + // Verify timestamp format + if exported.Timestamp != "2026-01-30T10:00:00Z" { + t.Errorf("Expected timestamp '2026-01-30T10:00:00Z', got %s", exported.Timestamp) + } + + // Verify tag filtering + if _, ok := exported.Tags["workspace.id"]; !ok { + t.Error("Expected 'workspace.id' tag to be exported") + } + + if _, ok := exported.Tags["driver.version"]; !ok { + t.Error("Expected 'driver.version' tag to be exported") + } + + if _, ok := exported.Tags["server.address"]; ok { + t.Error("Expected 'server.address' tag to NOT be exported (local only)") + } + + if _, ok := exported.Tags["unknown.tag"]; ok { + t.Error("Expected 'unknown.tag' to NOT be exported") + } +} + +func TestIsRetryableStatus(t *testing.T) { + tests := []struct { + status int + retryable bool + description string + }{ + {200, false, "200 OK is not retryable"}, + {201, false, "201 Created is not retryable"}, + {400, false, "400 Bad Request is not retryable"}, + {401, false, "401 Unauthorized is not retryable"}, + {403, false, "403 Forbidden is not retryable"}, + {404, false, "404 Not Found is not retryable"}, + {429, true, "429 Too Many Requests is retryable"}, + {500, true, "500 Internal Server Error is retryable"}, + {502, true, "502 Bad Gateway is retryable"}, + {503, true, "503 Service Unavailable is retryable"}, + {504, true, "504 Gateway Timeout is retryable"}, + } + + for _, tt := range tests { + result := isRetryableStatus(tt.status) + if result != tt.retryable { + t.Errorf("%s: expected %v, got %v", tt.description, tt.retryable, result) + } + } +} + +func TestExport_ErrorSwallowing(t *testing.T) { + // Server that always fails + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 1 + cfg.RetryDelay = 10 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + ctx := context.Background() + + // This should not panic even though all requests fail + defer func() { + if r := recover(); r != nil { + t.Errorf("Export panicked: %v", r) + } + }() + + exporter.export(ctx, metrics) + // If we get here without panic, error swallowing works +} + +func TestExport_ContextCancellation(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Slow server + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 3 + cfg.RetryDelay = 50 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + // Create context that will be cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + // Export with cancelled context (should not panic) + exporter.export(ctx, metrics) + // If we get here, context cancellation is handled properly +} + +func TestExport_ExponentialBackoff(t *testing.T) { + attemptTimes := make([]time.Time, 0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attemptTimes = append(attemptTimes, time.Now()) + // Always fail to test all retries + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.MaxRetries = 3 + cfg.RetryDelay = 50 * time.Millisecond + httpClient := &http.Client{Timeout: 5 * time.Second} + + // Use full server URL for testing + exporter := newTelemetryExporter(server.URL, httpClient, cfg) + + metrics := []*telemetryMetric{ + { + metricType: "connection", + timestamp: time.Now(), + }, + } + + ctx := context.Background() + exporter.export(ctx, metrics) + + // Should have 4 attempts (1 initial + 3 retries) + if len(attemptTimes) != 4 { + t.Errorf("Expected 4 attempts, got %d", len(attemptTimes)) + return + } + + // Verify exponential backoff delays + // Attempt 0: immediate + // Attempt 1: +50ms (2^0 * 50ms) + // Attempt 2: +100ms (2^1 * 50ms) + // Attempt 3: +200ms (2^2 * 50ms) + + delay1 := attemptTimes[1].Sub(attemptTimes[0]) + delay2 := attemptTimes[2].Sub(attemptTimes[1]) + delay3 := attemptTimes[3].Sub(attemptTimes[2]) + + // Allow 30ms tolerance for timing variations + tolerance := 30 * time.Millisecond + + if delay1 < (50*time.Millisecond-tolerance) || delay1 > (50*time.Millisecond+tolerance) { + t.Errorf("Expected delay1 ~50ms, got %v", delay1) + } + + if delay2 < (100*time.Millisecond-tolerance) || delay2 > (100*time.Millisecond+tolerance) { + t.Errorf("Expected delay2 ~100ms, got %v", delay2) + } + + if delay3 < (200*time.Millisecond-tolerance) || delay3 > (200*time.Millisecond+tolerance) { + t.Errorf("Expected delay3 ~200ms, got %v", delay3) + } +} diff --git a/telemetry/featureflag.go b/telemetry/featureflag.go index 826bfa2..24e42d3 100644 --- a/telemetry/featureflag.go +++ b/telemetry/featureflag.go @@ -16,6 +16,12 @@ const ( featureFlagCacheDuration = 15 * time.Minute // featureFlagHTTPTimeout is the default timeout for feature flag HTTP requests featureFlagHTTPTimeout = 10 * time.Second + + // Feature flag names + // flagEnableTelemetry controls whether telemetry is enabled for the Go driver + flagEnableTelemetry = "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver" + // Add more feature flags here as needed: + // flagEnableNewFeature = "databricks.partnerplatform.clientConfigsFeatureFlags.enableNewFeatureForGoDriver" ) // featureFlagCache manages feature flag state per host with reference counting. @@ -27,12 +33,12 @@ type featureFlagCache struct { // featureFlagContext holds feature flag state and reference count for a host. type featureFlagContext struct { - mu sync.RWMutex // protects enabled, lastFetched, fetching - enabled *bool - lastFetched time.Time - refCount int // protected by featureFlagCache.mu - cacheDuration time.Duration - fetching bool // true if a fetch is in progress + mu sync.RWMutex // protects flags, lastFetched, fetching + flags map[string]bool // cached feature flags by name + lastFetched time.Time // when flags were last fetched + refCount int // protected by featureFlagCache.mu + cacheDuration time.Duration // how long to cache flags + fetching bool // true if a fetch is in progress } var ( @@ -81,9 +87,10 @@ func (c *featureFlagCache) releaseContext(host string) { } } -// isTelemetryEnabled checks if telemetry is enabled for the host. +// getFeatureFlag retrieves a specific feature flag value for the host. +// This is the generic method that handles caching and fetching for any flag. // Uses cached value if available and not expired. -func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, httpClient *http.Client) (bool, error) { +func (c *featureFlagCache) getFeatureFlag(ctx context.Context, host string, httpClient *http.Client, flagName string) (bool, error) { c.mu.RLock() flagCtx, exists := c.contexts[host] c.mu.RUnlock() @@ -94,8 +101,9 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, // Check if cache is valid (with proper locking) flagCtx.mu.RLock() - if flagCtx.enabled != nil && time.Since(flagCtx.lastFetched) < flagCtx.cacheDuration { - enabled := *flagCtx.enabled + if flagCtx.flags != nil && time.Since(flagCtx.lastFetched) < flagCtx.cacheDuration { + // Cache is valid, return the cached flag value + enabled := flagCtx.flags[flagName] // returns false if flag not found flagCtx.mu.RUnlock() return enabled, nil } @@ -103,8 +111,8 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, // Check if another goroutine is already fetching if flagCtx.fetching { // Return cached value if available, otherwise wait - if flagCtx.enabled != nil { - enabled := *flagCtx.enabled + if flagCtx.flags != nil { + enabled := flagCtx.flags[flagName] flagCtx.mu.RUnlock() return enabled, nil } @@ -117,41 +125,58 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, flagCtx.fetching = true flagCtx.mu.RUnlock() - // Fetch fresh value - enabled, err := fetchFeatureFlag(ctx, host, httpClient) + // Fetch fresh values for all flags + flags, err := fetchFeatureFlags(ctx, host, httpClient) // Update cache (with proper locking) flagCtx.mu.Lock() flagCtx.fetching = false if err == nil { - flagCtx.enabled = &enabled + flagCtx.flags = flags flagCtx.lastFetched = time.Now() } - // On error, keep the old cached value if it exists + // On error, keep the old cached values if they exist result := false var returnErr error if err != nil { - if flagCtx.enabled != nil { - result = *flagCtx.enabled + if flagCtx.flags != nil { + result = flagCtx.flags[flagName] returnErr = nil // Return cached value without error } else { returnErr = err } } else { - result = enabled + result = flags[flagName] } flagCtx.mu.Unlock() return result, returnErr } +// isTelemetryEnabled checks if telemetry is enabled for the host. +// Uses cached value if available and not expired. +func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, httpClient *http.Client) (bool, error) { + return c.getFeatureFlag(ctx, host, httpClient, flagEnableTelemetry) +} + // isExpired returns true if the cache has expired. func (c *featureFlagContext) isExpired() bool { - return c.enabled == nil || time.Since(c.lastFetched) > c.cacheDuration + return c.flags == nil || time.Since(c.lastFetched) > c.cacheDuration +} + +// getAllFeatureFlags returns a list of all feature flags to fetch. +// Add new flags here when adding new features. +func getAllFeatureFlags() []string { + return []string{ + flagEnableTelemetry, + // Add more flags here as needed: + // flagEnableNewFeature, + } } -// fetchFeatureFlag fetches the feature flag value from Databricks. -func fetchFeatureFlag(ctx context.Context, host string, httpClient *http.Client) (bool, error) { +// fetchFeatureFlags fetches multiple feature flag values from Databricks in a single request. +// Returns a map of flag names to their boolean values. +func fetchFeatureFlags(ctx context.Context, host string, httpClient *http.Client) (map[string]bool, error) { // Add timeout to context if it doesn't have a deadline if _, hasDeadline := ctx.Deadline(); !hasDeadline { var cancel context.CancelFunc @@ -169,37 +194,40 @@ func fetchFeatureFlag(ctx context.Context, host string, httpClient *http.Client) req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) if err != nil { - return false, fmt.Errorf("failed to create feature flag request: %w", err) + return nil, fmt.Errorf("failed to create feature flag request: %w", err) } - // Add query parameter for the specific feature flag + // Add query parameter with comma-separated list of feature flags + // This fetches all flags in a single request for efficiency + allFlags := getAllFeatureFlags() q := req.URL.Query() - q.Add("flags", "databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver") + q.Add("flags", strings.Join(allFlags, ",")) req.URL.RawQuery = q.Encode() resp, err := httpClient.Do(req) if err != nil { - return false, fmt.Errorf("failed to fetch feature flag: %w", err) + return nil, fmt.Errorf("failed to fetch feature flags: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { // Read and discard body to allow HTTP connection reuse _, _ = io.Copy(io.Discard, resp.Body) - return false, fmt.Errorf("feature flag check failed: %d", resp.StatusCode) + return nil, fmt.Errorf("feature flag check failed: %d", resp.StatusCode) } var result struct { Flags map[string]bool `json:"flags"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return false, fmt.Errorf("failed to decode feature flag response: %w", err) + return nil, fmt.Errorf("failed to decode feature flag response: %w", err) } - enabled, ok := result.Flags["databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver"] - if !ok { - return false, nil + // Return the full map of flags + // Flags not present in the response will have false value when accessed + if result.Flags == nil { + return make(map[string]bool), nil } - return enabled, nil + return result.Flags, nil } diff --git a/telemetry/featureflag_test.go b/telemetry/featureflag_test.go index b45fc8f..b9f5f92 100644 --- a/telemetry/featureflag_test.go +++ b/telemetry/featureflag_test.go @@ -94,8 +94,9 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Cached(t *testing.T) { ctx := cache.getOrCreateContext(host) // Set cached value - enabled := true - ctx.enabled = &enabled + ctx.flags = map[string]bool{ + flagEnableTelemetry: true, + } ctx.lastFetched = time.Now() // Should return cached value without HTTP call @@ -127,8 +128,9 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Expired(t *testing.T) { ctx := cache.getOrCreateContext(host) // Set expired cached value - enabled := false - ctx.enabled = &enabled + ctx.flags = map[string]bool{ + flagEnableTelemetry: false, + } ctx.lastFetched = time.Now().Add(-20 * time.Minute) // Expired // Should fetch fresh value @@ -145,7 +147,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Expired(t *testing.T) { } // Verify cache was updated - if *ctx.enabled != true { + if ctx.flags[flagEnableTelemetry] != true { t.Error("Expected cache to be updated with new value") } } @@ -182,8 +184,9 @@ func TestFeatureFlagCache_IsTelemetryEnabled_ErrorFallback(t *testing.T) { ctx := cache.getOrCreateContext(host) // Set cached value - enabled := true - ctx.enabled = &enabled + ctx.flags = map[string]bool{ + flagEnableTelemetry: true, + } ctx.lastFetched = time.Now().Add(-20 * time.Minute) // Expired // Should return cached value on error @@ -271,28 +274,28 @@ func TestFeatureFlagCache_ConcurrentAccess(t *testing.T) { func TestFeatureFlagContext_IsExpired(t *testing.T) { tests := []struct { name string - enabled *bool + flags map[string]bool fetched time.Time duration time.Duration want bool }{ { name: "no cache", - enabled: nil, + flags: nil, fetched: time.Time{}, duration: 15 * time.Minute, want: true, }, { name: "fresh cache", - enabled: boolPtr(true), + flags: map[string]bool{flagEnableTelemetry: true}, fetched: time.Now(), duration: 15 * time.Minute, want: false, }, { name: "expired cache", - enabled: boolPtr(true), + flags: map[string]bool{flagEnableTelemetry: true}, fetched: time.Now().Add(-20 * time.Minute), duration: 15 * time.Minute, want: true, @@ -302,7 +305,7 @@ func TestFeatureFlagContext_IsExpired(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := &featureFlagContext{ - enabled: tt.enabled, + flags: tt.flags, lastFetched: tt.fetched, cacheDuration: tt.duration, } @@ -313,7 +316,7 @@ func TestFeatureFlagContext_IsExpired(t *testing.T) { } } -func TestFetchFeatureFlag_Success(t *testing.T) { +func TestFetchFeatureFlags_Success(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Verify request if r.Method != "GET" { @@ -339,16 +342,16 @@ func TestFetchFeatureFlag_Success(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, httpClient) + flags, err := fetchFeatureFlags(context.Background(), host, httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } - if !enabled { - t.Error("Expected feature flag to be enabled") + if !flags[flagEnableTelemetry] { + t.Error("Expected telemetry feature flag to be enabled") } } -func TestFetchFeatureFlag_Disabled(t *testing.T) { +func TestFetchFeatureFlags_Disabled(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -359,16 +362,16 @@ func TestFetchFeatureFlag_Disabled(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, httpClient) + flags, err := fetchFeatureFlags(context.Background(), host, httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } - if enabled { - t.Error("Expected feature flag to be disabled") + if flags[flagEnableTelemetry] { + t.Error("Expected telemetry feature flag to be disabled") } } -func TestFetchFeatureFlag_FlagNotPresent(t *testing.T) { +func TestFetchFeatureFlags_FlagNotPresent(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -379,16 +382,16 @@ func TestFetchFeatureFlag_FlagNotPresent(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, httpClient) + flags, err := fetchFeatureFlags(context.Background(), host, httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } - if enabled { - t.Error("Expected feature flag to be false when not present") + if flags[flagEnableTelemetry] { + t.Error("Expected telemetry feature flag to be false when not present") } } -func TestFetchFeatureFlag_HTTPError(t *testing.T) { +func TestFetchFeatureFlags_HTTPError(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) })) @@ -397,13 +400,13 @@ func TestFetchFeatureFlag_HTTPError(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, httpClient) + _, err := fetchFeatureFlags(context.Background(), host, httpClient) if err == nil { t.Error("Expected error for HTTP 500") } } -func TestFetchFeatureFlag_InvalidJSON(t *testing.T) { +func TestFetchFeatureFlags_InvalidJSON(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -414,13 +417,13 @@ func TestFetchFeatureFlag_InvalidJSON(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, httpClient) + _, err := fetchFeatureFlags(context.Background(), host, httpClient) if err == nil { t.Error("Expected error for invalid JSON") } } -func TestFetchFeatureFlag_ContextCancellation(t *testing.T) { +func TestFetchFeatureFlags_ContextCancellation(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(100 * time.Millisecond) w.WriteHeader(http.StatusOK) @@ -433,13 +436,8 @@ func TestFetchFeatureFlag_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately - _, err := fetchFeatureFlag(ctx, host, httpClient) + _, err := fetchFeatureFlags(ctx, host, httpClient) if err == nil { t.Error("Expected error for cancelled context") } } - -// Helper function to create bool pointer -func boolPtr(b bool) *bool { - return &b -}