The client package provides multiple implementations of the OutputClient interface for sending logs from Fluent Bit to various backends. It supports OpenTelemetry Protocol (OTLP) over gRPC and HTTP, as well as stdout and no-op clients for testing and debugging.
The client package abstracts the complexity of sending logs to different backends. All clients implement the OutputClient interface, which provides a consistent API regardless of the underlying transport mechanism.
The package supports:
- Multiple protocols: OTLP over gRPC and HTTP
- Persistent buffering: Using dque (disk-based queue) for reliability
- Batch processing: Efficient log batching with configurable limits
- Retry logic: Configurable exponential backoff for failed exports
- Rate limiting: Optional throttling to prevent overwhelming backends
- TLS/mTLS: Full TLS configuration support
- Metrics: Prometheus metrics for monitoring client behavior
- Target-based routing: Separate configurations for Seed and Shoot clusters
The OTLP gRPC client (OTLPGRPCClient) sends logs using the OpenTelemetry Protocol over gRPC. This is the recommended production client for high-throughput, low-latency log shipping.
Features:
- Bi-directional streaming support
- Efficient binary protocol (Protobuf)
- Built-in compression (gzip)
- Connection multiplexing
- Persistent buffering with dque
- Configurable batch processing
- Retry with exponential backoff
- Optional rate limiting
- TLS/mTLS support
- gRPC metrics instrumentation
Use cases:
- Production environments
- High-volume log shipping
- Low-latency requirements
- When backend supports gRPC
Configuration type: otlp_grpc (string) or types.OTLPGRPC (enum)
The OTLP HTTP client (OTLPHTTPClient) sends logs using the OpenTelemetry Protocol over HTTP/1.1 or HTTP/2.
Features:
- Standard HTTP protocol
- JSON or Protobuf encoding
- Compression support (gzip)
- Persistent buffering with dque
- Configurable batch processing
- Retry with exponential backoff
- Optional rate limiting
- TLS support
- Works through HTTP proxies
Use cases:
- When gRPC is not available or blocked by firewalls
- HTTP proxy environments
- Debugging (easier to inspect with standard tools)
- When backend only supports HTTP
Configuration type: otlp_http (string) or types.OTLPHTTP (enum)
The Stdout client (StdoutClient) writes all log entries to standard output in JSON format.
Features:
- Simple JSON output
- No external dependencies
- Minimal overhead
- Useful for debugging
- Metrics tracking
Use cases:
- Development and debugging
- Testing log processing pipeline
- Integration with stdout-based log collectors
- Troubleshooting without backend connectivity
Configuration type: stdout (string) or types.STDOUT (enum)
Output format:

```json
{
  "timestamp": "2025-12-22T10:30:45.123456Z",
  "record": {
    "message": "Application log message",
    "level": "info",
    "kubernetes": {...}
  }
}
```

The Noop client (NoopClient) discards all log entries without processing them.
Features:
- Zero overhead
- Discards all logs
- Increments dropped logs metrics
- Useful for testing
Use cases:
- Performance testing (measure overhead without I/O)
- Disabling log output temporarily
- Testing metrics collection
- Benchmarking
Configuration type: noop (string) or types.NOOP (enum)
The client package supports two target types that determine which backend configuration to use:
The Seed target (client.Seed) is used for logs originating from the Gardener Seed cluster. The client uses the SeedType configuration from PluginConfig to determine which client implementation to create.
Usage:

```go
client, err := client.NewClient(ctx, cfg, client.WithTarget(client.Seed))
```

The Shoot target (client.Shoot) is used for logs originating from Gardener Shoot clusters. The client uses the ShootType configuration from PluginConfig to determine which client implementation to create.
Usage:

```go
client, err := client.NewClient(ctx, cfg, client.WithTarget(client.Shoot))
```

Configuration is managed through the config.Config struct, which contains both plugin-level and OTLP-specific settings.
The OTLPConfig struct holds all OTLP-related configuration:
```go
type OTLPConfig struct {
	Endpoint    string            // Backend endpoint (e.g., "localhost:4317")
	Insecure    bool              // Skip TLS verification (not recommended for production)
	Compression int               // Compression level (0 = none, 1 = gzip)
	Timeout     time.Duration     // Request timeout
	Headers     map[string]string // Custom HTTP/gRPC headers (e.g., authentication)

	// Embedded configurations
	DQueConfig // Persistent queue settings

	// Batch processor settings
	DQueBatchProcessorMaxQueueSize     int
	DQueBatchProcessorMaxBatchSize     int
	DQueBatchProcessorExportTimeout    time.Duration
	DQueBatchProcessorExportInterval   time.Duration
	DQueBatchProcessorExportBufferSize int

	// Retry settings
	RetryEnabled         bool
	RetryInitialInterval time.Duration
	RetryMaxInterval     time.Duration
	RetryMaxElapsedTime  time.Duration

	// Throttle settings
	ThrottleEnabled        bool
	ThrottleRequestsPerSec int

	// TLS settings
	TLSCertFile           string
	TLSKeyFile            string
	TLSCAFile             string
	TLSServerName         string
	TLSInsecureSkipVerify bool
	TLSMinVersion         string
	TLSMaxVersion         string
}
```

Default values:

```go
Endpoint:                         "localhost:4317"
Insecure:                         false
Compression:                      0 // No compression
Timeout:                          30 * time.Second
RetryEnabled:                     true
RetryInitialInterval:             5 * time.Second
RetryMaxInterval:                 30 * time.Second
RetryMaxElapsedTime:              1 * time.Minute
ThrottleEnabled:                  false
ThrottleRequestsPerSec:           0 // No limit
DQueBatchProcessorMaxQueueSize:   512
DQueBatchProcessorMaxBatchSize:   256
DQueBatchProcessorExportTimeout:  30 * time.Second
DQueBatchProcessorExportInterval: 1 * time.Second
TLSMinVersion:                    "1.2"
```

The DQueConfig struct configures the persistent disk-based queue:
```go
type DQueConfig struct {
	DQueDir         string // Directory for queue persistence
	DQueSegmentSize int    // Number of items per segment file
	DQueSync        bool   // Synchronous writes (slower but safer)
	DQueName        string // Queue name (for multiple queues)
}
```

Default values:

```go
DQueDir:         "/tmp/flb-storage"
DQueSegmentSize: 500
DQueSync:        false
DQueName:        "dque"
```

Considerations:
- DQueDir: Ensure sufficient disk space and proper permissions
- DQueSegmentSize: Larger values = fewer files, smaller values = faster recovery
- DQueSync: Enable for critical logs, disable for performance
- DQueName: Use unique names when running multiple instances
The batch processor groups logs into batches before sending to reduce overhead:
| Parameter | Description | Default | Tuning |
|---|---|---|---|
| `DQueBatchProcessorMaxQueueSize` | Maximum records in memory queue before dropping | 512 | Increase for high throughput, decrease to prevent OOM |
| `DQueBatchProcessorMaxBatchSize` | Maximum records per export batch | 256 | Increase for efficiency, decrease for lower latency |
| `DQueBatchProcessorExportTimeout` | Timeout for a single export operation | 30s | Increase for slow backends |
| `DQueBatchProcessorExportInterval` | Time between periodic exports | 1s | Decrease for lower latency, increase for efficiency |
Tuning guidelines:
- High throughput: Increase `MaxBatchSize` and `ExportInterval`
- Low latency: Decrease `ExportInterval` and `MaxBatchSize`
- Memory constrained: Decrease `MaxQueueSize`
- Slow backend: Increase `ExportTimeout`
TLS is configured through the OTLPConfig fields:
```go
cfg.OTLPConfig.TLSCertFile = "/path/to/client-cert.pem" // Client certificate (for mTLS)
cfg.OTLPConfig.TLSKeyFile = "/path/to/client-key.pem"   // Client private key (for mTLS)
cfg.OTLPConfig.TLSCAFile = "/path/to/ca-cert.pem"       // CA certificate for server verification
cfg.OTLPConfig.TLSServerName = "example.com"            // Server name for SNI
cfg.OTLPConfig.TLSInsecureSkipVerify = false            // Don't skip verification (recommended)
cfg.OTLPConfig.TLSMinVersion = "1.2"                    // Minimum TLS version
cfg.OTLPConfig.TLSMaxVersion = "1.3"                    // Maximum TLS version (optional)
```

Security best practices:
- Always use TLS in production
- Never set `Insecure` or `TLSInsecureSkipVerify` to `true` in production
- Use TLS 1.2 or higher
- Implement mTLS for enhanced security
- Keep certificates rotated and up-to-date
Retry configuration uses exponential backoff:
```go
cfg.OTLPConfig.RetryEnabled = true
cfg.OTLPConfig.RetryInitialInterval = 5 * time.Second // First retry after 5s
cfg.OTLPConfig.RetryMaxInterval = 30 * time.Second    // Max wait between retries
cfg.OTLPConfig.RetryMaxElapsedTime = 1 * time.Minute  // Give up after 1 minute
```

Retry sequence example:
- Initial request fails
- Wait 5s, retry
- Wait 10s, retry (doubled)
- Wait 20s, retry (doubled)
- Wait 30s, retry (capped at max)
- Continue until 1 minute has elapsed, then give up
Rate limiting prevents overwhelming the backend:
```go
cfg.OTLPConfig.ThrottleEnabled = true
cfg.OTLPConfig.ThrottleRequestsPerSec = 100 // Max 100 requests/second
```

Behavior:
- When enabled, the client limits requests to the specified rate
- Excess requests return an `ErrThrottled` error
- Use `DroppedLogs` metrics to monitor throttled records
- Set `ThrottleRequestsPerSec = 0` for unlimited (when `ThrottleEnabled = false`)
Use the NewClient function with functional options:
```go
import (
	"context"
	"time"

	"github.com/gardener/logging/v1/pkg/client"
	"github.com/gardener/logging/v1/pkg/config"
	"github.com/go-logr/logr"
)

// Load configuration
cfg := config.Config{
	PluginConfig: config.PluginConfig{
		ShootType: "otlp_grpc", // Client type for shoot clusters
		SeedType:  "otlp_grpc", // Client type for seed cluster
	},
	OTLPConfig: config.OTLPConfig{
		Endpoint: "otlp-collector.example.com:4317",
		Timeout:  30 * time.Second,
	},
}

// Create logger (use your preferred logging library)
logger := logr.Discard() // Replace with actual logger

// Create client for shoot target
ctx := context.Background()
shootClient, err := client.NewClient(
	ctx,
	cfg,
	client.WithTarget(client.Shoot),
	client.WithLogger(logger),
)
if err != nil {
	// Handle error
}
defer shootClient.StopWait()
```

The NewClient function accepts functional options:
Specifies whether to use Seed or Shoot configuration:
```go
client.NewClient(ctx, cfg, client.WithTarget(client.Shoot))
client.NewClient(ctx, cfg, client.WithTarget(client.Seed))
```

Provides a logger for client operations:

```go
logger := logr.New(handler) // Your logger implementation
client.NewClient(ctx, cfg, client.WithLogger(logger))
```

If no logger is provided, a no-op logger is used.
Once created, use the Handle method to send logs:
```go
entry := types.OutputEntry{
	Timestamp: time.Now(),
	Record: map[string]any{
		"message": "Application started",
		"level":   "info",
		"kubernetes": map[string]any{
			"namespace": "default",
			"pod":       "app-123",
		},
	},
}

err := shootClient.Handle(entry)
if err != nil {
	// Handle error (e.g., throttled, queue full, network error)
}
```

Error handling:
- `client.ErrThrottled`: Client is rate-limited
- `client.ErrQueueFull`: Internal queue is full
- `client.ErrProcessorClosed`: Client has been shut down
- Network errors: Backend unreachable or request failed
Proper shutdown ensures all buffered logs are sent:
```go
// Stop accepting new logs and wait for the queue to drain
shootClient.StopWait()
```

This method:
- Stops accepting new logs
- Flushes all buffered logs to backend
- Waits for in-flight exports to complete
- Closes connections
```go
// Stop immediately without waiting
shootClient.Stop()
```

This method:
- Stops accepting new logs immediately
- Cancels in-flight operations
- May lose buffered logs
- Use only in emergency or testing
Best practice:

```go
defer shootClient.StopWait() // Ensure graceful shutdown on exit
```

The DQue Batch Processor is the core component for reliable log delivery:
```
┌─────────────────────────────────────────────────────────────────┐
│                     Fluent Bit Output Plugin                    │
└────────────────────────────────┬────────────────────────────────┘
                                 │ Handle(entry)
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          OutputClient                           │
│  (OTLPGRPCClient / OTLPHTTPClient / StdoutClient / NoopClient)  │
└────────────────────────────────┬────────────────────────────────┘
                                 │ OnEmit(record)
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                      DQue Batch Processor                       │
│                                                                 │
│  ┌────────────────────┐         ┌──────────────────────┐        │
│  │   Memory Queue     │────────▶│   DQue (Disk Queue)  │        │
│  │ (Circular Buffer)  │         │ (Persistent Storage) │        │
│  └────────────────────┘         └──────────────────────┘        │
│           │                              │                      │
│           │ Batch (every 1s or 256 logs) │                      │
│           ▼                              │                      │
│  ┌────────────────────┐                  │                      │
│  │   Export Worker    │◀─────────────────┘                      │
│  │    (Goroutine)     │                                         │
│  └────────┬───────────┘                                         │
└───────────┼─────────────────────────────────────────────────────┘
            │ Export(batch)
            ▼
┌─────────────────────────────────────────────────────────────────┐
│                         OTLP Exporter                           │
│              (gRPC or HTTP with retry logic)                    │
└────────────────────────────────┬────────────────────────────────┘
                                 │
                                 ▼
                        Backend (e.g., Vali)
```
Key features:
- Memory Queue: Fast in-memory circular buffer for incoming logs
- Persistent Storage: DQue writes logs to disk for durability
- Batch Processing: Groups logs into efficient batches
- Export Worker: Background goroutine handles exports
- Retry Logic: Automatic retry with exponential backoff
- Metrics: Comprehensive metrics for monitoring
The client package exports Prometheus metrics for monitoring:
| Metric | Type | Labels | Description |
|---|---|---|---|
| `output_client_logs_total` | Counter | `endpoint` | Total logs sent by the client |
| `dropped_logs_total` | Counter | `endpoint`, `reason` | Logs dropped (queue full, throttled, etc.) |
| `errors_total` | Counter | `type` | Errors by type |
| Metric | Type | Labels | Description |
|---|---|---|---|
| `dque_queue_size` | Gauge | `endpoint` | Current queue size |
| `dque_batch_size` | Histogram | `endpoint` | Size of exported batches |
| `dque_export_duration_seconds` | Histogram | `endpoint`, `status` | Export operation duration |
| `dque_exports_total` | Counter | `endpoint`, `status` | Total exports by status |
| Metric | Type | Labels | Description |
|---|---|---|---|
| `grpc_client_started_total` | Counter | `grpc_method`, `grpc_service` | RPCs started |
| `grpc_client_handled_total` | Counter | `grpc_method`, `grpc_service`, `grpc_code` | RPCs completed |
| `grpc_client_msg_sent_total` | Counter | `grpc_method`, `grpc_service` | Messages sent |
| `grpc_client_msg_received_total` | Counter | `grpc_method`, `grpc_service` | Messages received |
Monitoring recommendations:
- Alert on high `dropped_logs_total` rates
- Monitor `dque_queue_size` for queue buildup
- Track `dque_export_duration_seconds` for backend latency
- Watch `errors_total` for issues
Basic usage:

```go
package main

import (
	"context"
	"time"

	"github.com/gardener/logging/v1/pkg/client"
	"github.com/gardener/logging/v1/pkg/config"
	"github.com/gardener/logging/v1/pkg/types"
	"github.com/go-logr/logr"
)

func main() {
	cfg := config.Config{
		PluginConfig: config.PluginConfig{
			ShootType: "otlp_grpc",
		},
		OTLPConfig: config.OTLPConfig{
			Endpoint: "localhost:4317",
			Timeout:  30 * time.Second,
		},
	}

	ctx := context.Background()
	logger := logr.Discard()

	c, err := client.NewClient(ctx, cfg,
		client.WithTarget(client.Shoot),
		client.WithLogger(logger),
	)
	if err != nil {
		panic(err)
	}
	defer c.StopWait()

	// Send a log
	entry := types.OutputEntry{
		Timestamp: time.Now(),
		Record: map[string]any{
			"message": "Hello, World!",
			"level":   "info",
		},
	}
	if err := c.Handle(entry); err != nil {
		// Handle error
	}
}
```

OTLP HTTP client with mTLS for the Seed target:

```go
cfg := config.Config{
	PluginConfig: config.PluginConfig{
		SeedType: "otlp_http",
	},
	OTLPConfig: config.OTLPConfig{
		Endpoint:      "https://otlp-collector.example.com:4318",
		TLSCAFile:     "/etc/ssl/certs/ca.pem",
		TLSCertFile:   "/etc/ssl/certs/client.pem",
		TLSKeyFile:    "/etc/ssl/private/client-key.pem",
		TLSMinVersion: "1.3",
	},
}

c, err := client.NewClient(ctx, cfg,
	client.WithTarget(client.Seed),
	client.WithLogger(logger),
)
```

Stdout client for debugging:

```go
cfg := config.Config{
	PluginConfig: config.PluginConfig{
		ShootType: "stdout",
	},
}

c, err := client.NewClient(ctx, cfg,
	client.WithTarget(client.Shoot),
)
// Logs will be written to stdout in JSON format
```

High-throughput configuration:

```go
cfg := config.Config{
	PluginConfig: config.PluginConfig{
		ShootType: "otlp_grpc",
	},
	OTLPConfig: config.OTLPConfig{
		Endpoint:    "otlp-collector:4317",
		Compression: 1, // Enable gzip

		// Larger batches for efficiency
		DQueBatchProcessorMaxQueueSize:   2048,
		DQueBatchProcessorMaxBatchSize:   512,
		DQueBatchProcessorExportInterval: 5 * time.Second,

		// DQue settings
		DQueConfig: config.DQueConfig{
			DQueDir:         "/var/log/fluent-bit-storage",
			DQueSegmentSize: 1000,
			DQueSync:        false, // Async for performance
		},

		// Enable retry
		RetryEnabled:         true,
		RetryInitialInterval: 5 * time.Second,
		RetryMaxInterval:     30 * time.Second,
		RetryMaxElapsedTime:  5 * time.Minute,
	},
}
```

Rate-limited configuration:

```go
cfg := config.Config{
	PluginConfig: config.PluginConfig{
		ShootType: "otlp_grpc",
	},
	OTLPConfig: config.OTLPConfig{
		Endpoint: "otlp-collector:4317",

		// Enable throttling
		ThrottleEnabled:        true,
		ThrottleRequestsPerSec: 100, // Max 100 requests/sec
	},
}

c, err := client.NewClient(ctx, cfg,
	client.WithTarget(client.Shoot),
)

// Handle throttling
if err := c.Handle(entry); err != nil {
	if errors.Is(err, client.ErrThrottled) {
		// Log was throttled - consider buffering or dropping
	}
}
```

When adding new client types or modifying existing ones:
- Implement the `OutputClient` interface
- Add appropriate metrics
- Write unit tests using Ginkgo and Gomega
- Update this documentation
- Follow coding standards and best practices