Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions internal/disk/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,16 @@ func (s *Scheduler) runWorker(ctx context.Context, workerID int, fileCh <-chan d
return
}

w := newWorker(workerID, s.workerLog, file, s.diskCfg.ShiftTimestamp, s.getNewWorkerSpeed(), s.blockPool, s.insertQueue, s.metrics, s.passthrough, s.replayTimeKeeper)
w, err := newWorker(workerID, s.workerLog, file, s.diskCfg.ShiftTimestamp, s.getNewWorkerSpeed(), s.blockPool, s.insertQueue, s.metrics, s.passthrough, s.replayTimeKeeper)
if err != nil && !errors.Is(err, context.Canceled) {
s.log.Warn("newWorker failed, skipping file", "worker_id", workerID, "file", file.Path, "err", err)
continue
}

s.register(workerID, w)
s.rebalanceSpeed()

err := w.Run(ctx)
err = w.Run(ctx)

s.unregister(workerID)
s.rebalanceSpeed()
Expand Down
34 changes: 19 additions & 15 deletions internal/disk/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type worker struct {

blockPool block.Pool
insertQueue chan<- block.SharedColumns
cleanup func()
protoReader *proto.Reader
}

func newWorker(
Expand All @@ -54,8 +56,9 @@ func newWorker(
metrics metrics.Store,
passthrough bool,
replayTimeKeeper *block.ReplayTimeKeeper,
) *worker {
return &worker{
) (*worker, error) {

w := &worker{
id: id,
log: log.With("component", "disk_worker", "id", id, "file", file.Path, "compressed", file.Compressed, "file_index", file.Index, "loop_index", file.LoopIndex),

Expand All @@ -72,25 +75,25 @@ func newWorker(

replayTimeKeeper: replayTimeKeeper,
}
if err := w.buildReader(); err != nil {
return nil, err
}
return w, nil
}

func (w *worker) UpdateSpeedLimit(bytesPerSecondLimit uint64) {
w.bytesPerSecondLimit = bytesPerSecondLimit
if w.speedRd == nil {
return
panic("Speed reader is uninitialized, should be impossible")
}

w.bytesPerSecondLimit = bytesPerSecondLimit
w.speedRd.Reset(bytesPerSecondLimit)
}

func (w *worker) Run(ctx context.Context) error {
w.log.Info("started", "speed_limit_bytes", w.bytesPerSecondLimit)

rd, rdClose, rdErr := w.buildReader()
if rdErr != nil {
return fmt.Errorf("failed to build reader: %w", rdErr)
}
defer rdClose()
defer w.cleanup()

var dec proto.Block
for {
Expand All @@ -101,7 +104,7 @@ func (w *worker) Run(ctx context.Context) error {
default:
}

err := w.decodeBlock(ctx, rd, &dec)
err := w.decodeBlock(ctx, w.protoReader, &dec)
if errors.Is(err, io.EOF) {
w.log.Info("finished")
return nil
Expand All @@ -112,10 +115,10 @@ func (w *worker) Run(ctx context.Context) error {
}
}

func (w *worker) buildReader() (*proto.Reader, func(), error) {
func (w *worker) buildReader() error {
data, err := os.Open(w.file.Path)
if err != nil {
return nil, nil, fmt.Errorf("opening file: %w", err)
return fmt.Errorf("opening file: %w", err)
}

compressedSpeedRd := NewSpeedReader(data, func(n uint64) {
Expand All @@ -139,7 +142,7 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) {
w.log.Error("closing file", "err", fileErr)
}

return nil, nil, fmt.Errorf("creating zstd reader: %w", err)
return fmt.Errorf("creating zstd reader: %w", err)
}

zstdClose = zstdRd.Close
Expand All @@ -152,7 +155,7 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) {
w.metrics.IncrementMetric(metrics.TotalBytesUncompressed, n)
})

cleanup := func() {
w.cleanup = func() {
w.speedRd.Close()

if zstdClose != nil {
Expand All @@ -163,8 +166,9 @@ func (w *worker) buildReader() (*proto.Reader, func(), error) {
w.log.Error("closing file", "err", fileErr)
}
}
w.protoReader = proto.NewReader(w.speedRd)

return proto.NewReader(w.speedRd), cleanup, nil
return nil
}

func (w *worker) decodeBlock(ctx context.Context, rd *proto.Reader, dec *proto.Block) error {
Expand Down
132 changes: 73 additions & 59 deletions internal/metrics/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"maps"
"otelspam/internal/block"
"runtime"
"sync"
Expand Down Expand Up @@ -121,46 +122,24 @@ func (w *Worker) Run(ctx context.Context) error {
w.log.Info("started")
defer w.log.Info("stopped")

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
// this should be set from somewhere else maybe, but this works fine for now
blockPoolCount, blockPoolCapacity := w.blockPool.Stats()
w.SetMetric(BlockPoolCount, uint64(blockPoolCount))
w.SetMetric(BlockPoolCapacity, uint64(blockPoolCapacity))
w.SetMetric(BlockQueueLength, uint64(len(w.blockQueue)))
w.SetMetric(BlocksRetiredTotal, uint64(w.blockPool.TotalRetired()))

// this should be dynamically adjustable in the future, but for now we set it constantly
w.SetMetric(TargetBytesPerSecond, w.targetBytesPerSecond)

w.collectRuntimeMetrics()

// Drain the queue before pushing so no entries are lost between push and reset
w.drainMetricsQueue()
case m := <-w.metricsQueue:
w.applyMetricEntry(m)
case <-ticker.C:
w.collectInternalMetrics()

if w.insertSQL != "" {
err := w.pushMetrics(ctx)
if err != nil {
w.log.Error("failed to push metrics", "err", err)
continue
}
snapshot, pointSnapshot := w.snapshotAndReset()
go w.pushMetricsSnapshot(ctx, snapshot, pointSnapshot)
} else {
w.resetMetrics()
}

w.resetMetrics()
}
}
}

// drainMetricsQueue applies every entry that is immediately available on
// w.metricsQueue and returns as soon as a receive would block. It never waits
// for new entries to arrive.
func (w *Worker) drainMetricsQueue() {
	for {
		select {
		case entry := <-w.metricsQueue:
			w.applyMetricEntry(entry)
			// Something was pending; try for another entry.
			continue
		default:
		}

		// Nothing was ready on the queue: draining is complete.
		return
	}
}
Expand Down Expand Up @@ -189,32 +168,48 @@ func (w *Worker) applyMetricEntry(m Entry) {
}
}

func (w *Worker) collectRuntimeMetrics() {
// collectInternalMetrics gathers block pool, runtime, and system metrics
// directly under the lock, avoiding the metrics channel entirely.
func (w *Worker) collectInternalMetrics() {
w.mu.Lock()
defer w.mu.Unlock()

blockPoolCount, blockPoolCapacity := w.blockPool.Stats()
w.metrics[BlockPoolCount] = uint64(blockPoolCount)
w.metrics[BlockPoolCapacity] = uint64(blockPoolCapacity)
w.metrics[BlockQueueLength] = uint64(len(w.blockQueue))
w.metrics[BlocksRetiredTotal] = uint64(w.blockPool.TotalRetired())
w.metrics[TargetBytesPerSecond] = w.targetBytesPerSecond

var ms runtime.MemStats
runtime.ReadMemStats(&ms)

w.SetMetric(ProgramHeapAllocBytes, ms.HeapAlloc)
w.SetMetric(ProgramSysBytes, ms.Sys)
w.SetMetric(ProgramNumGoroutines, uint64(runtime.NumGoroutine()))
w.SetMetric(ProgramNumGC, uint64(ms.NumGC))
w.SetMetric(ProgramPauseTotalNs, ms.PauseTotalNs)
w.SetMetric(ProgramNextGCBytes, ms.NextGC)

w.SetMetric(ProgramNumCPU, uint64(runtime.NumCPU()))
w.metrics[ProgramHeapAllocBytes] = ms.HeapAlloc
w.metrics[ProgramSysBytes] = ms.Sys
w.metrics[ProgramNumGoroutines] = uint64(runtime.NumGoroutine())
w.metrics[ProgramNumGC] = uint64(ms.NumGC)
w.metrics[ProgramPauseTotalNs] = ms.PauseTotalNs
w.metrics[ProgramNextGCBytes] = ms.NextGC
w.metrics[ProgramNumCPU] = uint64(runtime.NumCPU())

var ru syscall.Rusage
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err == nil {
w.SetMetric(ProgramCPUUserNs, uint64(ru.Utime.Nano()))
w.SetMetric(ProgramCPUSysNs, uint64(ru.Stime.Nano()))
w.metrics[ProgramCPUUserNs] = uint64(ru.Utime.Nano())
w.metrics[ProgramCPUSysNs] = uint64(ru.Stime.Nano())
}
}

// resetMetrics is the locking entry point for resetting metric state: it
// acquires w.mu, delegates the actual work to resetMetricsLocked, and releases
// the mutex on return (including if the delegate panics, via defer).
func (w *Worker) resetMetrics() {
w.mu.Lock()
defer w.mu.Unlock()

// The lock is held here, which is the precondition resetMetricsLocked needs.
w.resetMetricsLocked()
}

// resetMetricsLocked zeroes resettable counters and clears point metrics.
// Caller must hold w.mu.
func (w *Worker) resetMetricsLocked() {
for name := range w.metrics {
// Skip resetting these. They should probably go in their own table or something
switch name {
case TotalRows:
case TotalBytesCompressed:
Expand Down Expand Up @@ -246,10 +241,33 @@ func (w *Worker) resetMetrics() {
w.pointMetrics = w.pointMetrics[:0]
}

func (w *Worker) pushMetrics(ctx context.Context) error {
// snapshotAndReset captures the current metric values and point metrics and
// then resets the worker's state, all within a single critical section on
// w.mu so the copy and the reset are not separated by other lock holders.
//
// The returned map and slice are freshly allocated copies (shallow, per
// element). The slice is nil when there are no point metrics.
func (w *Worker) snapshotAndReset() (map[Name]uint64, []Entry) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Copy the aggregate metrics into a map owned by the caller.
	metricsCopy := make(map[Name]uint64, len(w.metrics))
	maps.Copy(metricsCopy, w.metrics)

	// Copy point metrics only when there are any; otherwise leave the result nil.
	var pointsCopy []Entry
	if n := len(w.pointMetrics); n > 0 {
		pointsCopy = make([]Entry, n)
		copy(pointsCopy, w.pointMetrics)
	}

	// Reset while still holding the lock; the copies above are unaffected.
	w.resetMetricsLocked()

	return metricsCopy, pointsCopy
}

// pushMetricsSnapshot sends a pre-built snapshot to ClickHouse.
// Safe to call from a goroutine since it owns the snapshot data.
func (w *Worker) pushMetricsSnapshot(ctx context.Context, snapshot map[Name]uint64, pointSnapshot []Entry) {
batch, err := w.conn.PrepareBatch(ctx, w.insertSQL)
if err != nil {
return fmt.Errorf("failed to prepare metrics batch: %w", err)
w.log.Error("failed to prepare metrics batch", "err", err)
return
}
defer func(batch driver.Batch) {
batchErr := batch.Close()
Expand All @@ -258,34 +276,30 @@ func (w *Worker) pushMetrics(ctx context.Context) error {
}
}(batch)

w.mu.Lock()
now := time.Now()
for name, value := range w.metrics {
for name, value := range snapshot {
err = batch.Append(w.runID, string(name), now, value, w.mergeAttributes(nil))
if err != nil {
w.mu.Unlock()
return fmt.Errorf("failed to append metric (%s/%d) to batch: %w", name, value, err)
w.log.Error("failed to append metric to batch", "name", name, "value", value, "err", err)
return
}
}

for _, m := range w.pointMetrics {
for _, m := range pointSnapshot {
err = batch.Append(w.runID, string(m.Name), m.Timestamp, m.Value, w.mergeAttributes(m.Attributes))
if err != nil {
w.mu.Unlock()
return fmt.Errorf("failed to append point metric (%s/%d) to batch: %w", m.Name, m.Value, err)
w.log.Error("failed to append point metric to batch", "name", m.Name, "value", m.Value, "err", err)
return
}
}

w.mu.Unlock()

err = batch.Send()
if err != nil {
return fmt.Errorf("failed to send metrics: %w", err)
w.log.Error("failed to send metrics", "err", err)
return
}

w.log.Debug("pushed metrics", "count", batch.Rows())

return nil
}

// mergeAttributes returns config attributes merged with per-metric attributes.
Expand Down