Skip to content

Commit f6a9929

Browse files
committed
Add change counter to geoprobes that is incremented when targets are added or removed.
1 parent 7a3b010 commit f6a9929

20 files changed

Lines changed: 405 additions & 78 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ All notable changes to this project will be documented in this file.
88

99
### Changes
1010

11+
- Onchain Programs
12+
- Add `target_update_count` field to GeoProbe account, incremented on `AddTarget` and `RemoveTarget`; uses `BorshDeserializeIncremental` so existing accounts default to 0 (non-breaking)
13+
- SDK
14+
- Add `TargetUpdateCount` field to Go GeoProbe struct with backward-compatible deserialization
15+
- Telemetry
16+
- Skip expensive `GetGeolocationUsers` RPC scan in geoprobe-agent when the probe's `target_update_count` is unchanged, with a forced full refresh every ~5 minutes as safety net
17+
1118
## [v0.13.0](https://github.com/malbeclabs/doublezero/compare/client/v0.12.0...client/v0.13.0) - 2026-03-20
1219

1320
### Breaking

controlplane/telemetry/cmd/geoprobe-agent/main.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os/signal"
1313
"strings"
1414
"sync"
15+
"sync/atomic"
1516
"syscall"
1617
"time"
1718

@@ -533,19 +534,24 @@ func main() {
533534
}
534535
}()
535536

537+
// Shared counter: parent discovery writes the GeoProbe target_update_count on each
538+
// poll; target discovery reads it to skip expensive full scans when unchanged.
539+
var probeTargetUpdateCount atomic.Uint32
540+
536541
// Run parent DZD discovery if program IDs are configured.
537542
if parentDiscoveryEnabled {
538543
parentUpdateCh := make(chan geoprobe.ParentUpdate, 1)
539544
geoProbeClient := geoprobe.NewRPCGeoProbeClient(rpcClient, geolocationProgramID)
540545
deviceResolver := geoprobe.NewRPCDeviceResolver(rpcClient, serviceabilityProgramID)
541546

542547
pd, err := geoprobe.NewParentDiscovery(&geoprobe.ParentDiscoveryConfig{
543-
GeoProbePubkey: geoProbePubkey,
544-
Client: geoProbeClient,
545-
Resolver: deviceResolver,
546-
CLIParents: cliParentAuthorities,
547-
Interval: parentDiscoveryInterval,
548-
Logger: log,
548+
GeoProbePubkey: geoProbePubkey,
549+
Client: geoProbeClient,
550+
Resolver: deviceResolver,
551+
CLIParents: cliParentAuthorities,
552+
Interval: parentDiscoveryInterval,
553+
Logger: log,
554+
ProbeTargetUpdateCount: &probeTargetUpdateCount,
549555
})
550556
if err != nil {
551557
log.Error("Failed to create parent discovery", "error", err)
@@ -578,12 +584,13 @@ func main() {
578584
if !geolocationProgramID.IsZero() {
579585
geolocationUserClient := geolocation.New(log, rpcClient, geolocationProgramID)
580586
td, err := geoprobe.NewTargetDiscovery(&geoprobe.TargetDiscoveryConfig{
581-
GeoProbePubkey: geoProbePubkey,
582-
Client: geolocationUserClient,
583-
CLITargets: targets,
584-
CLIAllowedKeys: allowedKeys,
585-
Interval: discoveryInterval,
586-
Logger: log,
587+
GeoProbePubkey: geoProbePubkey,
588+
Client: geolocationUserClient,
589+
CLITargets: targets,
590+
CLIAllowedKeys: allowedKeys,
591+
Interval: discoveryInterval,
592+
Logger: log,
593+
ProbeTargetUpdateCount: &probeTargetUpdateCount,
587594
})
588595
if err != nil {
589596
log.Error("Failed to create target discovery", "error", err)

controlplane/telemetry/internal/geoprobe/onchain_discovery.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"log/slog"
8+
"sync/atomic"
89
"time"
910

1011
"github.com/gagliardetto/solana-go"
@@ -38,22 +39,24 @@ type ParentUpdate struct {
3839

3940
// ParentDiscoveryConfig holds configuration for parent discovery.
4041
type ParentDiscoveryConfig struct {
41-
GeoProbePubkey solana.PublicKey
42-
Client GeoProbeAccountClient
43-
Resolver DeviceResolver
44-
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
45-
Interval time.Duration
46-
Logger *slog.Logger
42+
GeoProbePubkey solana.PublicKey
43+
Client GeoProbeAccountClient
44+
Resolver DeviceResolver
45+
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
46+
Interval time.Duration
47+
Logger *slog.Logger
48+
ProbeTargetUpdateCount *atomic.Uint32 // shared counter for target discovery change detection
4749
}
4850

4951
// ParentDiscovery polls the GeoProbe account and resolves parent devices.
5052
type ParentDiscovery struct {
51-
log *slog.Logger
52-
geoProbePubkey solana.PublicKey
53-
client GeoProbeAccountClient
54-
resolver DeviceResolver
55-
cliParents map[[32]byte][32]byte
56-
interval time.Duration
53+
log *slog.Logger
54+
geoProbePubkey solana.PublicKey
55+
client GeoProbeAccountClient
56+
resolver DeviceResolver
57+
cliParents map[[32]byte][32]byte
58+
interval time.Duration
59+
probeTargetUpdateCount *atomic.Uint32
5760

5861
cachedParentDevices []solana.PublicKey
5962
tickCount uint64
@@ -83,12 +86,13 @@ func NewParentDiscovery(cfg *ParentDiscoveryConfig) (*ParentDiscovery, error) {
8386
}
8487

8588
return &ParentDiscovery{
86-
log: cfg.Logger,
87-
geoProbePubkey: cfg.GeoProbePubkey,
88-
client: cfg.Client,
89-
resolver: cfg.Resolver,
90-
cliParents: cliParents,
91-
interval: cfg.Interval,
89+
log: cfg.Logger,
90+
geoProbePubkey: cfg.GeoProbePubkey,
91+
client: cfg.Client,
92+
resolver: cfg.Resolver,
93+
cliParents: cliParents,
94+
interval: cfg.Interval,
95+
probeTargetUpdateCount: cfg.ProbeTargetUpdateCount,
9296
}, nil
9397
}
9498

@@ -152,6 +156,11 @@ func (d *ParentDiscovery) discover(ctx context.Context) (*ParentUpdate, error) {
152156
return d.cliOnlyUpdate(), nil
153157
}
154158

159+
// Publish the probe's target_update_count for target discovery change detection.
160+
if d.probeTargetUpdateCount != nil {
161+
d.probeTargetUpdateCount.Store(probe.TargetUpdateCount)
162+
}
163+
155164
// Check if parent device set changed since last poll.
156165
if !forceFullRefresh && pubkeySlicesEqual(d.cachedParentDevices, probe.ParentDevices) {
157166
d.log.Debug("Parent device set unchanged, skipping resolution",

controlplane/telemetry/internal/geoprobe/onchain_discovery_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"log/slog"
77
"os"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

@@ -611,3 +612,46 @@ func TestPubkeySlicesEqual(t *testing.T) {
611612
})
612613
}
613614
}
615+
616+
func TestParentDiscovery_StoresTargetUpdateCount(t *testing.T) {
617+
t.Parallel()
618+
619+
geoProbePK := solana.NewWallet().PublicKey()
620+
parentDevicePK := solana.NewWallet().PublicKey()
621+
var metricsKey [32]byte
622+
metricsKey = solana.NewWallet().PublicKey()
623+
624+
client := &mockGeoProbeAccountClient{
625+
probe: &geolocation.GeoProbe{
626+
AccountType: geolocation.AccountTypeGeoProbe,
627+
ParentDevices: []solana.PublicKey{parentDevicePK},
628+
TargetUpdateCount: 42,
629+
},
630+
}
631+
632+
resolver := &mockDeviceResolver{
633+
devices: map[solana.PublicKey]*serviceability.Device{
634+
parentDevicePK: {
635+
PublicIp: [4]uint8{10, 0, 0, 1},
636+
MetricsPublisherPubKey: metricsKey,
637+
},
638+
},
639+
}
640+
641+
var counter atomic.Uint32
642+
pd, err := NewParentDiscovery(&ParentDiscoveryConfig{
643+
Logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
644+
Client: client,
645+
Resolver: resolver,
646+
GeoProbePubkey: geoProbePK,
647+
Interval: 10 * time.Millisecond,
648+
ProbeTargetUpdateCount: &counter,
649+
})
650+
require.NoError(t, err)
651+
652+
update, err := pd.discover(context.Background())
653+
require.NoError(t, err)
654+
require.NotNil(t, update)
655+
656+
assert.Equal(t, uint32(42), counter.Load())
657+
}

controlplane/telemetry/internal/geoprobe/target_discovery.go

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"log/slog"
77
"sort"
8+
"sync/atomic"
89
"time"
910

1011
"github.com/gagliardetto/solana-go"
@@ -27,30 +28,38 @@ type InboundKeyUpdate struct {
2728
Keys [][32]byte
2829
}
2930

31+
// targetDiscoveryFullRefreshEvery controls how often a full GeolocationUser scan
32+
// is forced regardless of whether the GeoProbe target_update_count has changed.
33+
// At the default 60s interval, 5 means a full refresh every ~5 minutes.
34+
const targetDiscoveryFullRefreshEvery = 5
35+
3036
// TargetDiscoveryConfig holds configuration for target discovery.
3137
type TargetDiscoveryConfig struct {
32-
GeoProbePubkey solana.PublicKey
33-
Client GeolocationUserClient
34-
CLITargets []ProbeAddress
35-
CLIAllowedKeys [][32]byte
36-
Interval time.Duration
37-
Logger *slog.Logger
38+
GeoProbePubkey solana.PublicKey
39+
Client GeolocationUserClient
40+
CLITargets []ProbeAddress
41+
CLIAllowedKeys [][32]byte
42+
Interval time.Duration
43+
Logger *slog.Logger
44+
ProbeTargetUpdateCount *atomic.Uint32 // shared counter from parent discovery
3845
}
3946

4047
// TargetDiscovery polls GeolocationUser accounts and sends target/key updates
4148
// when changes are detected. It filters for activated, paid users whose targets
4249
// reference this probe's pubkey.
4350
type TargetDiscovery struct {
44-
log *slog.Logger
45-
geoProbePubkey solana.PublicKey
46-
client GeolocationUserClient
47-
cliTargets []ProbeAddress
48-
cliAllowedKeys [][32]byte
49-
interval time.Duration
50-
51-
cachedTargets []ProbeAddress
52-
cachedInboundKeys [][32]byte
53-
tickCount uint64
51+
log *slog.Logger
52+
geoProbePubkey solana.PublicKey
53+
client GeolocationUserClient
54+
cliTargets []ProbeAddress
55+
cliAllowedKeys [][32]byte
56+
interval time.Duration
57+
probeTargetUpdateCount *atomic.Uint32
58+
59+
cachedTargets []ProbeAddress
60+
cachedInboundKeys [][32]byte
61+
tickCount uint64
62+
lastSeenTargetUpdateCount uint32
5463
}
5564

5665
// NewTargetDiscovery creates a new TargetDiscovery instance.
@@ -69,12 +78,13 @@ func NewTargetDiscovery(cfg *TargetDiscoveryConfig) (*TargetDiscovery, error) {
6978
}
7079

7180
return &TargetDiscovery{
72-
log: cfg.Logger,
73-
geoProbePubkey: cfg.GeoProbePubkey,
74-
client: cfg.Client,
75-
cliTargets: cfg.CLITargets,
76-
cliAllowedKeys: cfg.CLIAllowedKeys,
77-
interval: cfg.Interval,
81+
log: cfg.Logger,
82+
geoProbePubkey: cfg.GeoProbePubkey,
83+
client: cfg.Client,
84+
cliTargets: cfg.CLITargets,
85+
cliAllowedKeys: cfg.CLIAllowedKeys,
86+
interval: cfg.Interval,
87+
probeTargetUpdateCount: cfg.ProbeTargetUpdateCount,
7888
}, nil
7989
}
8090

@@ -111,6 +121,11 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T
111121
return
112122
}
113123

124+
// nil targets means the scan was skipped (target_update_count unchanged).
125+
if targets == nil && inboundKeys == nil {
126+
return
127+
}
128+
114129
if !probeAddressSlicesEqual(targets, d.cachedTargets) {
115130
d.cachedTargets = targets
116131
select {
@@ -131,10 +146,21 @@ func (d *TargetDiscovery) discoverAndSend(ctx context.Context, targetCh chan<- T
131146
}
132147

133148
// discover performs a single discovery cycle: fetch users, filter, extract targets/keys,
134-
// merge with CLI values.
149+
// merge with CLI values. Returns nil, nil, nil when the scan is skipped.
135150
func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, [][32]byte, error) {
151+
forceFullRefresh := d.tickCount%targetDiscoveryFullRefreshEvery == 0
136152
d.tickCount++
137153

154+
if d.probeTargetUpdateCount != nil && !forceFullRefresh {
155+
current := d.probeTargetUpdateCount.Load()
156+
if current == d.lastSeenTargetUpdateCount && d.tickCount > 1 {
157+
d.log.Debug("GeoProbe target_update_count unchanged, skipping target scan",
158+
"targetUpdateCount", current)
159+
return nil, nil, nil
160+
}
161+
d.lastSeenTargetUpdateCount = current
162+
}
163+
138164
users, err := d.client.GetGeolocationUsers(ctx)
139165
if err != nil {
140166
return nil, nil, fmt.Errorf("failed to fetch GeolocationUser accounts: %w", err)
@@ -195,6 +221,11 @@ func (d *TargetDiscovery) discover(ctx context.Context) ([]ProbeAddress, [][32]b
195221
mergedTargets := mergeProbes(d.cliTargets, onchainTargets)
196222
mergedKeys := mergeKeys(d.cliAllowedKeys, onchainKeys)
197223

224+
// Sync lastSeenTargetUpdateCount after a full scan (covers forced refresh path).
225+
if d.probeTargetUpdateCount != nil {
226+
d.lastSeenTargetUpdateCount = d.probeTargetUpdateCount.Load()
227+
}
228+
198229
d.log.Debug("Target discovery tick",
199230
"users", len(users),
200231
"onchainOutbound", len(onchainTargets),

0 commit comments

Comments
 (0)