Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ All notable changes to this project will be documented in this file.

- Client
- Get client IP from the daemon in the disconnect command, matching the connect command's behavior, to avoid IP mismatches behind NAT
- Onchain Programs
- Add `target_update_count` field to GeoProbe account, incremented on `AddTarget` and `RemoveTarget`; uses `BorshDeserializeIncremental` so existing accounts default to 0 (non-breaking)
- SDK
- Add `TargetUpdateCount` field to Go GeoProbe struct with backward-compatible deserialization
- Telemetry
- Skip expensive `GetGeolocationUsers` RPC scan in geoprobe-agent when the probe's `target_update_count` is unchanged, with a forced full refresh every ~5 minutes as a safety net

## [v0.13.0](https://github.com/malbeclabs/doublezero/compare/client/v0.12.0...client/v0.13.0) - 2026-03-20

Expand Down
89 changes: 62 additions & 27 deletions controlplane/telemetry/cmd/geoprobe-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand All @@ -35,7 +36,6 @@ const (
defaultEvictionInterval = 30 * time.Minute
defaultVerifyInterval = 29 * time.Second
discoveryInterval = 60 * time.Second
parentDiscoveryInterval = 60 * time.Second
)

var (
Expand Down Expand Up @@ -533,27 +533,36 @@ func main() {
}
}()

// Run parent DZD discovery if program IDs are configured.
// Shared counter: parent discovery writes the GeoProbe target_update_count on each
// poll; target discovery reads it to skip expensive full scans when unchanged.
// Both discoveries run sequentially in a single goroutine to guarantee the
// counter is updated before target discovery reads it.
var probeTargetUpdateCount atomic.Uint32

targetUpdateCh := make(chan geoprobe.TargetUpdate, 1)
inboundKeyCh := make(chan geoprobe.InboundKeyUpdate, 1)
parentUpdateCh := make(chan geoprobe.ParentUpdate, 1)

// Build parent discovery if program IDs are configured.
var pd *geoprobe.ParentDiscovery
if parentDiscoveryEnabled {
parentUpdateCh := make(chan geoprobe.ParentUpdate, 1)
geoProbeClient := geoprobe.NewRPCGeoProbeClient(rpcClient, geolocationProgramID)
deviceResolver := geoprobe.NewRPCDeviceResolver(rpcClient, serviceabilityProgramID)

pd, err := geoprobe.NewParentDiscovery(&geoprobe.ParentDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geoProbeClient,
Resolver: deviceResolver,
CLIParents: cliParentAuthorities,
Interval: parentDiscoveryInterval,
Logger: log,
var pdErr error
pd, pdErr = geoprobe.NewParentDiscovery(&geoprobe.ParentDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geoProbeClient,
Resolver: deviceResolver,
CLIParents: cliParentAuthorities,
Logger: log,
ProbeTargetUpdateCount: &probeTargetUpdateCount,
})
if err != nil {
log.Error("Failed to create parent discovery", "error", err)
if pdErr != nil {
log.Error("Failed to create parent discovery", "error", pdErr)
os.Exit(1)
}

go pd.Run(ctx, parentUpdateCh)

// Consume parent updates: update parentState for OffsetListener validation.
// Parent authority keys are NOT added to the signed TWAMP reflector — parent
// DZDs use the unsigned reflector. The signed reflector's allowlist comes
Expand All @@ -572,26 +581,52 @@ func main() {
}()
}

// Run target discovery if geolocation program ID is configured.
targetUpdateCh := make(chan geoprobe.TargetUpdate, 1)
inboundKeyCh := make(chan geoprobe.InboundKeyUpdate, 1)
// Build target discovery if geolocation program ID is configured.
var td *geoprobe.TargetDiscovery
if !geolocationProgramID.IsZero() {
geolocationUserClient := geolocation.New(log, rpcClient, geolocationProgramID)
td, err := geoprobe.NewTargetDiscovery(&geoprobe.TargetDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geolocationUserClient,
CLITargets: targets,
CLIAllowedKeys: allowedKeys,
Interval: discoveryInterval,
Logger: log,
var tdErr error
td, tdErr = geoprobe.NewTargetDiscovery(&geoprobe.TargetDiscoveryConfig{
GeoProbePubkey: geoProbePubkey,
Client: geolocationUserClient,
CLITargets: targets,
CLIAllowedKeys: allowedKeys,
Logger: log,
ProbeTargetUpdateCount: &probeTargetUpdateCount,
})
if err != nil {
log.Error("Failed to create target discovery", "error", err)
if tdErr != nil {
log.Error("Failed to create target discovery", "error", tdErr)
os.Exit(1)
}
go td.Run(ctx, targetUpdateCh, inboundKeyCh)
}

// Run parent and target discovery sequentially in a single goroutine so that
// parent discovery always updates probeTargetUpdateCount before target
// discovery reads it.
go func() {
tick := func() {
if pd != nil {
pd.Tick(ctx, parentUpdateCh)
}
if td != nil {
td.Tick(ctx, targetUpdateCh, inboundKeyCh)
}
}

tick()

discoveryTicker := time.NewTicker(discoveryInterval)
defer discoveryTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-discoveryTicker.C:
tick()
}
}
}()

// Run main measurement loop. This runs regardless of whether trusted parents
// are configured at startup, since they may be added dynamically at runtime.
go func() {
Expand Down
71 changes: 26 additions & 45 deletions controlplane/telemetry/internal/geoprobe/onchain_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"time"
"sync/atomic"

"github.com/gagliardetto/solana-go"
solanarpc "github.com/gagliardetto/solana-go/rpc"
Expand Down Expand Up @@ -38,22 +38,22 @@ type ParentUpdate struct {

// ParentDiscoveryConfig holds configuration for parent discovery.
type ParentDiscoveryConfig struct {
GeoProbePubkey solana.PublicKey
Client GeoProbeAccountClient
Resolver DeviceResolver
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
Interval time.Duration
Logger *slog.Logger
GeoProbePubkey solana.PublicKey
Client GeoProbeAccountClient
Resolver DeviceResolver
CLIParents map[[32]byte][32]byte // static parents from --additional-parent
Logger *slog.Logger
ProbeTargetUpdateCount *atomic.Uint32 // shared counter for target discovery change detection
}

// ParentDiscovery polls the GeoProbe account and resolves parent devices.
type ParentDiscovery struct {
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeoProbeAccountClient
resolver DeviceResolver
cliParents map[[32]byte][32]byte
interval time.Duration
log *slog.Logger
geoProbePubkey solana.PublicKey
client GeoProbeAccountClient
resolver DeviceResolver
cliParents map[[32]byte][32]byte
probeTargetUpdateCount *atomic.Uint32

cachedParentDevices []solana.PublicKey
tickCount uint64
Expand All @@ -73,48 +73,24 @@ func NewParentDiscovery(cfg *ParentDiscoveryConfig) (*ParentDiscovery, error) {
if cfg.GeoProbePubkey.IsZero() {
return nil, fmt.Errorf("geoprobe pubkey is required")
}
if cfg.Interval <= 0 {
return nil, fmt.Errorf("interval must be greater than 0")
}

cliParents := cfg.CLIParents
if cliParents == nil {
cliParents = make(map[[32]byte][32]byte)
}

return &ParentDiscovery{
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
resolver: cfg.Resolver,
cliParents: cliParents,
interval: cfg.Interval,
log: cfg.Logger,
geoProbePubkey: cfg.GeoProbePubkey,
client: cfg.Client,
resolver: cfg.Resolver,
cliParents: cliParents,
probeTargetUpdateCount: cfg.ProbeTargetUpdateCount,
}, nil
}

// Run starts the discovery polling loop, sending ParentUpdates to the channel.
// It performs an immediate discovery tick, then repeats at the configured interval.
func (d *ParentDiscovery) Run(ctx context.Context, ch chan<- ParentUpdate) {
d.log.Info("Starting parent DZD discovery",
"interval", d.interval,
"geoProbePubkey", d.geoProbePubkey,
"cliParents", len(d.cliParents),
)

// Tick performs a single parent discovery cycle and sends updates to the channel.
func (d *ParentDiscovery) Tick(ctx context.Context, ch chan<- ParentUpdate) {
d.discoverAndSend(ctx, ch)

ticker := time.NewTicker(d.interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
d.log.Info("Parent DZD discovery shutting down")
return
case <-ticker.C:
d.discoverAndSend(ctx, ch)
}
}
}

func (d *ParentDiscovery) discoverAndSend(ctx context.Context, ch chan<- ParentUpdate) {
Expand Down Expand Up @@ -152,6 +128,11 @@ func (d *ParentDiscovery) discover(ctx context.Context) (*ParentUpdate, error) {
return d.cliOnlyUpdate(), nil
}

// Publish the probe's target_update_count for target discovery change detection.
if d.probeTargetUpdateCount != nil {
d.probeTargetUpdateCount.Store(probe.TargetUpdateCount)
}

// Check if parent device set changed since last poll.
if !forceFullRefresh && pubkeySlicesEqual(d.cachedParentDevices, probe.ParentDevices) {
d.log.Debug("Parent device set unchanged, skipping resolution",
Expand Down
Loading
Loading