diff --git a/instrument/goodput_test.go b/instrument/goodput_test.go
new file mode 100644
index 00000000..87f8bce5
--- /dev/null
+++ b/instrument/goodput_test.go
@@ -0,0 +1,119 @@
+package instrument
+
+import (
+	"context"
+	"net"
+	"testing"
+	"time"
+
+	sdkotel "go.opentelemetry.io/otel"
+	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+
+	"github.com/getlantern/geo"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// newGoodputInstrument makes a ManualReader-backed meter provider the global
+// otel provider and returns a defaultInstrument along with that reader.
+func newGoodputInstrument(t *testing.T) (*defaultInstrument, *sdkmetric.ManualReader) {
+	t.Helper()
+	// Reinstate the prior global provider at test end so ours cannot leak.
+	original := sdkotel.GetMeterProvider()
+	t.Cleanup(func() {
+		sdkotel.SetMeterProvider(original)
+	})
+
+	manual := sdkmetric.NewManualReader()
+	sdkotel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(manual)))
+
+	instr, err := NewDefault(geo.NoLookup{}, &mockISPLookup{}, "test-proxy")
+	require.NoError(t, err)
+	return instr, manual
+}
+
+// TestSessionGoodput verifies the per-session download goodput histogram is
+// recorded once for a session that moved >= goodputMinBytes, with the value
+// ~= received bytes / connection seconds and a receive direction tag.
+func TestSessionGoodput(t *testing.T) {
+	ins, reader := newGoodputInstrument(t)
+
+	const recv = goodputMinBytes // inclusive threshold: exactly this many bytes must record
+	ins.SessionGoodput(context.Background(), recv, time.Second, net.ParseIP("1.2.3.4"))
+
+	var rm metricdata.ResourceMetrics
+	require.NoError(t, reader.Collect(context.Background(), &rm))
+
+	count, sum, found := histogramCountSum(rm, "proxy.session.goodput")
+	require.True(t, found, "goodput histogram should be emitted for a >=1MB session")
+	assert.Equal(t, uint64(1), count, "exactly one goodput sample")
+	// 1s open duration → goodput ~= received bytes per second.
+	assert.InDelta(t, float64(recv), sum, float64(recv)*0.01)
+
+	attrs := extractHistogramAttrs(rm, "proxy.session.goodput")
+	assert.Equal(t, "receive", attrs["network.io.direction"])
+	// The country point attribute must always be present (empty here, since the
+	// test uses geo.NoLookup) so the metric stays sliceable by country.
+	_, hasCountry := attrs["geo.country.iso_code"]
+	assert.True(t, hasCountry, "goodput sample should carry the geo.country.iso_code attribute")
+}
+
+// TestSessionGoodputBelowThreshold verifies a session one byte short of
+// goodputMinBytes records no goodput sample.
+func TestSessionGoodputBelowThreshold(t *testing.T) {
+	ins, reader := newGoodputInstrument(t)
+
+	ins.SessionGoodput(context.Background(), goodputMinBytes-1, time.Second, net.ParseIP("1.2.3.4"))
+
+	var rm metricdata.ResourceMetrics
+	require.NoError(t, reader.Collect(context.Background(), &rm))
+
+	_, _, found := histogramCountSum(rm, "proxy.session.goodput")
+	assert.False(t, found, "no goodput sample below the byte threshold")
+}
+
+// TestSessionGoodputZeroDuration verifies a non-positive duration records no
+// sample (guards against divide-by-zero).
+func TestSessionGoodputZeroDuration(t *testing.T) {
+	ins, reader := newGoodputInstrument(t)
+	// Zero and negative durations must both be rejected without recording.
+	ins.SessionGoodput(context.Background(), 2_000_000, 0, net.ParseIP("1.2.3.4"))
+	ins.SessionGoodput(context.Background(), 2_000_000, -time.Second, net.ParseIP("1.2.3.4"))
+
+	var rm metricdata.ResourceMetrics
+	require.NoError(t, reader.Collect(context.Background(), &rm))
+
+	_, _, found := histogramCountSum(rm, "proxy.session.goodput")
+	assert.False(t, found, "no goodput sample for a non-positive duration")
+}
+
+// firstHistogramPoint finds the first data point of the named float64 histogram.
+func firstHistogramPoint(rm metricdata.ResourceMetrics, name string) (metricdata.HistogramDataPoint[float64], bool) {
+	for _, sm := range rm.ScopeMetrics {
+		for _, m := range sm.Metrics {
+			d, ok := m.Data.(metricdata.Histogram[float64])
+			if m.Name == name && ok && len(d.DataPoints) > 0 {
+				return d.DataPoints[0], true
+			}
+		}
+	}
+	return metricdata.HistogramDataPoint[float64]{}, false
+}
+
+// histogramCountSum returns the count and sum of the named histogram's first data point.
+func histogramCountSum(rm metricdata.ResourceMetrics, name string) (uint64, float64, bool) {
+	dp, ok := firstHistogramPoint(rm, name)
+	return dp.Count, dp.Sum, ok
+}
+
+// extractHistogramAttrs returns the attributes of the named histogram's first data point.
+func extractHistogramAttrs(rm metricdata.ResourceMetrics, name string) map[string]string {
+	result := make(map[string]string)
+	if dp, ok := firstHistogramPoint(rm, name); ok {
+		for _, kv := range dp.Attributes.ToSlice() {
+			result[string(kv.Key)] = kv.Value.Emit()
+		}
+	}
+	return result
+}
diff --git a/instrument/instrument.go b/instrument/instrument.go
index 051d9326..679a3b73 100644
--- a/instrument/instrument.go
+++ b/instrument/instrument.go
@@ -26,6 +26,12 @@ var (
 	originRootRegex = regexp.MustCompile(`([^\.]+\.[^\.]+$)`)
 )
 
+// goodputMinBytes is the minimum received bytes a session must have moved
+// before its goodput sample is recorded. Filters out idle/tiny connections
+// whose bytes/duration is dominated by setup and idle time rather than actual
+// transfer speed.
+const goodputMinBytes = 1_000_000
+
 // Instrument is the common interface about what can be instrumented.
type Instrument interface { WrapFilter(prefix string, f filters.Filter) (filters.Filter, error) @@ -37,6 +43,7 @@ type Instrument interface { XBQHeaderSent(ctx context.Context) SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string) + SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) Connection(ctx context.Context, clientIP net.IP) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) ReportProxiedBytes(tp *sdktrace.TracerProvider) @@ -73,6 +80,8 @@ func (i NoInstrument) XBQHeaderSent(ctx context.Context) func (i NoInstrument) SuspectedProbing(ctx context.Context, fromIP net.IP, reason string) {} func (i NoInstrument) ProxiedBytes(ctx context.Context, sent, recv int, platform, platformVersion, libVersion, appVersion, app, locale, dataCapCohort, probingError string, clientIP net.IP, deviceID, originHost, arch string) { } +func (i NoInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) { +} func (i NoInstrument) ReportProxiedBytesPeriodically(interval time.Duration, tp *sdktrace.TracerProvider) { } func (i NoInstrument) ReportProxiedBytes(tp *sdktrace.TracerProvider) {} @@ -305,6 +314,27 @@ func (ins *defaultInstrument) ProxiedBytes(ctx context.Context, sent, recv int, ins.statsMx.Unlock() } +// SessionGoodput records a session's download goodput (received bytes per +// second of connection lifetime) at close, for sessions that moved at least +// goodputMinBytes. duration is the connection's open time; it includes idle +// periods, so this is a floor on true transfer speed — but both arms of a +// bandit experiment are measured identically, so it's a fair relative signal, +// and the byte floor filters the worst idle-dominated noise. 
No device_id tag +// (cardinality); track and cloud.region come from resource attributes, leaving +// geo.country.iso_code as the only point attribute the evaluator strata need. +func (ins *defaultInstrument) SessionGoodput(ctx context.Context, recvBytes int, duration time.Duration, clientIP net.IP) { + if recvBytes < goodputMinBytes || duration <= 0 { + return + } + goodput := float64(recvBytes) / duration.Seconds() + country := ins.countryLookup.CountryCode(clientIP) + otelinstrument.SessionGoodput.Record(ctx, goodput, + metric.WithAttributes( + semconv.GeoCountryISOCodeKey.String(country), + semconv.NetworkIODirectionKey.String("receive"), + )) +} + // Connection counts the number of incoming connections func (ins *defaultInstrument) Connection(ctx context.Context, clientIP net.IP) { fromCountry := ins.countryLookup.CountryCode(clientIP) diff --git a/instrument/otelinstrument/otelinstrument.go b/instrument/otelinstrument/otelinstrument.go index 020ab67c..a3bde0a8 100644 --- a/instrument/otelinstrument/otelinstrument.go +++ b/instrument/otelinstrument/otelinstrument.go @@ -32,6 +32,7 @@ var ( Throttling metric.Int64Counter SuspectedProbing metric.Int64Counter Connections metric.Int64Counter + SessionGoodput metric.Float64Histogram DistinctClients1m, DistinctClients10m, DistinctClients1h *distinct.SlidingWindowDistinctCount distinctClients metric.Int64ObservableGauge ) @@ -79,6 +80,18 @@ func initialize() error { if Connections, err = meter.Int64Counter("proxy.connections"); err != nil { return err } + // Per-session download goodput (received bytes per second of connection + // lifetime), recorded once at connection close for sessions that moved at + // least goodputMinBytes. Sliceable by track (resource attr) × cloud.region + // (resource attr) × geo.country.iso_code (point attr) so the bandit + // experiment evaluator can compare a challenger track's median goodput + // against the incumbent's. 
Unit "bytes/s" follows proxy.io's "bytes" + // spelling for consistency within this package's metrics. + if SessionGoodput, err = meter.Float64Histogram("proxy.session.goodput", + metric.WithUnit("bytes/s"), + metric.WithDescription("Per-session download goodput: received bytes per second of connection lifetime")); err != nil { + return err + } DistinctClients1m = distinct.NewSlidingWindowDistinctCount(time.Minute, time.Second) DistinctClients10m = distinct.NewSlidingWindowDistinctCount(10*time.Minute, 10*time.Second) diff --git a/reporting.go b/reporting.go index e4c89922..bbffafc6 100644 --- a/reporting.go +++ b/reporting.go @@ -29,8 +29,28 @@ type reportingConfig struct { func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, instrument instrument.Instrument, throttleConfig throttle.Config) *reportingConfig { proxiedBytesReporter := func(ctx map[string]interface{}, stats *measured.Stats, deltaStats *measured.Stats, final bool) { - if deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 { - // nothing to report + noDelta := deltaStats.SentTotal == 0 && deltaStats.RecvTotal == 0 + if noDelta && !final { + // nothing to report on an idle, non-final interval; return before + // any client IP parsing to keep periodic reporting cheap. + return + } + + var client_ip net.IP + if s, ok := ctx[common.ClientIP].(string); ok { + client_ip = net.ParseIP(s) + } + + if final { + // Record per-session download goodput once at connection close, + // using the connection's cumulative received bytes and open time. + // Done before the zero-delta early return below so a session that + // was idle during its final reporting interval is still counted. 
+ instrument.SessionGoodput(context.Background(), stats.RecvTotal, stats.Duration, client_ip) + } + + if noDelta { + // nothing more to report (final call with no new bytes this interval) return } // Note - sometimes we're missing the platform and version @@ -45,12 +65,6 @@ func newReportingConfig(countryLookup geo.CountryLookup, rc *rclient.Client, ins probingError := fromContext(ctx, common.ProbingError) arch := fromContext(ctx, common.KernelArch) - var client_ip net.IP - _client_ip := ctx[common.ClientIP] - if _client_ip != nil { - client_ip = net.ParseIP(_client_ip.(string)) - } - dataCapCohort := "" throttleSettings, hasThrottleSettings := ctx[common.ThrottleSettings] if hasThrottleSettings {