diff --git a/internal/disk/scheduler.go b/internal/disk/scheduler.go index cad4e32..651a96b 100644 --- a/internal/disk/scheduler.go +++ b/internal/disk/scheduler.go @@ -130,12 +130,16 @@ func (s *Scheduler) runWorker(ctx context.Context, workerID int, fileCh <-chan d return } - w := newWorker(workerID, s.workerLog, file, s.diskCfg.ShiftTimestamp, s.getNewWorkerSpeed(), s.blockPool, s.insertQueue, s.metrics, s.passthrough, s.replayTimeKeeper) + w, err := newWorker(workerID, s.workerLog, file, s.diskCfg.ShiftTimestamp, s.getNewWorkerSpeed(), s.blockPool, s.insertQueue, s.metrics, s.passthrough, s.replayTimeKeeper) + if err != nil && !errors.Is(err, context.Canceled) { + s.log.Warn("newWorker failed, skipping file", "worker_id", workerID, "file", file.Path, "err", err) + continue + } s.register(workerID, w) s.rebalanceSpeed() - err := w.Run(ctx) + err = w.Run(ctx) s.unregister(workerID) s.rebalanceSpeed() diff --git a/internal/disk/worker.go b/internal/disk/worker.go index 3ca2b3a..321276d 100644 --- a/internal/disk/worker.go +++ b/internal/disk/worker.go @@ -41,6 +41,8 @@ type worker struct { blockPool block.Pool insertQueue chan<- block.SharedColumns + cleanup func() + protoReader *proto.Reader } func newWorker( @@ -54,8 +56,9 @@ func newWorker( metrics metrics.Store, passthrough bool, replayTimeKeeper *block.ReplayTimeKeeper, -) *worker { - return &worker{ +) (*worker, error) { + + w := &worker{ id: id, log: log.With("component", "disk_worker", "id", id, "file", file.Path, "compressed", file.Compressed, "file_index", file.Index, "loop_index", file.LoopIndex), @@ -72,25 +75,25 @@ func newWorker( replayTimeKeeper: replayTimeKeeper, } + if err := w.buildReader(); err != nil { + return nil, err + } + return w, nil } func (w *worker) UpdateSpeedLimit(bytesPerSecondLimit uint64) { + w.bytesPerSecondLimit = bytesPerSecondLimit if w.speedRd == nil { - return + panic("Speed reader is uninitialized, should be impossible") } - w.bytesPerSecondLimit = bytesPerSecondLimit w.speedRd.Reset(bytesPerSecondLimit) } func (w *worker) Run(ctx context.Context) error { w.log.Info("started", "speed_limit_bytes", w.bytesPerSecondLimit) - rd, rdClose, rdErr := w.buildReader() - if rdErr != nil { - return fmt.Errorf("failed to build reader: %w", rdErr) - } - defer rdClose() + defer w.cleanup() var dec proto.Block for { @@ -101,7 +104,7 @@ func (w *worker) Run(ctx context.Context) error { default: } - err := w.decodeBlock(ctx, rd, &dec) + err := w.decodeBlock(ctx, w.protoReader, &dec) if errors.Is(err, io.EOF) { w.log.Info("finished") return nil @@ -112,10 +115,10 @@ func (w *worker) Run(ctx context.Context) error { } } -func (w *worker) buildReader() (*proto.Reader, func(), error) { +func (w *worker) buildReader() error { data, err := os.Open(w.file.Path) if err != nil { - return nil, nil, fmt.Errorf("opening file: %w", err) + return fmt.Errorf("opening file: %w", err) } compressedSpeedRd := NewSpeedReader(data, func(n uint64) { @@ -139,7 +142,7 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) { w.log.Error("closing file", "err", fileErr) } - return nil, nil, fmt.Errorf("creating zstd reader: %w", err) + return fmt.Errorf("creating zstd reader: %w", err) } zstdClose = zstdRd.Close @@ -152,7 +155,7 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) { w.metrics.IncrementMetric(metrics.TotalBytesUncompressed, n) }) - cleanup := func() { + w.cleanup = func() { w.speedRd.Close() if zstdClose != nil { @@ -163,8 +166,9 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) { w.log.Error("closing file", "err", fileErr) } } + w.protoReader = proto.NewReader(w.speedRd) - return proto.NewReader(w.speedRd), cleanup, nil + return nil } func (w *worker) decodeBlock(ctx context.Context, rd *proto.Reader, dec *proto.Block) error { diff --git a/internal/metrics/worker.go b/internal/metrics/worker.go index 9030d1f..714c9ca 100644 --- a/internal/metrics/worker.go +++ b/internal/metrics/worker.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "maps" "otelspam/internal/block" "runtime" "sync" @@ -121,46 +122,24 @@ func (w *Worker) Run(ctx context.Context) error { w.log.Info("started") defer w.log.Info("stopped") + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { select { case <-ctx.Done(): return ctx.Err() - case <-time.After(1 * time.Second): - // this should be set from somewhere else maybe, but this works fine for now - blockPoolCount, blockPoolCapacity := w.blockPool.Stats() - w.SetMetric(BlockPoolCount, uint64(blockPoolCount)) - w.SetMetric(BlockPoolCapacity, uint64(blockPoolCapacity)) - w.SetMetric(BlockQueueLength, uint64(len(w.blockQueue))) - w.SetMetric(BlocksRetiredTotal, uint64(w.blockPool.TotalRetired())) - - // this should be dynamically adjustable in the future, but for now we set it constantly - w.SetMetric(TargetBytesPerSecond, w.targetBytesPerSecond) - - w.collectRuntimeMetrics() - - // Drain the queue before pushing so no entries are lost between push and reset - w.drainMetricsQueue() + case m := <-w.metricsQueue: + w.applyMetricEntry(m) + case <-ticker.C: + w.collectInternalMetrics() if w.insertSQL != "" { - err := w.pushMetrics(ctx) - if err != nil { - w.log.Error("failed to push metrics", "err", err) - continue - } + snapshot, pointSnapshot := w.snapshotAndReset() + go w.pushMetricsSnapshot(ctx, snapshot, pointSnapshot) + } else { + w.resetMetrics() } - - w.resetMetrics() - } - } -} - -func (w *Worker) drainMetricsQueue() { - for { - select { - case m := <-w.metricsQueue: - w.applyMetricEntry(m) - default: - return } } } @@ -189,23 +168,34 @@ func (w *Worker) applyMetricEntry(m Entry) { } } -func (w *Worker) collectRuntimeMetrics() { +// collectInternalMetrics gathers block pool, runtime, and system metrics +// directly under the lock, avoiding the metrics channel entirely. +func (w *Worker) collectInternalMetrics() { + w.mu.Lock() + defer w.mu.Unlock() + + blockPoolCount, blockPoolCapacity := w.blockPool.Stats() + w.metrics[BlockPoolCount] = uint64(blockPoolCount) + w.metrics[BlockPoolCapacity] = uint64(blockPoolCapacity) + w.metrics[BlockQueueLength] = uint64(len(w.blockQueue)) + w.metrics[BlocksRetiredTotal] = uint64(w.blockPool.TotalRetired()) + w.metrics[TargetBytesPerSecond] = w.targetBytesPerSecond + var ms runtime.MemStats runtime.ReadMemStats(&ms) - w.SetMetric(ProgramHeapAllocBytes, ms.HeapAlloc) - w.SetMetric(ProgramSysBytes, ms.Sys) - w.SetMetric(ProgramNumGoroutines, uint64(runtime.NumGoroutine())) - w.SetMetric(ProgramNumGC, uint64(ms.NumGC)) - w.SetMetric(ProgramPauseTotalNs, ms.PauseTotalNs) - w.SetMetric(ProgramNextGCBytes, ms.NextGC) - - w.SetMetric(ProgramNumCPU, uint64(runtime.NumCPU())) + w.metrics[ProgramHeapAllocBytes] = ms.HeapAlloc + w.metrics[ProgramSysBytes] = ms.Sys + w.metrics[ProgramNumGoroutines] = uint64(runtime.NumGoroutine()) + w.metrics[ProgramNumGC] = uint64(ms.NumGC) + w.metrics[ProgramPauseTotalNs] = ms.PauseTotalNs + w.metrics[ProgramNextGCBytes] = ms.NextGC + w.metrics[ProgramNumCPU] = uint64(runtime.NumCPU()) var ru syscall.Rusage if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err == nil { - w.SetMetric(ProgramCPUUserNs, uint64(ru.Utime.Nano())) - w.SetMetric(ProgramCPUSysNs, uint64(ru.Stime.Nano())) + w.metrics[ProgramCPUUserNs] = uint64(ru.Utime.Nano()) + w.metrics[ProgramCPUSysNs] = uint64(ru.Stime.Nano()) } } @@ -213,8 +203,13 @@ func (w *Worker) resetMetrics() { w.mu.Lock() defer w.mu.Unlock() + w.resetMetricsLocked() +} + +// resetMetricsLocked zeroes resettable counters and clears point metrics. +// Caller must hold w.mu. +func (w *Worker) resetMetricsLocked() { for name := range w.metrics { - // Skip resetting these. They should probably go in their own table or something switch name { case TotalRows: case TotalBytesCompressed: @@ -246,10 +241,33 @@ func (w *Worker) resetMetrics() { w.pointMetrics = w.pointMetrics[:0] } -func (w *Worker) pushMetrics(ctx context.Context) error { +// snapshotAndReset copies the current metrics state and resets counters, +// returning owned copies safe to use from another goroutine. +func (w *Worker) snapshotAndReset() (map[Name]uint64, []Entry) { + w.mu.Lock() + defer w.mu.Unlock() + + snapshot := make(map[Name]uint64, len(w.metrics)) + maps.Copy(snapshot, w.metrics) + + var pointSnapshot []Entry + if len(w.pointMetrics) > 0 { + pointSnapshot = make([]Entry, len(w.pointMetrics)) + copy(pointSnapshot, w.pointMetrics) + } + + w.resetMetricsLocked() + + return snapshot, pointSnapshot +} + +// pushMetricsSnapshot sends a pre-built snapshot to ClickHouse. +// Safe to call from a goroutine since it owns the snapshot data. +func (w *Worker) pushMetricsSnapshot(ctx context.Context, snapshot map[Name]uint64, pointSnapshot []Entry) { batch, err := w.conn.PrepareBatch(ctx, w.insertSQL) if err != nil { - return fmt.Errorf("failed to prepare metrics batch: %w", err) + w.log.Error("failed to prepare metrics batch", "err", err) + return } defer func(batch driver.Batch) { batchErr := batch.Close() @@ -258,34 +276,30 @@ func (w *Worker) pushMetrics(ctx context.Context) error { } }(batch) - w.mu.Lock() now := time.Now() - for name, value := range w.metrics { + for name, value := range snapshot { err = batch.Append(w.runID, string(name), now, value, w.mergeAttributes(nil)) if err != nil { - w.mu.Unlock() - return fmt.Errorf("failed to append metric (%s/%d) to batch: %w", name, value, err) + w.log.Error("failed to append metric to batch", "name", name, "value", value, "err", err) + return } } - for _, m := range w.pointMetrics { + for _, m := range pointSnapshot { err = batch.Append(w.runID, string(m.Name), m.Timestamp, m.Value, w.mergeAttributes(m.Attributes)) if err != nil { - w.mu.Unlock() - return fmt.Errorf("failed to append point metric (%s/%d) to batch: %w", m.Name, m.Value, err) + w.log.Error("failed to append point metric to batch", "name", m.Name, "value", m.Value, "err", err) + return } } - w.mu.Unlock() - err = batch.Send() if err != nil { - return fmt.Errorf("failed to send metrics: %w", err) + w.log.Error("failed to send metrics", "err", err) + return } w.log.Debug("pushed metrics", "count", batch.Rows()) - - return nil } // mergeAttributes returns config attributes merged with per-metric attributes.