Skip to content

Commit e5ad7d2

Browse files
committed
feat: show disk/network throughput with bottleneck indicator
Replace the generic "source/effective" labels with operation-aware labels (push: disk read / network write; pull: network read / disk write). Derive the sink duration as transferTime - sourceReadTime (clamped at zero) and compute sink throughput from it. Mark the slower side as the bottleneck. Add overall effective throughput to output. Signed-off-by: Zhao Chen <winters.zc@antgroup.com>
1 parent 20c4b87 commit e5ad7d2

2 files changed

Lines changed: 81 additions & 36 deletions

File tree

pkg/iometrics/tracker.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,22 @@ func (t *Tracker) TrackTransfer(fn func() error) error {
6666
return err
6767
}
6868

69+
// sourceLabel and sinkLabel return human-readable labels for the source
70+
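// and sink sides based on the operation type: "push" reads from disk and writes to the network; any other operation reads from the network and writes to disk.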
71+
func (t *Tracker) sourceLabel() string {
72+
if t.operation == "push" {
73+
return "disk read"
74+
}
75+
return "network read"
76+
}
77+
78+
func (t *Tracker) sinkLabel() string {
79+
if t.operation == "push" {
80+
return "network write"
81+
}
82+
return "disk write"
83+
}
84+
6985
// Summary outputs a throughput summary to both the log file (logrus)
7086
// and the terminal (stderr). Call this after all goroutines have
7187
// completed (after g.Wait()) — the happens-before from errgroup
@@ -80,30 +96,43 @@ func (t *Tracker) Summary() {
8096
return
8197
}
8298

83-
sourceReadTime := time.Duration(sourceNanos)
99+
sourceDuration := time.Duration(sourceNanos)
100+
sinkNanos := transferNanos - sourceNanos
101+
sinkDuration := time.Duration(max(sinkNanos, 0))
84102

85-
var readFraction float64
86-
if transferNanos > 0 {
87-
readFraction = float64(sourceNanos) / float64(transferNanos)
103+
sourceThroughput := formatThroughput(totalBytes, sourceDuration)
104+
sinkThroughput := formatThroughput(totalBytes, sinkDuration)
105+
106+
// Identify the bottleneck by comparing cumulative durations; the sink is reported unless the source side took strictly longer.
107+
bottleneck := t.sinkLabel()
108+
if sourceNanos > sinkNanos {
109+
bottleneck = t.sourceLabel()
88110
}
89111

90112
// Log structured fields to log file.
91113
logrus.WithFields(logrus.Fields{
92114
"operation": t.operation,
93115
"totalBytes": formatBytes(uint64(totalBytes)),
94116
"wallClock": wallClock.Round(time.Millisecond).String(),
95-
"sourceReadTime": sourceReadTime.Round(time.Millisecond).String(),
96-
"sourceReadThroughput": formatThroughput(totalBytes, sourceReadTime),
97117
"effectiveThroughput": formatThroughput(totalBytes, wallClock),
98-
"readFraction": fmt.Sprintf("%.2f", readFraction),
118+
t.sourceLabel(): sourceThroughput,
119+
t.sinkLabel(): sinkThroughput,
120+
"bottleneck": bottleneck,
99121
}).Info("io throughput summary")
100122

101123
// Print concise summary to terminal.
102-
fmt.Fprintf(os.Stderr, "IO summary: %s in %s, effective %s, source %s, read ratio %.2f\n",
124+
srcArrow := ""
125+
snkArrow := ""
126+
if bottleneck == t.sourceLabel() {
127+
srcArrow = " ← bottleneck"
128+
} else {
129+
snkArrow = " ← bottleneck"
130+
}
131+
fmt.Fprintf(os.Stderr, "IO summary: %s in %s, %s | %s: %s%s | %s: %s%s\n",
103132
formatBytes(uint64(totalBytes)),
104133
wallClock.Round(time.Millisecond),
105134
formatThroughput(totalBytes, wallClock),
106-
formatThroughput(totalBytes, sourceReadTime),
107-
readFraction,
135+
t.sourceLabel(), sourceThroughput, srcArrow,
136+
t.sinkLabel(), sinkThroughput, snkArrow,
108137
)
109138
}

pkg/iometrics/tracker_test.go

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -210,44 +210,60 @@ func TestSummaryOutput(t *testing.T) {
210210
"io throughput summary",
211211
"operation=push",
212212
"totalBytes=",
213-
"readFraction=",
213+
"bottleneck=",
214+
"disk read",
215+
"network write",
214216
} {
215217
if !strings.Contains(output, expected) {
216218
t.Errorf("log output missing %q, got:\n%s", expected, output)
217219
}
218220
}
219221
}
220222

221-
func TestReadFractionAccuracy(t *testing.T) {
222-
tracker := NewTracker("test")
223+
func TestBottleneckDetection(t *testing.T) {
224+
// Simulate source-bottleneck: source read takes 80ms, sink takes 20ms.
225+
t.Run("source bottleneck", func(t *testing.T) {
226+
tracker := NewTracker("pull")
227+
sr := &slowReader{data: make([]byte, 64), delay: 80 * time.Millisecond}
228+
wrapped := tracker.WrapReader(sr)
229+
tracker.TrackTransfer(func() error {
230+
_, err := io.ReadAll(wrapped)
231+
if err != nil {
232+
return err
233+
}
234+
time.Sleep(20 * time.Millisecond)
235+
return nil
236+
})
223237

224-
// Simulate: source read takes 80ms, total transfer takes 100ms.
225-
// readFraction should be ~0.8.
226-
sr := &slowReader{data: make([]byte, 64), delay: 80 * time.Millisecond}
227-
wrapped := tracker.WrapReader(sr)
228-
tracker.TrackTransfer(func() error {
229-
_, err := io.ReadAll(wrapped)
230-
if err != nil {
231-
return err
238+
sourceNanos := tracker.sourceNanos.Load()
239+
sinkNanos := tracker.transferNanos.Load() - sourceNanos
240+
if sourceNanos <= sinkNanos {
241+
t.Errorf("expected source > sink for source bottleneck: source=%d, sink=%d",
242+
sourceNanos, sinkNanos)
232243
}
233-
// Simulate 20ms of sink write time.
234-
time.Sleep(20 * time.Millisecond)
235-
return nil
236244
})
237245

238-
sourceNanos := tracker.sourceNanos.Load()
239-
transferNanos := tracker.transferNanos.Load()
240-
241-
if transferNanos == 0 {
242-
t.Fatal("transferNanos should be > 0")
243-
}
246+
// Simulate sink-bottleneck: source read takes 10ms, sink takes 80ms.
247+
t.Run("sink bottleneck", func(t *testing.T) {
248+
tracker := NewTracker("push")
249+
sr := &slowReader{data: make([]byte, 64), delay: 10 * time.Millisecond}
250+
wrapped := tracker.WrapReader(sr)
251+
tracker.TrackTransfer(func() error {
252+
_, err := io.ReadAll(wrapped)
253+
if err != nil {
254+
return err
255+
}
256+
time.Sleep(80 * time.Millisecond)
257+
return nil
258+
})
244259

245-
readFraction := float64(sourceNanos) / float64(transferNanos)
246-
// Expect readFraction to be roughly 0.8 (with tolerance for scheduling jitter).
247-
if readFraction < 0.5 || readFraction > 0.95 {
248-
t.Errorf("readFraction = %.2f, expected ~0.8 (sourceNanos=%d, transferNanos=%d)",
249-
readFraction, sourceNanos, transferNanos)
250-
}
260+
sourceNanos := tracker.sourceNanos.Load()
261+
sinkNanos := tracker.transferNanos.Load() - sourceNanos
262+
if sinkNanos <= sourceNanos {
263+
t.Errorf("expected sink > source for sink bottleneck: source=%d, sink=%d",
264+
sourceNanos, sinkNanos)
265+
}
266+
})
251267
}
252268

253269
func TestFormatBytes(t *testing.T) {

0 commit comments

Comments
 (0)