Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ All notable changes to this project will be documented in this file.
- Controller
- Retry transient Solana RPC failures when fetching onchain serviceability accounts so controller polls are more resilient to short-lived provider resets

- Telemetry
- Add timestamp index companion account for device and internet latency samples, enabling reliable timestamp reconstruction when agents experience downtime gaps within an epoch

## [v0.12.0](https://github.com/malbeclabs/doublezero/compare/client/v0.11.0...client/v0.12.0) - 2026-03-16

### Breaking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ type ServiceabilityProgramClient interface {
}

// TelemetryProgramClient is the subset of the telemetry program client that
// the internet latency exporter depends on. Every mutating call returns the
// transaction signature, the fetched transaction result, and an error.
type TelemetryProgramClient interface {
// ProgramID returns the telemetry program ID. The submitter passes it to
// the PDA derivation helpers for the samples and timestamp index accounts.
ProgramID() solana.PublicKey
// InitializeInternetLatencySamples initializes the internet latency
// samples account described by config.
InitializeInternetLatencySamples(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
// InitializeTimestampIndex initializes the timestamp index companion
// account for the samples account identified by samplesAccountPK.
InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error)
// WriteInternetLatencySamples writes the samples carried by config. The
// submitter checks the returned error against telemetry.ErrAccountNotFound,
// telemetry.ErrTimestampIndexNotFound and telemetry.ErrSamplesAccountFull.
WriteInternetLatencySamples(ctx context.Context, config telemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ type mockTelemetryProgramClient struct {
InitializeInternetLatencySamplesFunc func(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
WriteInternetLatencySamplesFunc func(ctx context.Context, config telemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
GetInternetLatencySamplesFunc func(ctx context.Context, dataProviderName string, originExchangePK solana.PublicKey, targetExchangePK solana.PublicKey, epoch uint64) (*telemetry.InternetLatencySamples, error)
InitializeTimestampIndexFunc func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error)
}

// ProgramID returns a fixed placeholder program ID so tests get the same
// value on every call.
func (c *mockTelemetryProgramClient) ProgramID() solana.PublicKey {
	// Base58 text of the placeholder key used by the mock.
	const placeholderProgramID = "11111111111111111111111111111111"
	return solana.MustPublicKeyFromBase58(placeholderProgramID)
}

// InitializeTimestampIndex forwards to InitializeTimestampIndexFunc when a
// test has installed one. Without a hook it reports success with a zero
// signature and no transaction result.
func (c *mockTelemetryProgramClient) InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
	hook := c.InitializeTimestampIndexFunc
	if hook == nil {
		// No hook configured: behave as a successful no-op.
		return solana.Signature{}, nil, nil
	}
	return hook(ctx, samplesAccountPK)
}

func (c *mockTelemetryProgramClient) InitializeInternetLatencySamples(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,56 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey
}
}

// Derive the samples PDA so we can derive the timestamp index PDA from it.
samplesPDA, _, err := telemetry.DeriveInternetLatencySamplesPDA(
s.cfg.Telemetry.ProgramID(),
s.cfg.OracleAgentPK,
string(partitionKey.DataProvider),
partitionKey.SourceExchangePK,
partitionKey.TargetExchangePK,
partitionKey.Epoch,
)
if err != nil {
return fmt.Errorf("failed to derive internet latency samples PDA: %w", err)
}
timestampIndexPDA, _, err := telemetry.DeriveTimestampIndexPDA(
s.cfg.Telemetry.ProgramID(),
samplesPDA,
)
if err != nil {
return fmt.Errorf("failed to derive timestamp index PDA: %w", err)
}

writeConfig := telemetry.WriteInternetLatencySamplesInstructionConfig{
DataProviderName: string(partitionKey.DataProvider),
OriginExchangePK: partitionKey.SourceExchangePK,
TargetExchangePK: partitionKey.TargetExchangePK,
Epoch: partitionKey.Epoch,
StartTimestampMicroseconds: uint64(minTimestamp.UnixMicro()),
Samples: rtts,
TimestampIndexPK: &timestampIndexPDA,
}

_, _, err := s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig)
_, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig)
if err != nil {
if errors.Is(err, telemetry.ErrAccountNotFound) {
if errors.Is(err, telemetry.ErrTimestampIndexNotFound) {
log.Info("Timestamp index account not found, initializing")
_, _, err = s.cfg.Telemetry.InitializeTimestampIndex(ctx, samplesPDA)
if err != nil {
log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err)
writeConfig.TimestampIndexPK = nil
}
_, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig)
if err != nil {
if errors.Is(err, telemetry.ErrSamplesAccountFull) {
log.Warn("Partition account is full, dropping samples from buffer and moving on", "droppedSamples", len(samples))
metrics.ExporterSubmitterAccountFull.WithLabelValues(string(partitionKey.DataProvider), partitionKey.SourceExchangePK.String(), partitionKey.TargetExchangePK.String(), strconv.FormatUint(partitionKey.Epoch, 10)).Inc()
s.cfg.Buffer.Remove(partitionKey)
return nil
}
return fmt.Errorf("failed to write internet latency samples after timestamp index init: %w", err)
}
} else if errors.Is(err, telemetry.ErrAccountNotFound) {
log.Info("Account not found, initializing new account")
samplingInterval, ok := s.cfg.DataProviderSamplingIntervals[partitionKey.DataProvider]
if !ok {
Expand All @@ -144,6 +182,12 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey
if err != nil {
return fmt.Errorf("failed to initialize internet latency samples: %w", err)
}
// Initialize the companion timestamp index account.
_, _, err = s.cfg.Telemetry.InitializeTimestampIndex(ctx, samplesPDA)
if err != nil {
log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err)
writeConfig.TimestampIndexPK = nil
}
_, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig)
if err != nil {
if errors.Is(err, telemetry.ErrSamplesAccountFull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,156 @@ func TestInternetLatency_Submitter(t *testing.T) {
assert.False(t, buffer.Has(key), "partition key should be removed after account full error")
})

// The first write reports a missing timestamp index, so the submitter is
// expected to initialize only the index (not the samples account) and then
// retry the write exactly once.
t.Run("initializes_only_timestamp_index_when_timestamp_index_not_found", func(t *testing.T) {
	t.Parallel()

	log := logger.With("test", t.Name())

	key := newTestPartitionKey()
	sample := newTestSample()

	// Call counters, bumped atomically by the mock hooks below.
	var samplesInits, indexInits, writes int32

	writeHook := func(ctx context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
		// Only the very first write fails; every later write succeeds.
		if atomic.AddInt32(&writes, 1) != 1 {
			return solana.Signature{}, nil, nil
		}
		return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound
	}
	initSamplesHook := func(ctx context.Context, config sdktelemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
		atomic.AddInt32(&samplesInits, 1)
		return solana.Signature{}, nil, nil
	}
	initIndexHook := func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
		atomic.AddInt32(&indexInits, 1)
		return solana.Signature{}, nil, nil
	}

	telemetryProgram := &mockTelemetryProgramClient{
		WriteInternetLatencySamplesFunc:      writeHook,
		InitializeInternetLatencySamplesFunc: initSamplesHook,
		InitializeTimestampIndexFunc:         initIndexHook,
	}

	// Named buf so the local does not shadow the buffer package.
	buf := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128)
	buf.Add(key, sample)

	epochFinder := &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) {
		return key.Epoch, nil
	}}

	cfg := &exporter.SubmitterConfig{
		OracleAgentPK: solana.NewWallet().PublicKey(),
		Interval:      time.Hour,
		Buffer:        buf,
		Telemetry:     telemetryProgram,
		MaxAttempts:   3,
		BackoffFunc:   func(_ int) time.Duration { return 0 },
		EpochFinder:   epochFinder,
		DataProviderSamplingIntervals: map[exporter.DataProviderName]time.Duration{
			key.DataProvider: time.Second,
		},
	}
	submitter, err := exporter.NewSubmitter(log, cfg)
	require.NoError(t, err)

	submitter.Tick(t.Context())

	gotSamplesInits := atomic.LoadInt32(&samplesInits)
	gotIndexInits := atomic.LoadInt32(&indexInits)
	gotWrites := atomic.LoadInt32(&writes)

	assert.Equal(t, int32(0), gotSamplesInits, "should not initialize samples account")
	assert.Equal(t, int32(1), gotIndexInits, "should initialize timestamp index")
	assert.Equal(t, int32(2), gotWrites, "should try write twice (before and after timestamp index init)")
})

// The first write reports a missing timestamp index and the follow-up index
// initialization fails. The submitter is expected to retry the write without
// the timestamp index account rather than give up.
t.Run("clears_timestamp_index_pk_when_init_fails", func(t *testing.T) {
	t.Parallel()

	log := logger.With("test", t.Name())

	key := newTestPartitionKey()
	sample := newTestSample()

	var writeCalled int32
	// TimestampIndexPK as seen by the first write and by the retry. Capturing
	// both lets the test tell "cleared after the failed init" apart from
	// "never set in the first place".
	var firstTimestampIndexPK, retryTimestampIndexPK *solana.PublicKey
	telemetryProgram := &mockTelemetryProgramClient{
		WriteInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
			n := atomic.AddInt32(&writeCalled, 1)
			if n == 1 {
				firstTimestampIndexPK = config.TimestampIndexPK
				return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound
			}
			retryTimestampIndexPK = config.TimestampIndexPK
			return solana.Signature{}, nil, nil
		},
		InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
			return solana.Signature{}, nil, errors.New("init failed")
		},
	}

	buffer := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128)
	buffer.Add(key, sample)

	submitter, err := exporter.NewSubmitter(log, &exporter.SubmitterConfig{
		OracleAgentPK: solana.NewWallet().PublicKey(),
		Interval:      time.Hour,
		Buffer:        buffer,
		Telemetry:     telemetryProgram,
		MaxAttempts:   2,
		BackoffFunc:   func(_ int) time.Duration { return 0 },
		EpochFinder: &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) {
			return key.Epoch, nil
		}},
	})
	require.NoError(t, err)

	submitter.Tick(t.Context())

	assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init")
	assert.NotNil(t, firstTimestampIndexPK, "first write should carry a TimestampIndexPK")
	assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init")
})

// The first write reports a missing samples account. Initializing the samples
// account succeeds but initializing its timestamp index fails, so the
// submitter is expected to retry the write without the timestamp index account.
t.Run("clears_timestamp_index_pk_when_init_fails_on_new_account", func(t *testing.T) {
	t.Parallel()

	log := logger.With("test", t.Name())

	key := newTestPartitionKey()
	sample := newTestSample()

	var writeCalled int32
	// TimestampIndexPK as seen by the first write and by the retry. Capturing
	// both lets the test tell "cleared after the failed init" apart from
	// "never set in the first place".
	var firstTimestampIndexPK, retryTimestampIndexPK *solana.PublicKey
	telemetryProgram := &mockTelemetryProgramClient{
		WriteInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
			n := atomic.AddInt32(&writeCalled, 1)
			if n == 1 {
				firstTimestampIndexPK = config.TimestampIndexPK
				return solana.Signature{}, nil, sdktelemetry.ErrAccountNotFound
			}
			retryTimestampIndexPK = config.TimestampIndexPK
			return solana.Signature{}, nil, nil
		},
		InitializeInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
			return solana.Signature{}, nil, nil
		},
		InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
			return solana.Signature{}, nil, errors.New("init failed")
		},
	}

	buffer := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128)
	buffer.Add(key, sample)

	submitter, err := exporter.NewSubmitter(log, &exporter.SubmitterConfig{
		OracleAgentPK: solana.NewWallet().PublicKey(),
		Interval:      time.Hour,
		Buffer:        buffer,
		Telemetry:     telemetryProgram,
		MaxAttempts:   2,
		BackoffFunc:   func(_ int) time.Duration { return 0 },
		EpochFinder: &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) {
			return key.Epoch, nil
		}},
		DataProviderSamplingIntervals: map[exporter.DataProviderName]time.Duration{
			key.DataProvider: time.Second,
		},
	})
	require.NoError(t, err)

	submitter.Tick(t.Context())

	assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init")
	assert.NotNil(t, firstTimestampIndexPK, "first write should carry a TimestampIndexPK")
	assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init")
})

t.Run("failed_retries_reinsert_at_front_preserving_order", func(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 8 additions & 0 deletions controlplane/telemetry/internal/telemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func (m *mockPeerDiscovery) GetPeers() []*Peer {

// mockTelemetryProgramClient is a stateless test double for the telemetry
// program client; the methods visible here return fixed success values.
type mockTelemetryProgramClient struct{}

// ProgramID returns the same hard-coded placeholder program ID on every call.
func (m *mockTelemetryProgramClient) ProgramID() solana.PublicKey {
	programID := solana.MustPublicKeyFromBase58("11111111111111111111111111111111")
	return programID
}

// InitializeTimestampIndex ignores its arguments and always reports success
// with a zero signature and no transaction result.
func (m *mockTelemetryProgramClient) InitializeTimestampIndex(_ context.Context, _ solana.PublicKey) (solana.Signature, *rpc.GetTransactionResult, error) {
	var zeroSig solana.Signature
	return zeroSig, nil, nil
}

// InitializeDeviceLatencySamples is a no-op stub: it does not inspect ctx or
// config and always reports success with a zero signature and no transaction
// result.
func (m *mockTelemetryProgramClient) InitializeDeviceLatencySamples(ctx context.Context, config telemetryprog.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *rpc.GetTransactionResult, error) {
	var (
		zeroSig  solana.Signature
		noResult *rpc.GetTransactionResult
	)
	return zeroSig, noResult, nil
}
Expand Down
6 changes: 6 additions & 0 deletions controlplane/telemetry/internal/telemetry/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ type ServiceabilityProgramClient interface {

// TelemetryProgramClient is the client to the telemetry program. Every
// mutating call returns the transaction signature, the fetched transaction
// result, and an error.
type TelemetryProgramClient interface {
// ProgramID returns the telemetry program ID.
ProgramID() solana.PublicKey

// InitializeDeviceLatencySamples initializes the device latency samples
// account described by config.
InitializeDeviceLatencySamples(ctx context.Context, config telemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)

// InitializeTimestampIndex initializes a timestamp index companion account
// for the samples account identified by samplesAccountPK.
InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error)

// WriteDeviceLatencySamples writes the device latency samples carried by
// config to the account.
WriteDeviceLatencySamples(ctx context.Context, config telemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
}
20 changes: 20 additions & 0 deletions controlplane/telemetry/internal/telemetry/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ type mockTelemetryProgramClient struct {
InitializeDeviceLatencySamplesFunc func(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
WriteDeviceLatencySamplesFunc func(ctx context.Context, config sdktelemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error)
GetDeviceLatencySamplesFunc func(ctx context.Context, originDevicePK solana.PublicKey, targetDevicePK solana.PublicKey, linkPK solana.PublicKey, epoch uint64) (*sdktelemetry.DeviceLatencySamples, error)
InitializeTimestampIndexFunc func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error)
}

// ProgramID returns a constant placeholder program ID; it does not depend on
// any state of the mock.
func (c *mockTelemetryProgramClient) ProgramID() solana.PublicKey {
	const mockProgramIDBase58 = "11111111111111111111111111111111"
	programID := solana.MustPublicKeyFromBase58(mockProgramIDBase58)
	return programID
}

// InitializeTimestampIndex delegates to InitializeTimestampIndexFunc if the
// test set one. Otherwise it succeeds with a zero signature and no
// transaction result.
func (c *mockTelemetryProgramClient) InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
	if fn := c.InitializeTimestampIndexFunc; fn != nil {
		return fn(ctx, samplesAccountPK)
	}
	// Default behavior when no hook is installed.
	var zeroSig solana.Signature
	return zeroSig, nil, nil
}

func (c *mockTelemetryProgramClient) InitializeDeviceLatencySamples(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
Expand All @@ -103,6 +115,14 @@ func newMemoryTelemetryProgramClient() *memoryTelemetryProgramClient {
}
}

// ProgramID returns the fixed placeholder program ID used by the in-memory
// client.
func (c *memoryTelemetryProgramClient) ProgramID() solana.PublicKey {
	const inMemoryProgramID = "11111111111111111111111111111111"
	return solana.MustPublicKeyFromBase58(inMemoryProgramID)
}

// InitializeTimestampIndex is a no-op for the in-memory client: it ignores
// both arguments and reports success with a zero signature and no transaction
// result.
func (c *memoryTelemetryProgramClient) InitializeTimestampIndex(_ context.Context, _ solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) {
	var (
		zeroSig  solana.Signature
		noResult *solanarpc.GetTransactionResult
	)
	return zeroSig, noResult, nil
}

func (c *memoryTelemetryProgramClient) InitializeDeviceLatencySamples(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
Loading
Loading