Skip to content

Commit d3dd5ae

Browse files
implement datacap client and filters for bandwidth usage tracking
1 parent 600d6d4 commit d3dd5ae

6 files changed

Lines changed: 490 additions & 11 deletions

File tree

datacap/datacap_client.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package datacap
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"strings"
8+
"time"
9+
10+
"github.com/getlantern/errors"
11+
)
12+
13+
type DatacapSidecarClient interface {
14+
TrackDatacapUsage(ctx context.Context, deviceID string, bytesUsed int64, countryCode, platform string) (usage *TrackDatacapResponse, err error)
15+
16+
GetDatacapUsage(ctx context.Context, deviceID string) (usage *TrackDatacapResponse, err error)
17+
}
18+
19+
type Config struct {
20+
SidecarAddr string
21+
HTTPClient *http.Client
22+
}
23+
24+
type datacapClient struct {
25+
config Config
26+
}
27+
28+
// TrackDatacapRequest represents the request to track data usage
29+
type TrackDatacapRequest struct {
30+
DeviceID string `json:"deviceId"`
31+
BytesUsed int64 `json:"bytesUsed"`
32+
CountryCode string `json:"countryCode"`
33+
Platform string `json:"platform"`
34+
}
35+
36+
// TrackDatacapResponse represents the response from tracking data usage
37+
type TrackDatacapResponse struct {
38+
Allowed bool `json:"allowed"`
39+
RemainingBytes int64 `json:"remainingBytes"`
40+
CapLimit int64 `json:"capLimit"`
41+
ExpiryTime int64 `json:"expiryTime"`
42+
}
43+
44+
func NewClient(config Config) DatacapSidecarClient {
45+
if config.HTTPClient == nil {
46+
config.HTTPClient = &http.Client{
47+
Timeout: 10 * time.Second,
48+
}
49+
}
50+
51+
return &datacapClient{
52+
config: config,
53+
}
54+
}
55+
56+
func (c *datacapClient) TrackDatacapUsage(ctx context.Context, deviceID string, bytesUsed int64, countryCode, platform string) (*TrackDatacapResponse, error) {
57+
req := TrackDatacapRequest{
58+
DeviceID: deviceID,
59+
BytesUsed: bytesUsed,
60+
CountryCode: countryCode,
61+
Platform: platform,
62+
}
63+
64+
// Ensure the sidecar address has a trailing slash
65+
sidecarAddr := c.config.SidecarAddr
66+
if !strings.HasSuffix(sidecarAddr, "/") {
67+
sidecarAddr += "/"
68+
}
69+
70+
url := sidecarAddr + "data-cap/usage"
71+
72+
reqBody, err := json.Marshal(req)
73+
if err != nil {
74+
return nil, errors.New("failed to marshal request for tracking datacap usage: %v", err)
75+
}
76+
77+
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(string(reqBody)))
78+
if err != nil {
79+
return nil, errors.New("failed to create request for tracking datacap usage: %v", err)
80+
}
81+
82+
httpReq.Header.Set("Content-Type", "application/json")
83+
84+
resp, err := c.config.HTTPClient.Do(httpReq)
85+
if err != nil {
86+
return nil, errors.New("failed to send request to sidecar: %v", err)
87+
}
88+
defer resp.Body.Close()
89+
90+
if resp.StatusCode != http.StatusOK {
91+
return nil, errors.New("sidecar returned non-200 status: %d", resp.StatusCode)
92+
}
93+
94+
var trackResp TrackDatacapResponse
95+
if err := json.NewDecoder(resp.Body).Decode(&trackResp); err != nil {
96+
return nil, errors.New("failed to decode response for tracking datacap usage: %v", err)
97+
}
98+
99+
log.Debugf("Track response for device %s: allowed=%v, remaining=%d",
100+
deviceID, trackResp.Allowed, trackResp.RemainingBytes)
101+
102+
return &trackResp, nil
103+
}
104+
105+
func (c *datacapClient) GetDatacapUsage(ctx context.Context, deviceID string) (*TrackDatacapResponse, error) {
106+
// Ensure the sidecar address has a trailing slash
107+
sidecarAddr := c.config.SidecarAddr
108+
if !strings.HasSuffix(sidecarAddr, "/") {
109+
sidecarAddr += "/"
110+
}
111+
112+
url := sidecarAddr + "data-cap/device/" + deviceID
113+
114+
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
115+
if err != nil {
116+
return nil, errors.New("failed to create request for tracking datacap usage: %v", err)
117+
}
118+
119+
resp, err := c.config.HTTPClient.Do(httpReq)
120+
if err != nil {
121+
return nil, errors.New("failed to send request to sidecar: %v", err)
122+
}
123+
defer resp.Body.Close()
124+
125+
if resp.StatusCode != http.StatusOK {
126+
return nil, errors.New("sidecar returned non-200 status: %d", resp.StatusCode)
127+
}
128+
129+
var trackResp TrackDatacapResponse
130+
if err := json.NewDecoder(resp.Body).Decode(&trackResp); err != nil {
131+
return nil, errors.New("failed to decode response for tracking datacap usage: %v", err)
132+
}
133+
134+
log.Debugf("Track response for device %s: allowed=%v, remaining=%d",
135+
deviceID, trackResp.Allowed, trackResp.RemainingBytes)
136+
137+
return &trackResp, nil
138+
}

datacap/device_filter.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package datacap
2+
3+
import (
4+
"fmt"
5+
"net/http"
6+
"net/http/httputil"
7+
"sync"
8+
"time"
9+
10+
"github.com/dustin/go-humanize"
11+
"github.com/getlantern/http-proxy-lantern/v2/common"
12+
"github.com/getlantern/http-proxy-lantern/v2/domains"
13+
"github.com/getlantern/http-proxy-lantern/v2/instrument"
14+
"github.com/getlantern/http-proxy-lantern/v2/listeners"
15+
"github.com/getlantern/http-proxy-lantern/v2/usage"
16+
"github.com/getlantern/proxy/v3/filters"
17+
)
18+
19+
var (
20+
epoch = time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC)
21+
22+
alwaysThrottle = listeners.NewRateLimiter(10, 10) // this is basically unusably slow, only used for malicious or really old/broken clients
23+
24+
defaultThrottleRate = int64(5000 * 1024 / 8) // 5 Mbps
25+
)
26+
27+
// deviceFilter handles filtering and throttling of requests based on datacap
28+
type deviceFilter struct {
29+
datacapClient DatacapSidecarClient
30+
instrument instrument.Instrument
31+
sendXBQHeader bool
32+
limitersByDevice map[string]*listeners.RateLimiter
33+
limitersByDeviceMx sync.Mutex
34+
}
35+
36+
// Settings represents the datacap settings for a device
37+
type Settings struct {
38+
Threshold int64
39+
}
40+
41+
// NewFilter creates a new datacap filter
42+
func NewFilter(datacapClient DatacapSidecarClient, instrument instrument.Instrument, sendXBQHeader bool) *deviceFilter {
43+
return &deviceFilter{
44+
datacapClient: datacapClient,
45+
instrument: instrument,
46+
sendXBQHeader: sendXBQHeader,
47+
limitersByDevice: make(map[string]*listeners.RateLimiter, 0),
48+
}
49+
}
50+
51+
// Apply applies the datacap filter to the request
52+
func (f *deviceFilter) Apply(cs *filters.ConnectionState, req *http.Request, next filters.Next) (*http.Response, *filters.ConnectionState, error) {
53+
54+
if log.IsTraceEnabled() {
55+
reqStr, _ := httputil.DumpRequest(req, true)
56+
log.Tracef("DeviceFilter Middleware received request:\n%s", reqStr)
57+
}
58+
59+
wc := cs.Downstream().(listeners.WrapConn)
60+
lanternDeviceID := req.Header.Get(common.DeviceIdHeader)
61+
if lanternDeviceID == "" {
62+
// Old lantern versions and possible cracks do not include the device
63+
// ID. Just throttle them.
64+
f.instrument.Throttle(req.Context(), true, "no-device-id")
65+
wc.ControlMessage("throttle", alwaysThrottle)
66+
return next(cs, req)
67+
}
68+
if lanternDeviceID == "~~~~~~" {
69+
// This is checkfallbacks, don't throttle it
70+
f.instrument.Throttle(req.Context(), false, "checkfallbacks")
71+
return next(cs, req)
72+
}
73+
74+
// Even if a device hasn't hit its data cap, we always throttle to a default throttle rate to
75+
// keep bandwidth hogs from using too much bandwidth. Note - this does not apply to pro proxies
76+
// which don't use the devicefilter at all.
77+
throttleDefault := func(message string) {
78+
if defaultThrottleRate <= 0 {
79+
f.instrument.Throttle(req.Context(), false, message)
80+
}
81+
limiter := f.rateLimiterForDevice(lanternDeviceID, defaultThrottleRate, defaultThrottleRate)
82+
if log.IsTraceEnabled() {
83+
log.Tracef("Throttling connection to %v per second by default",
84+
humanize.Bytes(uint64(defaultThrottleRate)))
85+
}
86+
f.instrument.Throttle(req.Context(), true, "default")
87+
wc.ControlMessage("throttle", limiter)
88+
}
89+
90+
// Some domains are excluded from being throttled and don't count towards the
91+
// bandwidth cap.
92+
if domains.ConfigForRequest(req).Unthrottled {
93+
throttleDefault("domain-excluded")
94+
return next(cs, req)
95+
}
96+
97+
// Check usage from cache only - no eager fetching
98+
u := usage.Get(lanternDeviceID)
99+
if u == nil {
100+
// No usage data available yet, allow the request
101+
f.instrument.Throttle(req.Context(), false, "no-usage-data")
102+
return next(cs, req)
103+
}
104+
105+
settings, err := f.datacapClient.GetDatacapUsage(req.Context(), lanternDeviceID)
106+
if err != nil {
107+
log.Errorf("failed to get datacap usage for device %s: %v", lanternDeviceID, err)
108+
f.instrument.Throttle(req.Context(), false, "datacap-error")
109+
//allow the request to proceed if we fail to get datacap usage
110+
settings = &TrackDatacapResponse{
111+
Allowed: true,
112+
}
113+
}
114+
115+
measuredCtx := map[string]interface{}{
116+
"throttled": false,
117+
}
118+
119+
var capOn bool
120+
121+
// To turn the datacap off we simply set the threshold to 0 or below
122+
if settings.Allowed {
123+
log.Tracef("Got datacap settings: %v", settings)
124+
capOn = settings.CapLimit > 0
125+
126+
measuredCtx["datacap_settings"] = settings
127+
if capOn {
128+
measuredCtx["datacap_threshold"] = settings.CapLimit
129+
measuredCtx["datacap_usage"] = u.Bytes
130+
}
131+
}
132+
133+
if capOn && u.Bytes > settings.CapLimit {
134+
f.instrument.Throttle(req.Context(), true, "over-datacap")
135+
measuredCtx["throttled"] = true
136+
limiter := f.rateLimiterForDevice(lanternDeviceID, defaultThrottleRate, defaultThrottleRate)
137+
if log.IsTraceEnabled() {
138+
log.Tracef("Throttling connection from device %s to %v per second", lanternDeviceID,
139+
humanize.Bytes(uint64(defaultThrottleRate)))
140+
}
141+
f.instrument.Throttle(req.Context(), true, "datacap")
142+
wc.ControlMessage("throttle", limiter)
143+
measuredCtx["throttled"] = true
144+
} else {
145+
throttleDefault("")
146+
}
147+
148+
wc.ControlMessage("measured", measuredCtx)
149+
150+
resp, nextCtx, err := next(cs, req)
151+
if resp == nil || err != nil {
152+
return resp, nextCtx, err
153+
}
154+
if !capOn || !f.sendXBQHeader {
155+
return resp, nextCtx, err
156+
}
157+
if resp.Header == nil {
158+
resp.Header = make(http.Header, 1)
159+
}
160+
uMiB := u.Bytes / (1024 * 1024)
161+
xbq := fmt.Sprintf("%d/%d/%d", uMiB, settings.CapLimit/(1024*1024), int64(u.AsOf.Sub(epoch).Seconds()))
162+
xbqv2 := fmt.Sprintf("%s/%d", xbq, u.TTLSeconds)
163+
resp.Header.Set(common.XBQHeader, xbq) // for backward compatibility with older clients
164+
resp.Header.Set(common.XBQHeaderv2, xbqv2) // for new clients that support different bandwidth cap expirations
165+
f.instrument.XBQHeaderSent(req.Context())
166+
return resp, nextCtx, err
167+
}
168+
169+
func (f *deviceFilter) rateLimiterForDevice(deviceID string, rateLimitRead, rateLimitWrite int64) *listeners.RateLimiter {
170+
f.limitersByDeviceMx.Lock()
171+
defer f.limitersByDeviceMx.Unlock()
172+
173+
limiter := f.limitersByDevice[deviceID]
174+
if limiter == nil || limiter.GetRateRead() != rateLimitRead || limiter.GetRateWrite() != rateLimitWrite {
175+
limiter = listeners.NewRateLimiter(rateLimitRead, rateLimitWrite)
176+
f.limitersByDevice[deviceID] = limiter
177+
}
178+
return limiter
179+
}

0 commit comments

Comments
 (0)