diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f03267d4a..58f1fff967 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,9 @@ All notable changes to this project will be documented in this file. - Controller - Retry transient Solana RPC failures when fetching onchain serviceability accounts so controller polls are more resilient to short-lived provider resets +- Telemetry + - Add timestamp index companion account for device and internet latency samples, enabling reliable timestamp reconstruction when agents experience downtime gaps within an epoch + ## [v0.12.0](https://github.com/malbeclabs/doublezero/compare/client/v0.11.0...client/v0.12.0) - 2026-03-16 ### Breaking diff --git a/controlplane/internet-latency-collector/internal/exporter/ledger.go b/controlplane/internet-latency-collector/internal/exporter/ledger.go index 2ebe6d90fb..eb55e50783 100644 --- a/controlplane/internet-latency-collector/internal/exporter/ledger.go +++ b/controlplane/internet-latency-collector/internal/exporter/ledger.go @@ -37,7 +37,9 @@ type ServiceabilityProgramClient interface { } type TelemetryProgramClient interface { + ProgramID() solana.PublicKey InitializeInternetLatencySamples(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) + InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) WriteInternetLatencySamples(ctx context.Context, config telemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) } diff --git a/controlplane/internet-latency-collector/internal/exporter/main_test.go b/controlplane/internet-latency-collector/internal/exporter/main_test.go index 6ffb17ac97..0e6a27b25f 100644 --- a/controlplane/internet-latency-collector/internal/exporter/main_test.go +++ b/controlplane/internet-latency-collector/internal/exporter/main_test.go 
@@ -52,6 +52,18 @@ type mockTelemetryProgramClient struct { InitializeInternetLatencySamplesFunc func(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) WriteInternetLatencySamplesFunc func(ctx context.Context, config telemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) GetInternetLatencySamplesFunc func(ctx context.Context, dataProviderName string, originExchangePK solana.PublicKey, targetExchangePK solana.PublicKey, epoch uint64) (*telemetry.InternetLatencySamples, error) + InitializeTimestampIndexFunc func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) +} + +func (c *mockTelemetryProgramClient) ProgramID() solana.PublicKey { + return solana.MustPublicKeyFromBase58("11111111111111111111111111111111") +} + +func (c *mockTelemetryProgramClient) InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + if c.InitializeTimestampIndexFunc != nil { + return c.InitializeTimestampIndexFunc(ctx, samplesAccountPK) + } + return solana.Signature{}, nil, nil } func (c *mockTelemetryProgramClient) InitializeInternetLatencySamples(ctx context.Context, config telemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { diff --git a/controlplane/internet-latency-collector/internal/exporter/submitter.go b/controlplane/internet-latency-collector/internal/exporter/submitter.go index 9f01d09398..6708efa473 100644 --- a/controlplane/internet-latency-collector/internal/exporter/submitter.go +++ b/controlplane/internet-latency-collector/internal/exporter/submitter.go @@ -117,6 +117,26 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey } } + // Derive the samples PDA so we can derive the 
timestamp index PDA from it. + samplesPDA, _, err := telemetry.DeriveInternetLatencySamplesPDA( + s.cfg.Telemetry.ProgramID(), + s.cfg.OracleAgentPK, + string(partitionKey.DataProvider), + partitionKey.SourceExchangePK, + partitionKey.TargetExchangePK, + partitionKey.Epoch, + ) + if err != nil { + return fmt.Errorf("failed to derive internet latency samples PDA: %w", err) + } + timestampIndexPDA, _, err := telemetry.DeriveTimestampIndexPDA( + s.cfg.Telemetry.ProgramID(), + samplesPDA, + ) + if err != nil { + return fmt.Errorf("failed to derive timestamp index PDA: %w", err) + } + writeConfig := telemetry.WriteInternetLatencySamplesInstructionConfig{ DataProviderName: string(partitionKey.DataProvider), OriginExchangePK: partitionKey.SourceExchangePK, @@ -124,11 +144,29 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey Epoch: partitionKey.Epoch, StartTimestampMicroseconds: uint64(minTimestamp.UnixMicro()), Samples: rtts, + TimestampIndexPK: &timestampIndexPDA, + } - _, _, err := s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig) + _, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig) if err != nil { - if errors.Is(err, telemetry.ErrAccountNotFound) { + if errors.Is(err, telemetry.ErrTimestampIndexNotFound) { + log.Info("Timestamp index account not found, initializing") + _, _, err = s.cfg.Telemetry.InitializeTimestampIndex(ctx, samplesPDA) + if err != nil { + log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err) + writeConfig.TimestampIndexPK = nil + } + _, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig) + if err != nil { + if errors.Is(err, telemetry.ErrSamplesAccountFull) { + log.Warn("Partition account is full, dropping samples from buffer and moving on", "droppedSamples", len(samples)) + metrics.ExporterSubmitterAccountFull.WithLabelValues(string(partitionKey.DataProvider), partitionKey.SourceExchangePK.String(), 
partitionKey.TargetExchangePK.String(), strconv.FormatUint(partitionKey.Epoch, 10)).Inc() + s.cfg.Buffer.Remove(partitionKey) + return nil + } + return fmt.Errorf("failed to write internet latency samples after timestamp index init: %w", err) + } + } else if errors.Is(err, telemetry.ErrAccountNotFound) { log.Info("Account not found, initializing new account") samplingInterval, ok := s.cfg.DataProviderSamplingIntervals[partitionKey.DataProvider] if !ok { @@ -144,6 +182,12 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey if err != nil { return fmt.Errorf("failed to initialize internet latency samples: %w", err) } + // Initialize the companion timestamp index account. + _, _, err = s.cfg.Telemetry.InitializeTimestampIndex(ctx, samplesPDA) + if err != nil { + log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err) + writeConfig.TimestampIndexPK = nil + } _, _, err = s.cfg.Telemetry.WriteInternetLatencySamples(ctx, writeConfig) if err != nil { if errors.Is(err, telemetry.ErrSamplesAccountFull) { diff --git a/controlplane/internet-latency-collector/internal/exporter/submitter_test.go b/controlplane/internet-latency-collector/internal/exporter/submitter_test.go index ed42c998c1..df25dd1a60 100644 --- a/controlplane/internet-latency-collector/internal/exporter/submitter_test.go +++ b/controlplane/internet-latency-collector/internal/exporter/submitter_test.go @@ -571,6 +571,156 @@ func TestInternetLatency_Submitter(t *testing.T) { assert.False(t, buffer.Has(key), "partition key should be removed after account full error") }) + t.Run("initializes_only_timestamp_index_when_timestamp_index_not_found", func(t *testing.T) { + t.Parallel() + + log := logger.With("test", t.Name()) + + key := newTestPartitionKey() + sample := newTestSample() + + var initSamplesCalled, initTimestampIndexCalled, writeCalled int32 + telemetryProgram := &mockTelemetryProgramClient{ + WriteInternetLatencySamplesFunc: func(ctx 
context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + if atomic.AddInt32(&writeCalled, 1) == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound + } + return solana.Signature{}, nil, nil + }, + InitializeInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + atomic.AddInt32(&initSamplesCalled, 1) + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + atomic.AddInt32(&initTimestampIndexCalled, 1) + return solana.Signature{}, nil, nil + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128) + buffer.Add(key, sample) + + submitter, err := exporter.NewSubmitter(log, &exporter.SubmitterConfig{ + OracleAgentPK: solana.NewWallet().PublicKey(), + Interval: time.Hour, + Buffer: buffer, + Telemetry: telemetryProgram, + MaxAttempts: 3, + BackoffFunc: func(_ int) time.Duration { return 0 }, + EpochFinder: &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) { + return key.Epoch, nil + }}, + DataProviderSamplingIntervals: map[exporter.DataProviderName]time.Duration{ + key.DataProvider: time.Second, + }, + }) + require.NoError(t, err) + + submitter.Tick(t.Context()) + + assert.Equal(t, int32(0), atomic.LoadInt32(&initSamplesCalled), "should not initialize samples account") + assert.Equal(t, int32(1), atomic.LoadInt32(&initTimestampIndexCalled), "should initialize timestamp index") + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should try write twice (before and after timestamp index init)") + }) + + t.Run("clears_timestamp_index_pk_when_init_fails", func(t *testing.T) { + t.Parallel() + + 
log := logger.With("test", t.Name()) + + key := newTestPartitionKey() + sample := newTestSample() + + var writeCalled int32 + var retryTimestampIndexPK *solana.PublicKey + telemetryProgram := &mockTelemetryProgramClient{ + WriteInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + n := atomic.AddInt32(&writeCalled, 1) + if n == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound + } + retryTimestampIndexPK = config.TimestampIndexPK + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, errors.New("init failed") + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128) + buffer.Add(key, sample) + + submitter, err := exporter.NewSubmitter(log, &exporter.SubmitterConfig{ + OracleAgentPK: solana.NewWallet().PublicKey(), + Interval: time.Hour, + Buffer: buffer, + Telemetry: telemetryProgram, + MaxAttempts: 2, + BackoffFunc: func(_ int) time.Duration { return 0 }, + EpochFinder: &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) { + return key.Epoch, nil + }}, + }) + require.NoError(t, err) + + submitter.Tick(t.Context()) + + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init") + assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init") + }) + + t.Run("clears_timestamp_index_pk_when_init_fails_on_new_account", func(t *testing.T) { + t.Parallel() + + log := logger.With("test", t.Name()) + + key := newTestPartitionKey() + sample := newTestSample() + + var writeCalled int32 + var retryTimestampIndexPK *solana.PublicKey + telemetryProgram := 
&mockTelemetryProgramClient{ + WriteInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + n := atomic.AddInt32(&writeCalled, 1) + if n == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrAccountNotFound + } + retryTimestampIndexPK = config.TimestampIndexPK + return solana.Signature{}, nil, nil + }, + InitializeInternetLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.InitializeInternetLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, errors.New("init failed") + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[exporter.PartitionKey, exporter.Sample](128) + buffer.Add(key, sample) + + submitter, err := exporter.NewSubmitter(log, &exporter.SubmitterConfig{ + OracleAgentPK: solana.NewWallet().PublicKey(), + Interval: time.Hour, + Buffer: buffer, + Telemetry: telemetryProgram, + MaxAttempts: 2, + BackoffFunc: func(_ int) time.Duration { return 0 }, + EpochFinder: &mockEpochFinder{ApproximateAtTimeFunc: func(ctx context.Context, target time.Time) (uint64, error) { + return key.Epoch, nil + }}, + DataProviderSamplingIntervals: map[exporter.DataProviderName]time.Duration{ + key.DataProvider: time.Second, + }, + }) + require.NoError(t, err) + + submitter.Tick(t.Context()) + + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init") + assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init") + }) + t.Run("failed_retries_reinsert_at_front_preserving_order", func(t *testing.T) { t.Parallel() diff --git 
a/controlplane/telemetry/internal/telemetry/config_test.go b/controlplane/telemetry/internal/telemetry/config_test.go index 80e872d0e0..18be0a19ef 100644 --- a/controlplane/telemetry/internal/telemetry/config_test.go +++ b/controlplane/telemetry/internal/telemetry/config_test.go @@ -41,6 +41,14 @@ func (m *mockPeerDiscovery) GetPeers() []*Peer { type mockTelemetryProgramClient struct{} +func (m *mockTelemetryProgramClient) ProgramID() solana.PublicKey { + return solana.MustPublicKeyFromBase58("11111111111111111111111111111111") +} + +func (m *mockTelemetryProgramClient) InitializeTimestampIndex(_ context.Context, _ solana.PublicKey) (solana.Signature, *rpc.GetTransactionResult, error) { + return solana.Signature{}, nil, nil +} + func (m *mockTelemetryProgramClient) InitializeDeviceLatencySamples(ctx context.Context, config telemetryprog.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *rpc.GetTransactionResult, error) { return solana.Signature{}, nil, nil } diff --git a/controlplane/telemetry/internal/telemetry/ledger.go b/controlplane/telemetry/internal/telemetry/ledger.go index d8e047cbce..0c37b77f46 100644 --- a/controlplane/telemetry/internal/telemetry/ledger.go +++ b/controlplane/telemetry/internal/telemetry/ledger.go @@ -20,9 +20,15 @@ type ServiceabilityProgramClient interface { // TelemetryProgramClient is the client to the telemetry program. type TelemetryProgramClient interface { + // ProgramID returns the telemetry program ID. + ProgramID() solana.PublicKey + // InitializeDeviceLatencySamples initializes the device latency samples account. InitializeDeviceLatencySamples(ctx context.Context, config telemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) + // InitializeTimestampIndex initializes a timestamp index companion account. 
+ InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) + // WriteDeviceLatencySamples writes the device latency samples to the account. WriteDeviceLatencySamples(ctx context.Context, config telemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) } diff --git a/controlplane/telemetry/internal/telemetry/main_test.go b/controlplane/telemetry/internal/telemetry/main_test.go index 404aa59915..47a8f503e8 100644 --- a/controlplane/telemetry/internal/telemetry/main_test.go +++ b/controlplane/telemetry/internal/telemetry/main_test.go @@ -77,6 +77,18 @@ type mockTelemetryProgramClient struct { InitializeDeviceLatencySamplesFunc func(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) WriteDeviceLatencySamplesFunc func(ctx context.Context, config sdktelemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) GetDeviceLatencySamplesFunc func(ctx context.Context, originDevicePK solana.PublicKey, targetDevicePK solana.PublicKey, linkPK solana.PublicKey, epoch uint64) (*sdktelemetry.DeviceLatencySamples, error) + InitializeTimestampIndexFunc func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) +} + +func (c *mockTelemetryProgramClient) ProgramID() solana.PublicKey { + return solana.MustPublicKeyFromBase58("11111111111111111111111111111111") +} + +func (c *mockTelemetryProgramClient) InitializeTimestampIndex(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + if c.InitializeTimestampIndexFunc != nil { + return c.InitializeTimestampIndexFunc(ctx, samplesAccountPK) + } + return solana.Signature{}, nil, nil } func (c *mockTelemetryProgramClient) 
InitializeDeviceLatencySamples(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { @@ -103,6 +115,14 @@ func newMemoryTelemetryProgramClient() *memoryTelemetryProgramClient { } } +func (c *memoryTelemetryProgramClient) ProgramID() solana.PublicKey { + return solana.MustPublicKeyFromBase58("11111111111111111111111111111111") +} + +func (c *memoryTelemetryProgramClient) InitializeTimestampIndex(_ context.Context, _ solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, nil +} + func (c *memoryTelemetryProgramClient) InitializeDeviceLatencySamples(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/controlplane/telemetry/internal/telemetry/submitter.go b/controlplane/telemetry/internal/telemetry/submitter.go index f5bbb4c047..0f3c4356b2 100644 --- a/controlplane/telemetry/internal/telemetry/submitter.go +++ b/controlplane/telemetry/internal/telemetry/submitter.go @@ -114,6 +114,25 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey } } + // Derive the samples PDA so we can derive the timestamp index PDA from it. 
+ samplesPDA, _, err := telemetry.DeriveDeviceLatencySamplesPDA( + s.cfg.ProgramClient.ProgramID(), + partitionKey.OriginDevicePK, + partitionKey.TargetDevicePK, + partitionKey.LinkPK, + partitionKey.Epoch, + ) + if err != nil { + return fmt.Errorf("failed to derive device latency samples PDA: %w", err) + } + timestampIndexPDA, _, err := telemetry.DeriveTimestampIndexPDA( + s.cfg.ProgramClient.ProgramID(), + samplesPDA, + ) + if err != nil { + return fmt.Errorf("failed to derive timestamp index PDA: %w", err) + } + writeConfig := telemetry.WriteDeviceLatencySamplesInstructionConfig{ AgentPK: s.cfg.MetricsPublisherPK, OriginDevicePK: partitionKey.OriginDevicePK, @@ -122,11 +141,29 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey Epoch: &partitionKey.Epoch, StartTimestampMicroseconds: uint64(minTimestamp.UnixMicro()), Samples: rtts, + TimestampIndexPK: &timestampIndexPDA, + } - _, _, err := s.cfg.ProgramClient.WriteDeviceLatencySamples(ctx, writeConfig) + _, _, err = s.cfg.ProgramClient.WriteDeviceLatencySamples(ctx, writeConfig) if err != nil { - if errors.Is(err, telemetry.ErrAccountNotFound) { + if errors.Is(err, telemetry.ErrTimestampIndexNotFound) { + log.Info("Timestamp index account not found, initializing") + _, _, err = s.cfg.ProgramClient.InitializeTimestampIndex(ctx, samplesPDA) + if err != nil { + log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err) + writeConfig.TimestampIndexPK = nil + } + _, _, err = s.cfg.ProgramClient.WriteDeviceLatencySamples(ctx, writeConfig) + if err != nil { + if errors.Is(err, telemetry.ErrSamplesAccountFull) { + log.Warn("Partition account is full, dropping samples from buffer and moving on", "droppedSamples", len(samples)) + s.cfg.Buffer.Remove(partitionKey) + return nil + } + metrics.Errors.WithLabelValues(metrics.ErrorTypeSubmitterFailedToWriteSamples).Inc() + return fmt.Errorf("failed to write device latency samples after timestamp index init: %w", err) + 
} + } else if errors.Is(err, telemetry.ErrAccountNotFound) { log.Info("Account not found, initializing new account") _, _, err = s.cfg.ProgramClient.InitializeDeviceLatencySamples(ctx, telemetry.InitializeDeviceLatencySamplesInstructionConfig{ AgentPK: s.cfg.MetricsPublisherPK, @@ -140,6 +177,12 @@ func (s *Submitter) SubmitSamples(ctx context.Context, partitionKey PartitionKey metrics.Errors.WithLabelValues(metrics.ErrorTypeSubmitterFailedToInitializeAccount).Inc() return fmt.Errorf("failed to initialize device latency samples: %w", err) } + // Initialize the companion timestamp index account. + _, _, err = s.cfg.ProgramClient.InitializeTimestampIndex(ctx, samplesPDA) + if err != nil { + log.Warn("Failed to initialize timestamp index, writes will proceed without it", "error", err) + writeConfig.TimestampIndexPK = nil + } _, _, err = s.cfg.ProgramClient.WriteDeviceLatencySamples(ctx, writeConfig) if err != nil { if errors.Is(err, telemetry.ErrSamplesAccountFull) { diff --git a/controlplane/telemetry/internal/telemetry/submitter_test.go b/controlplane/telemetry/internal/telemetry/submitter_test.go index eab22a1600..ad23c2cd79 100644 --- a/controlplane/telemetry/internal/telemetry/submitter_test.go +++ b/controlplane/telemetry/internal/telemetry/submitter_test.go @@ -696,6 +696,154 @@ func TestAgentTelemetry_Submitter(t *testing.T) { assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should try write twice (before and after init)") }) + t.Run("initializes_only_timestamp_index_when_timestamp_index_not_found", func(t *testing.T) { + t.Parallel() + + log := log.With("test", t.Name()) + + key := newTestPartitionKey() + sample := telemetry.Sample{ + Timestamp: time.Now(), + RTT: 40 * time.Microsecond, + Loss: false, + } + + var initSamplesCalled, initTimestampIndexCalled, writeCalled int32 + telemetryProgram := &mockTelemetryProgramClient{ + WriteDeviceLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteDeviceLatencySamplesInstructionConfig) 
(solana.Signature, *solanarpc.GetTransactionResult, error) { + if atomic.AddInt32(&writeCalled, 1) == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound + } + return solana.Signature{}, nil, nil + }, + InitializeDeviceLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + atomic.AddInt32(&initSamplesCalled, 1) + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + atomic.AddInt32(&initTimestampIndexCalled, 1) + return solana.Signature{}, nil, nil + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[telemetry.PartitionKey, telemetry.Sample](1024) + buffer.Add(key, sample) + + submitter, err := telemetry.NewSubmitter(log, &telemetry.SubmitterConfig{ + Interval: time.Hour, + Buffer: buffer, + ProgramClient: telemetryProgram, + MaxAttempts: 2, + MaxConcurrency: 10, + BackoffFunc: func(_ int) time.Duration { return 0 }, + GetCurrentEpoch: func(ctx context.Context) (uint64, error) { + return 100, nil + }, + }) + require.NoError(t, err) + + submitter.Tick(context.Background()) + + assert.Equal(t, int32(0), atomic.LoadInt32(&initSamplesCalled), "should not initialize samples account") + assert.Equal(t, int32(1), atomic.LoadInt32(&initTimestampIndexCalled), "should initialize timestamp index") + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should try write twice (before and after timestamp index init)") + }) + + t.Run("clears_timestamp_index_pk_when_init_fails", func(t *testing.T) { + t.Parallel() + + log := log.With("test", t.Name()) + + key := newTestPartitionKey() + sample := newTestSample() + + var writeCalled int32 + var retryTimestampIndexPK *solana.PublicKey + telemetryProgram := &mockTelemetryProgramClient{ + WriteDeviceLatencySamplesFunc: func(ctx 
context.Context, config sdktelemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + n := atomic.AddInt32(&writeCalled, 1) + if n == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrTimestampIndexNotFound + } + retryTimestampIndexPK = config.TimestampIndexPK + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, errors.New("init failed") + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[telemetry.PartitionKey, telemetry.Sample](1024) + buffer.Add(key, sample) + + submitter, err := telemetry.NewSubmitter(log, &telemetry.SubmitterConfig{ + Interval: time.Hour, + Buffer: buffer, + ProgramClient: telemetryProgram, + MaxAttempts: 2, + MaxConcurrency: 10, + BackoffFunc: func(_ int) time.Duration { return 0 }, + GetCurrentEpoch: func(ctx context.Context) (uint64, error) { + return 100, nil + }, + }) + require.NoError(t, err) + + submitter.Tick(context.Background()) + + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init") + assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init") + }) + + t.Run("clears_timestamp_index_pk_when_init_fails_on_new_account", func(t *testing.T) { + t.Parallel() + + log := log.With("test", t.Name()) + + key := newTestPartitionKey() + sample := newTestSample() + + var writeCalled int32 + var retryTimestampIndexPK *solana.PublicKey + telemetryProgram := &mockTelemetryProgramClient{ + WriteDeviceLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.WriteDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + n := atomic.AddInt32(&writeCalled, 1) + if n == 1 { + return solana.Signature{}, nil, sdktelemetry.ErrAccountNotFound + } + 
retryTimestampIndexPK = config.TimestampIndexPK + return solana.Signature{}, nil, nil + }, + InitializeDeviceLatencySamplesFunc: func(ctx context.Context, config sdktelemetry.InitializeDeviceLatencySamplesInstructionConfig) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, nil + }, + InitializeTimestampIndexFunc: func(ctx context.Context, samplesAccountPK solana.PublicKey) (solana.Signature, *solanarpc.GetTransactionResult, error) { + return solana.Signature{}, nil, errors.New("init failed") + }, + } + + buffer := buffer.NewMemoryPartitionedBuffer[telemetry.PartitionKey, telemetry.Sample](1024) + buffer.Add(key, sample) + + submitter, err := telemetry.NewSubmitter(log, &telemetry.SubmitterConfig{ + Interval: time.Hour, + Buffer: buffer, + ProgramClient: telemetryProgram, + MaxAttempts: 2, + MaxConcurrency: 10, + BackoffFunc: func(_ int) time.Duration { return 0 }, + GetCurrentEpoch: func(ctx context.Context) (uint64, error) { + return 100, nil + }, + }) + require.NoError(t, err) + + submitter.Tick(context.Background()) + + assert.Equal(t, int32(2), atomic.LoadInt32(&writeCalled), "should retry write after failed timestamp index init") + assert.Nil(t, retryTimestampIndexPK, "retry write should have nil TimestampIndexPK after failed init") + }) + t.Run("failed_retries_reinsert_at_front_preserving_order", func(t *testing.T) { t.Parallel() diff --git a/rfcs/rfc20-telemetry-write-timestamp-index.md b/rfcs/rfc20-telemetry-write-timestamp-index.md new file mode 100644 index 0000000000..fc41fb2443 --- /dev/null +++ b/rfcs/rfc20-telemetry-write-timestamp-index.md @@ -0,0 +1,230 @@ +# Telemetry Write Timestamp Index + +## Summary + +**Status: Implemented** + +Add a companion timestamp index account to telemetry latency sample accounts so that the actual wall-clock time of each sample can be reliably determined, even when the writing agent experiences downtime mid-epoch. 
+ +Today, sample timestamps are inferred as `start_timestamp + index * sampling_interval`. This breaks when the agent stops and resumes — samples after the gap are assigned incorrect timestamps with no way for consumers to detect or correct the error. + +## Motivation + +The telemetry program stores latency samples as a flat array of `u32` RTT values with a single `start_timestamp_microseconds` set on the first write. Consumers reconstruct per-sample timestamps by assuming uniform spacing at `sampling_interval_microseconds`. + +This assumption fails when the agent on a device restarts, crashes, or is intentionally stopped for maintenance. After a gap, the agent resumes appending samples, but the implicit timestamp calculation shifts all subsequent samples earlier than their true measurement time. There is no signal in the account data to indicate that a gap occurred or how long it lasted. + +This affects: + +- **Reward calculations**: Latency percentiles (p95/p99) computed per-epoch may include samples attributed to the wrong time windows. +- **Observability**: Dashboards and monitoring tools that plot latency over time will show misleading timelines. +- **Debugging**: Operators investigating network issues cannot correlate samples with real-world events when timestamps are wrong. + +## New Terminology + +- **Timestamp index**: A companion account containing a sequence of `(sample_index, timestamp_microseconds)` entries, one per write batch. Each entry records the sample index and wall-clock time at the start of a write batch, allowing consumers to reconstruct accurate per-sample timestamps. +- **Timestamp index entry**: A single `(u32, u64)` pair in the timestamp index — 12 bytes. + +## Alternatives Considered + +### Do nothing + +Consumers have no way to detect or correct gaps. The problem worsens as the network grows and agent restarts become more frequent. 
+ +### Gap markers (sentinel values) + +When the agent resumes after downtime, it backfills `u32::MAX` sentinel values for each missed sampling interval before writing real samples. This preserves the implicit timestamp model — every slot has a value, so `start_timestamp + index * sampling_interval` remains correct. + +**Pros**: No schema change, no program change, deploy by updating agent behavior only. + +**Cons**: Wastes onchain storage proportional to gap duration. A 1-hour gap at 5-second intervals writes 720 useless values (2.8 KB). A 12-hour gap writes 8,640 values (33.6 KB), consuming nearly a quarter of the 35,000 sample capacity. The agent must also track its own last-write time to calculate missed slots, which introduces its own failure modes. + +### Per-sample timestamps + +Store `(timestamp_us: u64, rtt_us: u32)` per sample instead of just `u32`. + +**Pros**: Every sample is self-describing. + +**Cons**: 3x storage increase (12 bytes vs 4 bytes per sample). Significantly increases onchain rent costs and reduces the number of samples that fit in a single account. + +### Inline timestamp index + +Store the timestamp index inside the samples account itself, interleaved between the header and the samples region. + +**Pros**: Single account — no companion account to manage or fetch. + +**Cons**: Each write requires a `memmove` to shift existing samples forward by 12 bytes to make room for the new index entry. This scales linearly with sample count (up to ~140 KB at capacity). It also changes the samples account layout, requiring new account type discriminators and breaking the clean separation between sample data and metadata. + +## Detailed Design + +### Architecture + +The timestamp index is stored in a separate companion account, derived as a PDA from the samples account's public key. The samples account layout is completely unchanged. + +``` +Samples account (unchanged): [header] [samples...] +Timestamp index account (new): [header] [entries...] 
+``` + +On each write instruction, the program appends samples to the samples account and appends a timestamp index entry to the companion account, both in the same instruction. + +### Timestamp Index Account + +#### PDA Derivation + +The timestamp index account is derived from the samples account: + +**Device latency timestamp index:** +``` +Seeds: [b"telemetry", b"tsindex", samples_account_pk] +``` + +**Internet latency timestamp index:** +``` +Seeds: [b"telemetry", b"tsindex", samples_account_pk] +``` + +Both use the same seed prefix since the samples account public key is unique. + +#### Account Layout + +```rust +pub struct TimestampIndexHeader { + pub account_type: AccountType, // 1 byte (new discriminator) + pub samples_account_pk: Pubkey, // 32 bytes + pub next_entry_index: u32, // 4 bytes + pub _unused: [u8; 64], // 64 bytes (reserved) +} +// Total header: 101 bytes +``` + +Followed by a flat array of entries: + +```rust +pub struct TimestampIndexEntry { + pub sample_index: u32, // 4 bytes + pub timestamp_microseconds: u64, // 8 bytes +} +// Total per entry: 12 bytes +``` + +- `sample_index`: The value of `next_sample_index` on the samples account at the time of the write (i.e., the index of the first sample in this batch). +- `timestamp_microseconds`: The wall-clock time provided by the agent for this batch. + +Entries are append-only and naturally ordered by `sample_index`. + +#### Maximum Entries + +`MAX_TIMESTAMP_INDEX_ENTRIES = 10_000` entries is enforced. At one entry per write batch (up to 245 samples each), this comfortably covers a 48-hour epoch — e.g., 5-second sampling produces ~141 batches, and even 1-second sampling produces ~706 batches. Once full, the write instruction silently skips the timestamp index append rather than failing the transaction. Total max account size: `101 + 10,000 * 12 = ~120 KB`. 
+ +### Account Type Enum + +```rust +pub enum AccountType { + DeviceLatencySamplesV0 = 1, + InternetLatencySamplesV0 = 2, + DeviceLatencySamples = 3, + InternetLatencySamples = 4, + TimestampIndex = 5, // new +} +``` + +Only one new discriminator is needed since the timestamp index account is structurally identical for both device and internet latency — the `samples_account_pk` field links it to the correct samples account. + +### Instruction Changes + +#### Initialize + +New instruction `InitializeTimestampIndex`: + +- **Accounts**: `[timestamp_index_account, samples_account, agent, system_program]` +- **Args**: None (all data derived from the samples account) +- **Behavior**: Creates the timestamp index PDA, sets `samples_account_pk` from the provided samples account, sets `next_entry_index = 0`. + +This can be called alongside the existing `InitializeDeviceLatencySamples` / `InitializeInternetLatencySamples` instructions, or as a separate transaction. + +#### Write + +The existing `WriteDeviceLatencySamples` and `WriteInternetLatencySamples` instructions are extended to accept an optional additional account: + +- **Accounts**: `[samples_account, agent, system_program, timestamp_index_account (optional)]` +- **Args**: Unchanged — `start_timestamp_microseconds` and `samples` are already provided. + +When the timestamp index account is present: + +1. Validate it is the correct PDA derived from the samples account. +2. Validate its `samples_account_pk` matches. +3. Append a new entry: `(sample_index: current next_sample_index, timestamp_microseconds: args.start_timestamp_microseconds)`. +4. Increment `next_entry_index`. +5. Resize the account if needed. + +When the timestamp index account is absent, the write proceeds as today — this ensures backward compatibility with agents that haven't been updated. + +### Timestamp Reconstruction + +Consumers reconstruct per-sample timestamps as follows: + +``` +For sample at index i: + 1. 
Find the timestamp index entry (si, ts) where si <= i + and the next entry's si > i (or it's the last entry). + 2. timestamp_of_sample_i = ts + (i - si) * sampling_interval_microseconds +``` + +Gaps are detected when the elapsed time between consecutive timestamp index entries exceeds the expected time based on sample count: + +``` +For consecutive entries (si_a, ts_a) and (si_b, ts_b): + expected_elapsed = (si_b - si_a) * sampling_interval + actual_elapsed = ts_b - ts_a + gap = actual_elapsed - expected_elapsed + if gap > threshold: + // gap detected between samples si_a and si_b +``` + +### Agent Changes + +The agent requires minimal changes: + +1. Call `InitializeTimestampIndex` after initializing each samples account at the start of an epoch. +2. Pass the timestamp index account as an additional account in write instructions. + +No changes to the agent's sampling or timestamp-reporting behavior are needed — the agent already provides `start_timestamp_microseconds` on every write call. Previously, the program only used this value to set the header field on the first write (when it was zero) and discarded it on subsequent writes. Now, it also records each batch's timestamp in the companion account. + +### SDK Changes + +Each SDK (Go, Python, TypeScript) needs: + +1. A new account type constant for `TimestampIndex`. +2. New deserialization functions/structs for the timestamp index account. +3. A helper function to reconstruct per-sample timestamps given a samples account and its companion timestamp index. +4. Existing deserialization for samples accounts remains completely unchanged. + +## Impact + +- **Onchain program**: One new account type, one new initialize instruction, extended write instructions to optionally accept the companion account. Moderate change. +- **Samples accounts**: No changes whatsoever — existing layout, PDA derivation, and deserialization are untouched. +- **SDKs**: New deserialization for the timestamp index account and timestamp reconstruction helpers.
Binary test fixtures need regeneration for the new account type. +- **Storage overhead**: 12 bytes per write batch + 101 byte header per companion account. At typical write intervals (every 30-60 seconds), a 48-hour epoch produces ~3,000-6,000 entries = 36-72 KB per companion account. +- **Compute overhead**: Append-only writes to the companion account — no memmove or shifting. +- **Agent**: Pass one additional account on initialize and write calls. +- **Transaction size**: One additional account (32 bytes) in each write transaction. + +## Security Considerations + +No new attack surfaces are introduced. The timestamp values are self-reported by the agent, same as today. The program does not validate timestamp ordering or reasonableness — this is consistent with the existing trust model where agents are trusted contributors (per rfc4). + +The `MAX_TIMESTAMP_INDEX_ENTRIES` limit prevents a misbehaving agent from growing the timestamp index unboundedly. + +The companion account is validated via PDA derivation and the `samples_account_pk` field, preventing an agent from associating a timestamp index with the wrong samples account. + +## Backward Compatibility + +- **Existing samples accounts**: Completely unaffected. No layout changes, no new account type discriminators. +- **Existing write instructions**: Continue to work without the timestamp index account. Agents that haven't been updated simply don't create or pass the companion account. +- **Rollout**: The program upgrade adds the new instruction and extends existing write instructions. Agents can be updated independently — until updated, they continue writing without timestamp indices. No migration of existing accounts is needed. +- **SDKs**: Existing deserialization paths are unchanged. The timestamp index is an additive feature — consumers that don't need it can ignore the companion accounts entirely. + +## Open Questions + +None at this time. 
diff --git a/sdk/telemetry/go/fixture_test.go b/sdk/telemetry/go/fixture_test.go index 37ac9e8a83..7a1eed2ee2 100644 --- a/sdk/telemetry/go/fixture_test.go +++ b/sdk/telemetry/go/fixture_test.go @@ -121,6 +121,131 @@ func assertFields(t *testing.T, expected []fieldValue, got map[string]any) { } } +func TestFixtureTimestampIndex(t *testing.T) { + data, meta := loadFixture(t, "timestamp_index") + d, err := DeserializeTimestampIndex(data) + if err != nil { + t.Fatalf("DeserializeTimestampIndex: %v", err) + } + + got := map[string]any{ + "AccountType": uint8(d.AccountType), + "SamplesAccountPK": solana.PublicKey(d.SamplesAccountPK), + "NextEntryIndex": d.NextEntryIndex, + "EntriesCount": uint32(len(d.Entries)), + } + if len(d.Entries) > 0 { + got["Entry0SampleIndex"] = d.Entries[0].SampleIndex + got["Entry0Timestamp"] = d.Entries[0].TimestampMicroseconds + } + if len(d.Entries) > 1 { + got["Entry1SampleIndex"] = d.Entries[1].SampleIndex + got["Entry1Timestamp"] = d.Entries[1].TimestampMicroseconds + } + if len(d.Entries) > 2 { + got["Entry2SampleIndex"] = d.Entries[2].SampleIndex + got["Entry2Timestamp"] = d.Entries[2].TimestampMicroseconds + } + + assertFields(t, meta.Fields, got) +} + +func TestReconstructTimestamp(t *testing.T) { + interval := uint64(5_000_000) // 5s in µs + entries := []TimestampIndexEntry{ + {SampleIndex: 0, TimestampMicroseconds: 1_700_000_000_000_000}, + {SampleIndex: 12, TimestampMicroseconds: 1_700_000_000_120_000}, + {SampleIndex: 24, TimestampMicroseconds: 1_700_000_000_240_000}, + } + + // Sample 0: first entry, offset 0 + ts := ReconstructTimestamp(entries, 0, 0, interval) + assertEq(t, "sample0", uint64(1_700_000_000_000_000), ts) + + // Sample 5: first entry, offset 5 + ts = ReconstructTimestamp(entries, 5, 0, interval) + assertEq(t, "sample5", uint64(1_700_000_000_000_000+5*5_000_000), ts) + + // Sample 12: second entry, offset 0 + ts = ReconstructTimestamp(entries, 12, 0, interval) + assertEq(t, "sample12", 
uint64(1_700_000_000_120_000), ts) + + // Sample 15: second entry, offset 3 + ts = ReconstructTimestamp(entries, 15, 0, interval) + assertEq(t, "sample15", uint64(1_700_000_000_120_000+3*5_000_000), ts) + + // Sample 30: third entry, offset 6 + ts = ReconstructTimestamp(entries, 30, 0, interval) + assertEq(t, "sample30", uint64(1_700_000_000_240_000+6*5_000_000), ts) +} + +func TestReconstructTimestampFallback(t *testing.T) { + // No entries — falls back to implicit model. + ts := ReconstructTimestamp(nil, 10, 1_700_000_000_000_000, 5_000_000) + assertEq(t, "fallback", uint64(1_700_000_000_000_000+10*5_000_000), ts) +} + +func TestReconstructTimestamps(t *testing.T) { + entries := []TimestampIndexEntry{ + {SampleIndex: 0, TimestampMicroseconds: 1000}, + {SampleIndex: 3, TimestampMicroseconds: 5000}, + } + ts := ReconstructTimestamps(5, entries, 0, 100) + expected := []uint64{1000, 1100, 1200, 5000, 5100} + for i, want := range expected { + assertEq(t, "ts_"+strconv.Itoa(i), want, ts[i]) + } +} + +func TestReconstructTimestamp_LateStart(t *testing.T) { + // Timestamp index created mid-epoch: first entry starts at sample 120. + // Samples 0..119 should fall back to the implicit model. + startTS := uint64(1_700_000_000_000_000) + interval := uint64(5_000_000) // 5s in µs + entries := []TimestampIndexEntry{ + {SampleIndex: 120, TimestampMicroseconds: 1_700_000_000_800_000}, + {SampleIndex: 240, TimestampMicroseconds: 1_700_000_001_600_000}, + } + + // Sample 0: before first entry, should use implicit model. + ts := ReconstructTimestamp(entries, 0, startTS, interval) + assertEq(t, "sample0", startTS, ts) + + // Sample 50: still before first entry. + ts = ReconstructTimestamp(entries, 50, startTS, interval) + assertEq(t, "sample50", startTS+50*interval, ts) + + // Sample 119: last sample before first entry. + ts = ReconstructTimestamp(entries, 119, startTS, interval) + assertEq(t, "sample119", startTS+119*interval, ts) + + // Sample 120: exactly at first entry. 
+ ts = ReconstructTimestamp(entries, 120, startTS, interval) + assertEq(t, "sample120", uint64(1_700_000_000_800_000), ts) + + // Sample 125: within first entry's range. + ts = ReconstructTimestamp(entries, 125, startTS, interval) + assertEq(t, "sample125", uint64(1_700_000_000_800_000+5*interval), ts) + + // Sample 240: at second entry. + ts = ReconstructTimestamp(entries, 240, startTS, interval) + assertEq(t, "sample240", uint64(1_700_000_001_600_000), ts) +} + +func TestReconstructTimestamps_LateStart(t *testing.T) { + // First entry starts at sample 3, so samples 0..2 use implicit model. + startTS := uint64(1000) + interval := uint64(100) + entries := []TimestampIndexEntry{ + {SampleIndex: 3, TimestampMicroseconds: 5000}, + } + ts := ReconstructTimestamps(5, entries, startTS, interval) + expected := []uint64{1000, 1100, 1200, 5000, 5100} + for i, want := range expected { + assertEq(t, "ts_"+strconv.Itoa(i), want, ts[i]) + } +} + func assertEq(t *testing.T, name string, want, got any) { t.Helper() if !reflect.DeepEqual(want, got) { diff --git a/sdk/telemetry/go/state.go b/sdk/telemetry/go/state.go index 5b633067d2..2670d3dcab 100644 --- a/sdk/telemetry/go/state.go +++ b/sdk/telemetry/go/state.go @@ -13,17 +13,22 @@ const ( AccountTypeInternetLatencySamplesV0 AccountType = 2 AccountTypeDeviceLatencySamples AccountType = 3 AccountTypeInternetLatencySamples AccountType = 4 + AccountTypeTimestampIndex AccountType = 5 ) const ( TelemetrySeedPrefix = "telemetry" DeviceLatencySamplesSeed = "dzlatency" InternetLatencySamplesSeed = "inetlatency" + TimestampIndexSeed = "tsindex" MaxDeviceLatencySamplesPerAccount = 35_000 MaxInternetLatencySamplesPerAccount = 3_000 + MaxTimestampIndexEntries = 10_000 - deviceLatencyHeaderSize = 1 + 8 + 32*6 + 8 + 8 + 4 + 128 + deviceLatencyHeaderSize = 1 + 8 + 32*6 + 8 + 8 + 4 + 128 + timestampIndexHeaderSize = 1 + 32 + 4 + 64 + timestampIndexEntrySize = 4 + 8 ) type DeviceLatencySamples struct { @@ -135,3 +140,114 @@ func 
DeserializeInternetLatencySamples(data []byte) (*InternetLatencySamples, er return d, nil } + +type TimestampIndexEntry struct { + SampleIndex uint32 + TimestampMicroseconds uint64 +} + +type TimestampIndex struct { + AccountType AccountType + SamplesAccountPK [32]byte + NextEntryIndex uint32 + Entries []TimestampIndexEntry +} + +func DeserializeTimestampIndex(data []byte) (*TimestampIndex, error) { + if len(data) < timestampIndexHeaderSize { + return nil, fmt.Errorf("data too short for timestamp index header: %d < %d", len(data), timestampIndexHeaderSize) + } + + r := borsh.NewReader(data) + d := &TimestampIndex{} + + v, _ := r.ReadU8() + d.AccountType = AccountType(v) + d.SamplesAccountPK, _ = r.ReadPubkey() + d.NextEntryIndex, _ = r.ReadU32() + + _, _ = r.ReadBytes(64) // _unused + + count := int(d.NextEntryIndex) + if count > MaxTimestampIndexEntries { + return nil, fmt.Errorf("next_entry_index %d exceeds max %d", count, MaxTimestampIndexEntries) + } + + if r.Remaining() < count*timestampIndexEntrySize { + return nil, fmt.Errorf("data too short for %d timestamp index entries: %d < %d", count, r.Remaining(), count*timestampIndexEntrySize) + } + + d.Entries = make([]TimestampIndexEntry, count) + for i := range count { + d.Entries[i].SampleIndex, _ = r.ReadU32() + d.Entries[i].TimestampMicroseconds, _ = r.ReadU64() + } + + return d, nil +} + +// ReconstructTimestamp returns the wall-clock timestamp (in microseconds) for +// the sample at the given index, using the timestamp index entries and the +// sampling interval from the samples account header. +// +// Uses binary search over entries. O(log m) where m is the number of entries. +// If the timestamp index has no entries, falls back to the implicit model. 
+func ReconstructTimestamp( + entries []TimestampIndexEntry, + sampleIndex uint32, + startTimestampMicroseconds uint64, + samplingIntervalMicroseconds uint64, +) uint64 { + if len(entries) == 0 { + return startTimestampMicroseconds + uint64(sampleIndex)*samplingIntervalMicroseconds + } + + // Binary search: find the last entry where SampleIndex <= sampleIndex. + lo, hi := 0, len(entries)-1 + for lo < hi { + mid := lo + (hi-lo+1)/2 + if entries[mid].SampleIndex <= sampleIndex { + lo = mid + } else { + hi = mid - 1 + } + } + + entry := entries[lo] + if entry.SampleIndex > sampleIndex { + return startTimestampMicroseconds + uint64(sampleIndex)*samplingIntervalMicroseconds + } + return entry.TimestampMicroseconds + uint64(sampleIndex-entry.SampleIndex)*samplingIntervalMicroseconds +} + +// ReconstructTimestamps returns wall-clock timestamps (in microseconds) for all +// samples, using the timestamp index to correct for gaps. +// +// Single-pass O(n + m) where n is sampleCount and m is the number of entries. +func ReconstructTimestamps( + sampleCount uint32, + entries []TimestampIndexEntry, + startTimestampMicroseconds uint64, + samplingIntervalMicroseconds uint64, +) []uint64 { + timestamps := make([]uint64, sampleCount) + if sampleCount == 0 { + return timestamps + } + + entryIdx := 0 + for i := range sampleCount { + // Advance to the last entry that covers this sample index. 
+ for entryIdx+1 < len(entries) && entries[entryIdx+1].SampleIndex <= i { + entryIdx++ + } + + if len(entries) == 0 || entries[entryIdx].SampleIndex > i { + timestamps[i] = startTimestampMicroseconds + uint64(i)*samplingIntervalMicroseconds + } else { + e := entries[entryIdx] + timestamps[i] = e.TimestampMicroseconds + uint64(i-e.SampleIndex)*samplingIntervalMicroseconds + } + } + return timestamps +} diff --git a/sdk/telemetry/python/telemetry/state.py b/sdk/telemetry/python/telemetry/state.py index f1c4b77f3e..7b14baf28b 100644 --- a/sdk/telemetry/python/telemetry/state.py +++ b/sdk/telemetry/python/telemetry/state.py @@ -19,8 +19,11 @@ MAX_DEVICE_LATENCY_SAMPLES_PER_ACCOUNT = 35_000 MAX_INTERNET_LATENCY_SAMPLES_PER_ACCOUNT = 3_000 +MAX_TIMESTAMP_INDEX_ENTRIES = 10_000 DEVICE_LATENCY_HEADER_SIZE = 1 + 8 + 32 * 6 + 8 + 8 + 4 + 128 +TIMESTAMP_INDEX_HEADER_SIZE = 1 + 32 + 4 + 64 +TIMESTAMP_INDEX_ENTRY_SIZE = 4 + 8 def _read_pubkey(r: DefensiveReader) -> Pubkey: @@ -139,3 +142,115 @@ def from_bytes(cls, data: bytes) -> InternetLatencySamples: next_sample_index=next_sample_index, samples=samples, ) + + +@dataclass +class TimestampIndexEntry: + sample_index: int + timestamp_microseconds: int + + +@dataclass +class TimestampIndex: + account_type: int + samples_account_pk: Pubkey + next_entry_index: int + entries: list[TimestampIndexEntry] = field(default_factory=list) + + @classmethod + def from_bytes(cls, data: bytes) -> TimestampIndex: + if len(data) < TIMESTAMP_INDEX_HEADER_SIZE: + raise ValueError( + f"data too short for timestamp index header: {len(data)} < {TIMESTAMP_INDEX_HEADER_SIZE}" + ) + + r = DefensiveReader(data) + + account_type = r.read_u8() + samples_account_pk = _read_pubkey(r) + next_entry_index = r.read_u32() + + r.read_bytes(64) # reserved + + if next_entry_index > MAX_TIMESTAMP_INDEX_ENTRIES: + raise ValueError( + f"next_entry_index {next_entry_index} exceeds max {MAX_TIMESTAMP_INDEX_ENTRIES}" + ) + + count = next_entry_index + if r.remaining < 
count * TIMESTAMP_INDEX_ENTRY_SIZE: + raise ValueError( + f"data too short for {count} timestamp index entries: " + f"{r.remaining} < {count * TIMESTAMP_INDEX_ENTRY_SIZE}" + ) + + entries: list[TimestampIndexEntry] = [] + for _ in range(count): + sample_index = r.read_u32() + timestamp_microseconds = r.read_u64() + entries.append(TimestampIndexEntry(sample_index, timestamp_microseconds)) + + return cls( + account_type=account_type, + samples_account_pk=samples_account_pk, + next_entry_index=next_entry_index, + entries=entries, + ) + + +def reconstruct_timestamp( + entries: list[TimestampIndexEntry], + sample_index: int, + start_timestamp_microseconds: int, + sampling_interval_microseconds: int, +) -> int: + """Return the wall-clock timestamp (microseconds) for a sample at the given index. + + Uses binary search over entries. O(log m) where m is the number of entries. + Falls back to the implicit model when no entries are available. + """ + if not entries: + return start_timestamp_microseconds + sample_index * sampling_interval_microseconds + + # Binary search: find the last entry where sample_index <= target. + lo, hi = 0, len(entries) - 1 + while lo < hi: + mid = lo + (hi - lo + 1) // 2 + if entries[mid].sample_index <= sample_index: + lo = mid + else: + hi = mid - 1 + + entry = entries[lo] + if entry.sample_index > sample_index: + return start_timestamp_microseconds + sample_index * sampling_interval_microseconds + return entry.timestamp_microseconds + (sample_index - entry.sample_index) * sampling_interval_microseconds + + +def reconstruct_timestamps( + sample_count: int, + entries: list[TimestampIndexEntry], + start_timestamp_microseconds: int, + sampling_interval_microseconds: int, +) -> list[int]: + """Return wall-clock timestamps (microseconds) for all samples. + + Single-pass O(n + m) where n is sample_count and m is the number of entries. 
+ """ + if not entries: + return [ + start_timestamp_microseconds + i * sampling_interval_microseconds + for i in range(sample_count) + ] + + timestamps: list[int] = [] + entry_idx = 0 + for i in range(sample_count): + while entry_idx + 1 < len(entries) and entries[entry_idx + 1].sample_index <= i: + entry_idx += 1 + e = entries[entry_idx] + if e.sample_index > i: + timestamps.append(start_timestamp_microseconds + i * sampling_interval_microseconds) + else: + timestamps.append(e.timestamp_microseconds + (i - e.sample_index) * sampling_interval_microseconds) + return timestamps diff --git a/sdk/telemetry/python/telemetry/tests/test_fixtures.py b/sdk/telemetry/python/telemetry/tests/test_fixtures.py index 308b8a6337..16ade7113c 100644 --- a/sdk/telemetry/python/telemetry/tests/test_fixtures.py +++ b/sdk/telemetry/python/telemetry/tests/test_fixtures.py @@ -5,7 +5,14 @@ from solders.pubkey import Pubkey # type: ignore[import-untyped] -from telemetry.state import DeviceLatencySamples, InternetLatencySamples +from telemetry.state import ( + DeviceLatencySamples, + InternetLatencySamples, + TimestampIndex, + TimestampIndexEntry, + reconstruct_timestamp, + reconstruct_timestamps, +) FIXTURES_DIR = Path(__file__).resolve().parent.parent.parent.parent / "testdata" / "fixtures" @@ -75,3 +82,75 @@ def test_deserialize(self): "SamplesCount": len(d.samples), }, ) + + +class TestFixtureTimestampIndex: + def test_deserialize(self): + data, meta = _load_fixture("timestamp_index") + d = TimestampIndex.from_bytes(data) + got = { + "AccountType": d.account_type, + "SamplesAccountPK": d.samples_account_pk, + "NextEntryIndex": d.next_entry_index, + "EntriesCount": len(d.entries), + } + if len(d.entries) > 0: + got["Entry0SampleIndex"] = d.entries[0].sample_index + got["Entry0Timestamp"] = d.entries[0].timestamp_microseconds + if len(d.entries) > 1: + got["Entry1SampleIndex"] = d.entries[1].sample_index + got["Entry1Timestamp"] = d.entries[1].timestamp_microseconds + if len(d.entries) > 
2: + got["Entry2SampleIndex"] = d.entries[2].sample_index + got["Entry2Timestamp"] = d.entries[2].timestamp_microseconds + _assert_fields(meta["fields"], got) + + +class TestReconstructTimestamp: + def test_with_entries(self): + interval = 5_000_000 + entries = [ + TimestampIndexEntry(0, 1_700_000_000_000_000), + TimestampIndexEntry(12, 1_700_000_000_120_000), + TimestampIndexEntry(24, 1_700_000_000_240_000), + ] + assert reconstruct_timestamp(entries, 0, 0, interval) == 1_700_000_000_000_000 + assert reconstruct_timestamp(entries, 5, 0, interval) == 1_700_000_000_000_000 + 5 * interval + assert reconstruct_timestamp(entries, 12, 0, interval) == 1_700_000_000_120_000 + assert reconstruct_timestamp(entries, 15, 0, interval) == 1_700_000_000_120_000 + 3 * interval + assert reconstruct_timestamp(entries, 30, 0, interval) == 1_700_000_000_240_000 + 6 * interval + + def test_fallback_no_entries(self): + ts = reconstruct_timestamp([], 10, 1_700_000_000_000_000, 5_000_000) + assert ts == 1_700_000_000_000_000 + 10 * 5_000_000 + + def test_late_start(self): + start_ts = 1_700_000_000_000_000 + interval = 5_000_000 + entries = [ + TimestampIndexEntry(120, 1_700_000_000_800_000), + TimestampIndexEntry(240, 1_700_000_001_600_000), + ] + # Before first entry: implicit model + assert reconstruct_timestamp(entries, 0, start_ts, interval) == start_ts + assert reconstruct_timestamp(entries, 50, start_ts, interval) == start_ts + 50 * interval + assert reconstruct_timestamp(entries, 119, start_ts, interval) == start_ts + 119 * interval + # At and after first entry + assert reconstruct_timestamp(entries, 120, start_ts, interval) == 1_700_000_000_800_000 + assert reconstruct_timestamp(entries, 125, start_ts, interval) == 1_700_000_000_800_000 + 5 * interval + assert reconstruct_timestamp(entries, 240, start_ts, interval) == 1_700_000_001_600_000 + + def test_reconstruct_all(self): + entries = [ + TimestampIndexEntry(0, 1000), + TimestampIndexEntry(3, 5000), + ] + ts = 
reconstruct_timestamps(5, entries, 0, 100) + assert ts == [1000, 1100, 1200, 5000, 5100] + + def test_reconstruct_all_late_start(self): + entries = [ + TimestampIndexEntry(3, 5000), + ] + ts = reconstruct_timestamps(5, entries, 1000, 100) + assert ts == [1000, 1100, 1200, 5000, 5100] diff --git a/sdk/telemetry/testdata/fixtures/generate-fixtures/src/main.rs b/sdk/telemetry/testdata/fixtures/generate-fixtures/src/main.rs index 2f4a29bb6b..acdde4e71f 100644 --- a/sdk/telemetry/testdata/fixtures/generate-fixtures/src/main.rs +++ b/sdk/telemetry/testdata/fixtures/generate-fixtures/src/main.rs @@ -13,6 +13,7 @@ use doublezero_telemetry::state::{ accounttype::AccountType, device_latency_samples::{DeviceLatencySamples, DeviceLatencySamplesHeader}, internet_latency_samples::{InternetLatencySamples, InternetLatencySamplesHeader}, + timestamp_index::{TimestampIndex, TimestampIndexEntry, TimestampIndexHeader}, }; use serde::Serialize; use solana_program::pubkey::Pubkey; @@ -55,6 +56,7 @@ fn main() { generate_device_latency_samples(&fixtures_dir); generate_internet_latency_samples(&fixtures_dir); + generate_timestamp_index(&fixtures_dir); println!("\nall fixtures generated in {}", fixtures_dir.display()); } @@ -153,3 +155,53 @@ fn generate_internet_latency_samples(dir: &Path) { write_fixture(dir, "internet_latency_samples", &data, &meta); } + +fn generate_timestamp_index(dir: &Path) { + let samples_pk = pubkey_from_byte(0x21); + + let entries = vec![ + TimestampIndexEntry { + sample_index: 0, + timestamp_microseconds: 1_700_000_000_000_000, + }, + TimestampIndexEntry { + sample_index: 12, + timestamp_microseconds: 1_700_000_000_060_000, + }, + TimestampIndexEntry { + sample_index: 24, + timestamp_microseconds: 1_700_000_000_120_000, + }, + ]; + + let val = TimestampIndex { + header: TimestampIndexHeader { + account_type: AccountType::TimestampIndex, + samples_account_pk: samples_pk, + next_entry_index: entries.len() as u32, + _unused: [0; 64], + }, + entries: entries.clone(), 
+ }; + + let data = borsh::to_vec(&val).unwrap(); + + let meta = FixtureMeta { + name: "timestamp_index".to_string(), + account_type: AccountType::TimestampIndex as u8, + fields: vec![ + FieldValue { name: "AccountType".into(), value: "5".into(), typ: "u8".into() }, + FieldValue { name: "SamplesAccountPK".into(), value: pubkey_bs58(&samples_pk), typ: "pubkey".into() }, + FieldValue { name: "NextEntryIndex".into(), value: "3".into(), typ: "u32".into() }, + FieldValue { name: "EntriesCount".into(), value: "3".into(), typ: "u32".into() }, + FieldValue { name: "Entry0SampleIndex".into(), value: "0".into(), typ: "u32".into() }, + FieldValue { name: "Entry0Timestamp".into(), value: "1700000000000000".into(), typ: "u64".into() }, + FieldValue { name: "Entry1SampleIndex".into(), value: "12".into(), typ: "u32".into() }, + FieldValue { name: "Entry1Timestamp".into(), value: "1700000000060000".into(), typ: "u64".into() }, + FieldValue { name: "Entry2SampleIndex".into(), value: "24".into(), typ: "u32".into() }, + FieldValue { name: "Entry2Timestamp".into(), value: "1700000000120000".into(), typ: "u64".into() }, + ], + }; + + write_fixture(dir, "timestamp_index", &data, &meta); +} diff --git a/sdk/telemetry/testdata/fixtures/timestamp_index.bin b/sdk/telemetry/testdata/fixtures/timestamp_index.bin new file mode 100644 index 0000000000..f99c88a03a Binary files /dev/null and b/sdk/telemetry/testdata/fixtures/timestamp_index.bin differ diff --git a/sdk/telemetry/testdata/fixtures/timestamp_index.json b/sdk/telemetry/testdata/fixtures/timestamp_index.json new file mode 100644 index 0000000000..8e13d641e5 --- /dev/null +++ b/sdk/telemetry/testdata/fixtures/timestamp_index.json @@ -0,0 +1,56 @@ +{ + "name": "timestamp_index", + "account_type": 5, + "fields": [ + { + "name": "AccountType", + "value": "5", + "typ": "u8" + }, + { + "name": "SamplesAccountPK", + "value": "3DpTLLwnVbLZUpy9nenjYabYYKeSJMMvGdBJcSEqnYpP", + "typ": "pubkey" + }, + { + "name": "NextEntryIndex", + "value": "3", 
+ "typ": "u32" + }, + { + "name": "EntriesCount", + "value": "3", + "typ": "u32" + }, + { + "name": "Entry0SampleIndex", + "value": "0", + "typ": "u32" + }, + { + "name": "Entry0Timestamp", + "value": "1700000000000000", + "typ": "u64" + }, + { + "name": "Entry1SampleIndex", + "value": "12", + "typ": "u32" + }, + { + "name": "Entry1Timestamp", + "value": "1700000000060000", + "typ": "u64" + }, + { + "name": "Entry2SampleIndex", + "value": "24", + "typ": "u32" + }, + { + "name": "Entry2Timestamp", + "value": "1700000000120000", + "typ": "u64" + } + ] +} \ No newline at end of file diff --git a/sdk/telemetry/typescript/telemetry/index.ts b/sdk/telemetry/typescript/telemetry/index.ts index 4427eed3b6..1c9c9ee259 100644 --- a/sdk/telemetry/typescript/telemetry/index.ts +++ b/sdk/telemetry/typescript/telemetry/index.ts @@ -2,8 +2,13 @@ export { PROGRAM_IDS, LEDGER_RPC_URLS } from "./config.js"; export { type DeviceLatencySamples, type InternetLatencySamples, + type TimestampIndex, + type TimestampIndexEntry, deserializeDeviceLatencySamples, deserializeInternetLatencySamples, + deserializeTimestampIndex, + reconstructTimestamp, + reconstructTimestamps, } from "./state.js"; export { deriveDeviceLatencySamplesPda, diff --git a/sdk/telemetry/typescript/telemetry/state.ts b/sdk/telemetry/typescript/telemetry/state.ts index 39c13dad3e..bb6b678ce1 100644 --- a/sdk/telemetry/typescript/telemetry/state.ts +++ b/sdk/telemetry/typescript/telemetry/state.ts @@ -6,6 +6,9 @@ import { DefensiveReader } from "borsh-incremental"; const DEVICE_LATENCY_HEADER_SIZE = 1 + 8 + 32 * 6 + 8 + 8 + 4 + 128; const MAX_DEVICE_LATENCY_SAMPLES = 35_000; const MAX_INTERNET_LATENCY_SAMPLES = 3_000; +const TIMESTAMP_INDEX_HEADER_SIZE = 1 + 32 + 4 + 64; +const TIMESTAMP_INDEX_ENTRY_SIZE = 4 + 8; +const MAX_TIMESTAMP_INDEX_ENTRIES = 10_000; export interface DeviceLatencySamples { accountType: number; @@ -128,3 +131,128 @@ export function deserializeInternetLatencySamples( samples, }; } + +export interface 
TimestampIndexEntry { + sampleIndex: number; + timestampMicroseconds: bigint; +} + +export interface TimestampIndex { + accountType: number; + samplesAccountPK: PublicKey; + nextEntryIndex: number; + entries: TimestampIndexEntry[]; +} + +export function deserializeTimestampIndex( + data: Uint8Array, +): TimestampIndex { + if (data.length < TIMESTAMP_INDEX_HEADER_SIZE) { + throw new Error( + `data too short for timestamp index header: ${data.length} < ${TIMESTAMP_INDEX_HEADER_SIZE}`, + ); + } + + const r = new DefensiveReader(data); + + const accountType = r.readU8(); + const samplesAccountPK = readPubkey(r); + const nextEntryIndex = r.readU32(); + + r.readBytes(64); // _unused + + if (nextEntryIndex > MAX_TIMESTAMP_INDEX_ENTRIES) { + throw new Error( + `next_entry_index ${nextEntryIndex} exceeds max ${MAX_TIMESTAMP_INDEX_ENTRIES}`, + ); + } + + const count = nextEntryIndex; + if (r.remaining < count * TIMESTAMP_INDEX_ENTRY_SIZE) { + throw new Error( + `data too short for ${count} timestamp index entries: ${r.remaining} < ${count * TIMESTAMP_INDEX_ENTRY_SIZE}`, + ); + } + + const entries: TimestampIndexEntry[] = []; + for (let i = 0; i < count; i++) { + entries.push({ + sampleIndex: r.readU32(), + timestampMicroseconds: r.readU64(), + }); + } + + return { + accountType, + samplesAccountPK, + nextEntryIndex, + entries, + }; +} + +/** + * Returns the wall-clock timestamp (microseconds) for a sample at the given + * index. Uses binary search over entries — O(log m). Falls back to the + * implicit model when no entries are available. + */ +export function reconstructTimestamp( + entries: TimestampIndexEntry[], + sampleIndex: number, + startTimestampMicroseconds: bigint, + samplingIntervalMicroseconds: bigint, +): bigint { + if (entries.length === 0) { + return startTimestampMicroseconds + BigInt(sampleIndex) * samplingIntervalMicroseconds; + } + + // Binary search: find the last entry where sampleIndex <= target. 
+ let lo = 0; + let hi = entries.length - 1; + while (lo < hi) { + const mid = lo + Math.ceil((hi - lo) / 2); + if (entries[mid].sampleIndex <= sampleIndex) { + lo = mid; + } else { + hi = mid - 1; + } + } + + const entry = entries[lo]; + if (entry.sampleIndex > sampleIndex) { + return startTimestampMicroseconds + BigInt(sampleIndex) * samplingIntervalMicroseconds; + } + return entry.timestampMicroseconds + BigInt(sampleIndex - entry.sampleIndex) * samplingIntervalMicroseconds; +} + +/** + * Returns wall-clock timestamps (microseconds) for all samples. + * Single-pass O(n + m) where n is sampleCount and m is the number of entries. + */ +export function reconstructTimestamps( + sampleCount: number, + entries: TimestampIndexEntry[], + startTimestampMicroseconds: bigint, + samplingIntervalMicroseconds: bigint, +): bigint[] { + const timestamps: bigint[] = []; + if (entries.length === 0) { + for (let i = 0; i < sampleCount; i++) { + timestamps.push(startTimestampMicroseconds + BigInt(i) * samplingIntervalMicroseconds); + } + return timestamps; + } + + let entryIdx = 0; + for (let i = 0; i < sampleCount; i++) { + while (entryIdx + 1 < entries.length && entries[entryIdx + 1].sampleIndex <= i) { + entryIdx++; + } + const e = entries[entryIdx]; + if (e.sampleIndex > i) { + timestamps.push(startTimestampMicroseconds + BigInt(i) * samplingIntervalMicroseconds); + } else { + timestamps.push(e.timestampMicroseconds + BigInt(i - e.sampleIndex) * samplingIntervalMicroseconds); + } + } + return timestamps; +} diff --git a/sdk/telemetry/typescript/telemetry/tests/fixtures.test.ts b/sdk/telemetry/typescript/telemetry/tests/fixtures.test.ts index e166e6c076..f994d5a302 100644 --- a/sdk/telemetry/typescript/telemetry/tests/fixtures.test.ts +++ b/sdk/telemetry/typescript/telemetry/tests/fixtures.test.ts @@ -9,6 +9,9 @@ import { PublicKey } from "@solana/web3.js"; import { deserializeDeviceLatencySamples, deserializeInternetLatencySamples, + deserializeTimestampIndex, + 
reconstructTimestamp, + reconstructTimestamps, } from "../state.js"; const FIXTURES_DIR = join( @@ -100,3 +103,84 @@ describe("InternetLatencySamples fixture", () => { }); }); }); + +describe("TimestampIndex fixture", () => { + test("deserialize", () => { + const [data, meta] = loadFixture("timestamp_index"); + const d = deserializeTimestampIndex(data); + const got: Record<string, unknown> = { + AccountType: d.accountType, + SamplesAccountPK: d.samplesAccountPK, + NextEntryIndex: d.nextEntryIndex, + EntriesCount: d.entries.length, + }; + if (d.entries.length > 0) { + got.Entry0SampleIndex = d.entries[0].sampleIndex; + got.Entry0Timestamp = d.entries[0].timestampMicroseconds; + } + if (d.entries.length > 1) { + got.Entry1SampleIndex = d.entries[1].sampleIndex; + got.Entry1Timestamp = d.entries[1].timestampMicroseconds; + } + if (d.entries.length > 2) { + got.Entry2SampleIndex = d.entries[2].sampleIndex; + got.Entry2Timestamp = d.entries[2].timestampMicroseconds; + } + assertFields(meta.fields, got); + }); +}); + +describe("reconstructTimestamp", () => { + const interval = 5_000_000n; + const entries = [ + { sampleIndex: 0, timestampMicroseconds: 1_700_000_000_000_000n }, + { sampleIndex: 12, timestampMicroseconds: 1_700_000_000_120_000n }, + { sampleIndex: 24, timestampMicroseconds: 1_700_000_000_240_000n }, + ]; + + test("uses correct entry for each sample", () => { + expect(reconstructTimestamp(entries, 0, 0n, interval)).toBe(1_700_000_000_000_000n); + expect(reconstructTimestamp(entries, 5, 0n, interval)).toBe(1_700_000_000_000_000n + 5n * interval); + expect(reconstructTimestamp(entries, 12, 0n, interval)).toBe(1_700_000_000_120_000n); + expect(reconstructTimestamp(entries, 15, 0n, interval)).toBe(1_700_000_000_120_000n + 3n * interval); + expect(reconstructTimestamp(entries, 30, 0n, interval)).toBe(1_700_000_000_240_000n + 6n * interval); + }); + + test("falls back to implicit model with no entries", () => { + const ts = reconstructTimestamp([], 10, 1_700_000_000_000_000n,
5_000_000n); + expect(ts).toBe(1_700_000_000_000_000n + 10n * 5_000_000n); + }); + + test("late start falls back to implicit model for early samples", () => { + const startTS = 1_700_000_000_000_000n; + const lateEntries = [ + { sampleIndex: 120, timestampMicroseconds: 1_700_000_000_800_000n }, + { sampleIndex: 240, timestampMicroseconds: 1_700_000_001_600_000n }, + ]; + // Before first entry: implicit model + expect(reconstructTimestamp(lateEntries, 0, startTS, interval)).toBe(startTS); + expect(reconstructTimestamp(lateEntries, 50, startTS, interval)).toBe(startTS + 50n * interval); + expect(reconstructTimestamp(lateEntries, 119, startTS, interval)).toBe(startTS + 119n * interval); + // At and after first entry + expect(reconstructTimestamp(lateEntries, 120, startTS, interval)).toBe(1_700_000_000_800_000n); + expect(reconstructTimestamp(lateEntries, 125, startTS, interval)).toBe(1_700_000_000_800_000n + 5n * interval); + expect(reconstructTimestamp(lateEntries, 240, startTS, interval)).toBe(1_700_000_001_600_000n); + }); + + test("reconstructTimestamps returns all timestamps", () => { + const e = [ + { sampleIndex: 0, timestampMicroseconds: 1000n }, + { sampleIndex: 3, timestampMicroseconds: 5000n }, + ]; + const ts = reconstructTimestamps(5, e, 0n, 100n); + expect(ts).toEqual([1000n, 1100n, 1200n, 5000n, 5100n]); + }); + + test("reconstructTimestamps late start falls back for early samples", () => { + const e = [ + { sampleIndex: 3, timestampMicroseconds: 5000n }, + ]; + const ts = reconstructTimestamps(5, e, 1000n, 100n); + expect(ts).toEqual([1000n, 1100n, 1200n, 5000n, 5100n]); + }); +}); diff --git a/smartcontract/programs/doublezero-telemetry/src/entrypoint.rs b/smartcontract/programs/doublezero-telemetry/src/entrypoint.rs index 5716dcff58..d55f0ad5a1 100644 --- a/smartcontract/programs/doublezero-telemetry/src/entrypoint.rs +++ b/smartcontract/programs/doublezero-telemetry/src/entrypoint.rs @@ -3,6 +3,7 @@ use crate::{ processors::telemetry::{ 
initialize_device_latency_samples::process_initialize_device_latency_samples, initialize_internet_latency_samples::process_initialize_internet_latency_samples, + initialize_timestamp_index::process_initialize_timestamp_index, write_device_latency_samples::process_write_device_latency_samples, write_internet_latency_samples::process_write_internet_latency_samples, }, @@ -37,6 +38,9 @@ pub fn process_instruction( TelemetryInstruction::WriteInternetLatencySamples(args) => { process_write_internet_latency_samples(program_id, accounts, &args)? } + TelemetryInstruction::InitializeTimestampIndex => { + process_initialize_timestamp_index(program_id, accounts)? + } }; Ok(()) diff --git a/smartcontract/programs/doublezero-telemetry/src/error.rs b/smartcontract/programs/doublezero-telemetry/src/error.rs index c80e1f3787..52d1bf1480 100644 --- a/smartcontract/programs/doublezero-telemetry/src/error.rs +++ b/smartcontract/programs/doublezero-telemetry/src/error.rs @@ -37,6 +37,8 @@ pub enum TelemetryError { SameTargetAsOrigin = 1016, /// Write transaction contains no samples EmptyLatencySamples = 1017, + /// Timestamp index account does not exist + TimestampIndexAccountDoesNotExist = 1018, } impl From for ProgramError { @@ -78,6 +80,9 @@ impl fmt::Display for TelemetryError { Self::DataProviderNameTooLong => write!(f, "Data provider name exceeds 32 bytes"), Self::SameTargetAsOrigin => write!(f, "Origin and target are the same exchange"), Self::EmptyLatencySamples => write!(f, "Write transaction contains no samples"), + Self::TimestampIndexAccountDoesNotExist => { + write!(f, "Timestamp index account does not exist") + } } } } diff --git a/smartcontract/programs/doublezero-telemetry/src/instructions.rs b/smartcontract/programs/doublezero-telemetry/src/instructions.rs index 424e1e6d4e..a360973188 100644 --- a/smartcontract/programs/doublezero-telemetry/src/instructions.rs +++ b/smartcontract/programs/doublezero-telemetry/src/instructions.rs @@ -4,6 +4,7 @@ use 
crate::processors::telemetry::{ write_device_latency_samples::WriteDeviceLatencySamplesArgs, write_internet_latency_samples::WriteInternetLatencySamplesArgs, }; +// InitializeTimestampIndex has no args, so no import needed. use borsh::BorshSerialize; use solana_program::program_error::ProgramError; use std::cmp::PartialEq; @@ -18,12 +19,15 @@ pub enum TelemetryInstruction { InitializeInternetLatencySamples(InitializeInternetLatencySamplesArgs), /// Write internet latency samples to chain WriteInternetLatencySamples(WriteInternetLatencySamplesArgs), + /// Initialize a timestamp index companion account for a latency samples account + InitializeTimestampIndex, } pub const INITIALIZE_DEVICE_LATENCY_SAMPLES_INSTRUCTION_INDEX: u8 = 0; pub const WRITE_DEVICE_LATENCY_SAMPLES_INSTRUCTION_INDEX: u8 = 1; pub const INITIALIZE_INTERNET_LATENCY_SAMPLES_INSTRUCTION_INDEX: u8 = 2; pub const WRITE_INTERNET_LATENCY_SAMPLES_INSTRUCTION_INDEX: u8 = 3; +pub const INITIALIZE_TIMESTAMP_INDEX_INSTRUCTION_INDEX: u8 = 4; impl TelemetryInstruction { pub fn pack(&self) -> Result<Vec<u8>, ProgramError> { @@ -63,6 +67,9 @@ impl TelemetryInstruction { WriteInternetLatencySamplesArgs::try_from(rest)?, ) } + INITIALIZE_TIMESTAMP_INDEX_INSTRUCTION_INDEX => { + TelemetryInstruction::InitializeTimestampIndex + } _ => return Err(ProgramError::InvalidInstructionData), }; @@ -106,5 +113,6 @@ mod tests { samples: vec![], }, )); + test_instruction(TelemetryInstruction::InitializeTimestampIndex); } } diff --git a/smartcontract/programs/doublezero-telemetry/src/pda.rs b/smartcontract/programs/doublezero-telemetry/src/pda.rs index 199bbf76a3..e47ff43150 100644 --- a/smartcontract/programs/doublezero-telemetry/src/pda.rs +++ b/smartcontract/programs/doublezero-telemetry/src/pda.rs @@ -1,4 +1,6 @@ -use crate::seeds::{SEED_DEVICE_LATENCY_SAMPLES, SEED_INTERNET_LATENCY_SAMPLES, SEED_PREFIX}; +use crate::seeds::{ + SEED_DEVICE_LATENCY_SAMPLES, SEED_INTERNET_LATENCY_SAMPLES, SEED_PREFIX, SEED_TIMESTAMP_INDEX, +}; use
solana_program::pubkey::Pubkey; /// Derive PDA for DZ latency samples account. @@ -22,6 +24,21 @@ pub fn derive_device_latency_samples_pda( ) } +/// Derive PDA for a timestamp index companion account. +pub fn derive_timestamp_index_pda( + program_id: &Pubkey, + samples_account_pk: &Pubkey, +) -> (Pubkey, u8) { + Pubkey::find_program_address( + &[ + SEED_PREFIX, + SEED_TIMESTAMP_INDEX, + samples_account_pk.as_ref(), + ], + program_id, + ) +} + /// Derive PDA for Internet latency samples account pub fn derive_internet_latency_samples_pda( program_id: &Pubkey, diff --git a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/initialize_timestamp_index.rs b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/initialize_timestamp_index.rs new file mode 100644 index 0000000000..f0815dc92e --- /dev/null +++ b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/initialize_timestamp_index.rs @@ -0,0 +1,103 @@ +use crate::{ + error::TelemetryError, + pda::derive_timestamp_index_pda, + seeds::{SEED_PREFIX, SEED_TIMESTAMP_INDEX}, + state::{ + accounttype::AccountType, + timestamp_index::{TimestampIndexHeader, TIMESTAMP_INDEX_HEADER_SIZE}, + }, +}; +use borsh::BorshSerialize; +use doublezero_program_common::create_account::try_create_account; +use solana_program::{ + account_info::{next_account_info, AccountInfo}, + entrypoint::ProgramResult, + msg, + program_error::ProgramError, + pubkey::Pubkey, +}; + +/// Initializes a new timestamp index companion account for a latency samples account. +/// +/// The timestamp index PDA is derived from the samples account's public key. +/// The samples account must exist and be owned by this program. 
+/// +/// Errors: +/// - `MissingRequiredSignature`: agent is not a signer +/// - `AccountDoesNotExist`: samples account does not exist +/// - `InvalidAccountOwner`: samples account is not owned by this program +/// - `InvalidPDA`: derived PDA does not match the provided account +/// - `AccountAlreadyExists`: timestamp index account already exists +pub fn process_initialize_timestamp_index( + program_id: &Pubkey, + accounts: &[AccountInfo], +) -> ProgramResult { + msg!("Processing InitializeTimestampIndex"); + + let accounts_iter = &mut accounts.iter(); + + let timestamp_index_account = next_account_info(accounts_iter)?; + let samples_account = next_account_info(accounts_iter)?; + let agent = next_account_info(accounts_iter)?; + let _system_program = next_account_info(accounts_iter)?; + + if !agent.is_signer { + return Err(ProgramError::MissingRequiredSignature); + } + + // The samples account must exist and be owned by this program. + if samples_account.data_is_empty() { + msg!("Samples account does not exist"); + return Err(TelemetryError::AccountDoesNotExist.into()); + } + + if samples_account.owner != program_id { + msg!("Samples account is not owned by this program"); + return Err(TelemetryError::InvalidAccountOwner.into()); + } + + // Derive and validate the PDA. 
+ let (timestamp_index_pda, bump_seed) = + derive_timestamp_index_pda(program_id, samples_account.key); + + if *timestamp_index_account.key != timestamp_index_pda { + msg!("Invalid PDA for timestamp index account"); + return Err(TelemetryError::InvalidPDA.into()); + } + + if !timestamp_index_account.data_is_empty() { + msg!("Timestamp index account already exists"); + return Err(TelemetryError::AccountAlreadyExists.into()); + } + + let space = TIMESTAMP_INDEX_HEADER_SIZE; + + msg!("Creating timestamp index account: {}", timestamp_index_pda); + + try_create_account( + agent.key, + &timestamp_index_pda, + timestamp_index_account.lamports(), + space, + program_id, + accounts, + &[ + SEED_PREFIX, + SEED_TIMESTAMP_INDEX, + samples_account.key.as_ref(), + &[bump_seed], + ], + )?; + + let header = TimestampIndexHeader { + account_type: AccountType::TimestampIndex, + samples_account_pk: *samples_account.key, + next_entry_index: 0, + _unused: [0; 64], + }; + + let mut data = &mut timestamp_index_account.data.borrow_mut()[..]; + header.serialize(&mut data)?; + + Ok(()) +} diff --git a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/mod.rs b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/mod.rs index e6524891a7..6635f91591 100644 --- a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/mod.rs +++ b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/mod.rs @@ -1,4 +1,6 @@ pub mod initialize_device_latency_samples; pub mod initialize_internet_latency_samples; +pub mod initialize_timestamp_index; pub mod write_device_latency_samples; pub mod write_internet_latency_samples; +pub mod write_timestamp_index; diff --git a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_device_latency_samples.rs b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_device_latency_samples.rs index 1c99cc9996..827f064a97 100644 ---
a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_device_latency_samples.rs +++ b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_device_latency_samples.rs @@ -1,5 +1,6 @@ use crate::{ error::TelemetryError, + processors::telemetry::write_timestamp_index::append_timestamp_index_entry, state::{ accounttype::AccountType, device_latency_samples::{ @@ -67,7 +68,7 @@ pub fn process_write_device_latency_samples( let accounts_iter = &mut accounts.iter(); - // Expected order: [latency_samples_account, agent, system_program] + // Expected order: [latency_samples_account, agent, system_program, timestamp_index_account (optional)] let latency_samples_account = next_account_info(accounts_iter)?; let agent = next_account_info(accounts_iter)?; @@ -163,5 +164,20 @@ pub fn process_write_device_latency_samples( ); } + // If a timestamp index account is provided (4th account after + // latency_samples, agent, system_program), append an entry. + if accounts.len() > 3 { + let timestamp_index_account = &accounts[3]; + append_timestamp_index_entry( + program_id, + timestamp_index_account, + latency_samples_account, + agent, + accounts, + write_index as u32, + args.start_timestamp_microseconds, + )?; + } + Ok(()) } diff --git a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_internet_latency_samples.rs b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_internet_latency_samples.rs index dee991a8a6..647fc9428a 100644 --- a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_internet_latency_samples.rs +++ b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_internet_latency_samples.rs @@ -1,5 +1,6 @@ use crate::{ error::TelemetryError, + processors::telemetry::write_timestamp_index::append_timestamp_index_entry, state::{ accounttype::AccountType, internet_latency_samples::{InternetLatencySamplesHeader, MAX_INTERNET_LATENCY_SAMPLES}, @@ 
-65,7 +66,7 @@ pub fn process_write_internet_latency_samples( let accounts_iter = &mut accounts.iter(); - // Expected order: [latency_samples_account, agent, system_program] + // Expected order: [latency_samples_account, agent, system_program, timestamp_index_account (optional)] let latency_samples_acct = next_account_info(accounts_iter)?; let collector_agent = next_account_info(accounts_iter)?; @@ -161,5 +162,20 @@ pub fn process_write_internet_latency_samples( ); } + // If a timestamp index account is provided (4th account after + // latency_samples, agent, system_program), append an entry. + if accounts.len() > 3 { + let timestamp_index_account = &accounts[3]; + append_timestamp_index_entry( + program_id, + timestamp_index_account, + latency_samples_acct, + collector_agent, + accounts, + write_index as u32, + args.start_timestamp_microseconds, + )?; + } + Ok(()) } diff --git a/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_timestamp_index.rs b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_timestamp_index.rs new file mode 100644 index 0000000000..35b0e8a03f --- /dev/null +++ b/smartcontract/programs/doublezero-telemetry/src/processors/telemetry/write_timestamp_index.rs @@ -0,0 +1,186 @@ +use crate::{ + error::TelemetryError, + pda::derive_timestamp_index_pda, + state::{ + accounttype::AccountType, + timestamp_index::{ + TimestampIndexHeader, MAX_TIMESTAMP_INDEX_ENTRIES, TIMESTAMP_INDEX_ENTRY_SIZE, + TIMESTAMP_INDEX_HEADER_SIZE, + }, + }, +}; +use borsh::BorshSerialize; +use doublezero_program_common::resize_account::resize_account_if_needed; +use solana_program::{ + account_info::AccountInfo, entrypoint::ProgramResult, msg, program_error::ProgramError, + pubkey::Pubkey, +}; + +/// Appends a timestamp index entry to a companion timestamp index account. +/// +/// Called from the write_device_latency_samples and write_internet_latency_samples +/// processors when a timestamp index account is provided. 
+pub fn append_timestamp_index_entry( + program_id: &Pubkey, + timestamp_index_account: &AccountInfo, + samples_account: &AccountInfo, + payer: &AccountInfo, + accounts: &[AccountInfo], + sample_index: u32, + timestamp_microseconds: u64, +) -> ProgramResult { + // Validate the timestamp index account exists and is owned by this program. + if timestamp_index_account.data_is_empty() { + msg!("Timestamp index account does not exist"); + return Err(TelemetryError::TimestampIndexAccountDoesNotExist.into()); + } + + if timestamp_index_account.owner != program_id { + return Err(TelemetryError::InvalidAccountOwner.into()); + } + + // Validate PDA derivation. + let (expected_pda, _) = derive_timestamp_index_pda(program_id, samples_account.key); + if *timestamp_index_account.key != expected_pda { + msg!("Timestamp index PDA does not match samples account"); + return Err(TelemetryError::InvalidPDA.into()); + } + + // Deserialize the header. + let mut header = TimestampIndexHeader::try_from( + &timestamp_index_account.try_borrow_data()?[..TIMESTAMP_INDEX_HEADER_SIZE], + ) + .map_err(|e| { + msg!("Failed to deserialize TimestampIndexHeader: {}", e); + ProgramError::InvalidAccountData + })?; + + if header.account_type != AccountType::TimestampIndex { + return Err(TelemetryError::InvalidAccountType.into()); + } + + if header.samples_account_pk != *samples_account.key { + msg!("Timestamp index samples_account_pk mismatch"); + return Err(TelemetryError::InvalidAccountOwner.into()); + } + + // Check capacity — silently skip the append if the index is full. + // The timestamp index is supplementary; hitting the cap should not + // block the parent write transaction. + if header.next_entry_index as usize >= MAX_TIMESTAMP_INDEX_ENTRIES { + msg!("Timestamp index is full, skipping append"); + return Ok(()); + } + + // Write the new entry.
+ let write_index = header.next_entry_index as usize; + header.next_entry_index += 1; + + let new_len = + TIMESTAMP_INDEX_HEADER_SIZE + header.next_entry_index as usize * TIMESTAMP_INDEX_ENTRY_SIZE; + resize_account_if_needed(timestamp_index_account, payer, accounts, new_len)?; + + { + let mut data = &mut timestamp_index_account.data.borrow_mut()[..]; + header.serialize(&mut data)?; + + let offset = write_index * TIMESTAMP_INDEX_ENTRY_SIZE; + data[offset..offset + 4].copy_from_slice(&sample_index.to_le_bytes()); + data[offset + 4..offset + 12].copy_from_slice(&timestamp_microseconds.to_le_bytes()); + } + + msg!( + "Appended timestamp index entry: sample_index={}, timestamp={}", + sample_index, + timestamp_microseconds + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::state::timestamp_index::TimestampIndexHeader; + use solana_program::account_info::AccountInfo; + + #[test] + fn test_append_skips_when_full() { + let program_id = Pubkey::new_unique(); + let samples_key = Pubkey::new_unique(); + + let (ts_pda, _) = derive_timestamp_index_pda(&program_id, &samples_key); + + // Build a timestamp index header at max capacity. + let header = TimestampIndexHeader { + account_type: AccountType::TimestampIndex, + samples_account_pk: samples_key, + next_entry_index: MAX_TIMESTAMP_INDEX_ENTRIES as u32, + _unused: [0u8; 64], + }; + let mut ts_data = borsh::to_vec(&header).unwrap(); + // Pad with dummy entries so data_is_empty() returns false and + // the account data is consistent with the header.
+ ts_data.resize( + TIMESTAMP_INDEX_HEADER_SIZE + MAX_TIMESTAMP_INDEX_ENTRIES * TIMESTAMP_INDEX_ENTRY_SIZE, + 0, + ); + let ts_data_snapshot = ts_data.clone(); + + let mut ts_lamports = 1_000_000u64; + let ts_account = AccountInfo::new( + &ts_pda, + false, + true, + &mut ts_lamports, + &mut ts_data, + &program_id, + false, + 0, + ); + + let mut samples_lamports = 1_000_000u64; + let mut samples_data = vec![0u8; 1]; // non-empty placeholder + let samples_account = AccountInfo::new( + &samples_key, + false, + false, + &mut samples_lamports, + &mut samples_data, + &program_id, + false, + 0, + ); + + let payer_key = Pubkey::new_unique(); + let mut payer_lamports = 10_000_000u64; + let mut payer_data = vec![]; + let payer = AccountInfo::new( + &payer_key, + true, + true, + &mut payer_lamports, + &mut payer_data, + &solana_program::system_program::ID, + false, + 0, + ); + + let accounts = vec![ts_account.clone(), samples_account.clone(), payer.clone()]; + + // Appending should succeed (Ok) without modifying the account. + let result = append_timestamp_index_entry( + &program_id, + &ts_account, + &samples_account, + &payer, + &accounts, + 99999, // sample_index + 1_700_000_000_000_000, + ); + assert!(result.is_ok()); + + // Account data should be unchanged — no new entry was appended. 
+ assert_eq!(*ts_account.data.borrow(), ts_data_snapshot); + } +} diff --git a/smartcontract/programs/doublezero-telemetry/src/seeds.rs b/smartcontract/programs/doublezero-telemetry/src/seeds.rs index 58fa3d8b83..b0c1c3e2df 100644 --- a/smartcontract/programs/doublezero-telemetry/src/seeds.rs +++ b/smartcontract/programs/doublezero-telemetry/src/seeds.rs @@ -1,3 +1,4 @@ pub const SEED_PREFIX: &[u8] = b"telemetry"; pub const SEED_DEVICE_LATENCY_SAMPLES: &[u8] = b"dzlatency"; pub const SEED_INTERNET_LATENCY_SAMPLES: &[u8] = b"inetlatency"; +pub const SEED_TIMESTAMP_INDEX: &[u8] = b"tsindex"; diff --git a/smartcontract/programs/doublezero-telemetry/src/state/accounttype.rs b/smartcontract/programs/doublezero-telemetry/src/state/accounttype.rs index db2c30be4c..36847da224 100644 --- a/smartcontract/programs/doublezero-telemetry/src/state/accounttype.rs +++ b/smartcontract/programs/doublezero-telemetry/src/state/accounttype.rs @@ -11,6 +11,7 @@ pub enum AccountType { InternetLatencySamplesV0 = 2, DeviceLatencySamples = 3, InternetLatencySamples = 4, + TimestampIndex = 5, } impl TryFrom for AccountType { @@ -22,6 +23,7 @@ impl TryFrom for AccountType { 2 => Ok(Self::InternetLatencySamplesV0), 3 => Ok(Self::DeviceLatencySamples), 4 => Ok(Self::InternetLatencySamples), + 5 => Ok(Self::TimestampIndex), _ => Err(ProgramError::InvalidAccountData), } } @@ -34,6 +36,7 @@ impl fmt::Display for AccountType { Self::InternetLatencySamplesV0 => write!(f, "InternetLatencySamplesV0"), Self::DeviceLatencySamples => write!(f, "DeviceLatencySamples"), Self::InternetLatencySamples => write!(f, "InternetLatencySamples"), + Self::TimestampIndex => write!(f, "TimestampIndex"), } } } diff --git a/smartcontract/programs/doublezero-telemetry/src/state/mod.rs b/smartcontract/programs/doublezero-telemetry/src/state/mod.rs index e6bd931d87..a17bd06e94 100644 --- a/smartcontract/programs/doublezero-telemetry/src/state/mod.rs +++ b/smartcontract/programs/doublezero-telemetry/src/state/mod.rs @@ -1,3 
+1,4 @@ pub mod accounttype; pub mod device_latency_samples; pub mod internet_latency_samples; +pub mod timestamp_index; diff --git a/smartcontract/programs/doublezero-telemetry/src/state/timestamp_index.rs b/smartcontract/programs/doublezero-telemetry/src/state/timestamp_index.rs new file mode 100644 index 0000000000..de17137a8c --- /dev/null +++ b/smartcontract/programs/doublezero-telemetry/src/state/timestamp_index.rs @@ -0,0 +1,201 @@ +use crate::{ + seeds::SEED_TIMESTAMP_INDEX, + state::accounttype::{AccountType, AccountTypeInfo}, +}; +use borsh::{BorshDeserialize, BorshSerialize}; +use solana_program::pubkey::Pubkey; +use std::{ + fmt, + io::{self, Read, Write}, +}; + +/// Maximum number of timestamp index entries per account. +/// At one entry per write batch (up to 245 samples each), this comfortably +/// covers a 48-hour epoch. Once full, appends are silently skipped. +pub const MAX_TIMESTAMP_INDEX_ENTRIES: usize = 10_000; + +/// Header size in bytes: +/// - 1 byte: account_type +/// - 32 bytes: samples_account_pk +/// - 4 bytes: next_entry_index +/// - 64 bytes: _unused (reserved) +/// +/// Total: 101 bytes +pub const TIMESTAMP_INDEX_HEADER_SIZE: usize = { + 1 // account_type + + 32 // samples_account_pk + + 4 // next_entry_index + + 64 // _unused +}; + +/// Size of a single timestamp index entry in bytes: +/// - 4 bytes: sample_index (u32) +/// - 8 bytes: timestamp_microseconds (u64) +pub const TIMESTAMP_INDEX_ENTRY_SIZE: usize = 4 + 8; + +/// Onchain header for a timestamp index account. 
+#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct TimestampIndexHeader { + pub account_type: AccountType, // 1 + + #[cfg_attr( + feature = "serde", + serde( + serialize_with = "doublezero_program_common::serializer::serialize_pubkey_as_string", + deserialize_with = "doublezero_program_common::serializer::deserialize_pubkey_from_string" + ) + )] + pub samples_account_pk: Pubkey, // 32 + + pub next_entry_index: u32, // 4 + + #[cfg_attr(feature = "serde", serde(with = "serde_bytes"))] + pub _unused: [u8; 64], // 64 +} + +impl TryFrom<&[u8]> for TimestampIndexHeader { + type Error = borsh::io::Error; + + fn try_from(data: &[u8]) -> Result<Self, Self::Error> { + if data.len() < TIMESTAMP_INDEX_HEADER_SIZE { + return Err(borsh::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "account data too short for timestamp index header", + )); + } + + Self::deserialize(&mut &data[..]) + } +} + +/// A single entry in the timestamp index. +#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq, Clone, Copy)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct TimestampIndexEntry { + pub sample_index: u32, + pub timestamp_microseconds: u64, +} + +/// Structured representation of a timestamp index account.
+#[derive(Debug, PartialEq, Clone)] +pub struct TimestampIndex { + pub header: TimestampIndexHeader, + pub entries: Vec<TimestampIndexEntry>, +} + +impl fmt::Display for TimestampIndex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "account_type: {}, samples_account: {}, entries: {}", + self.header.account_type, + self.header.samples_account_pk, + self.entries.len() + ) + } +} + +impl BorshSerialize for TimestampIndex { + fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> { + self.header.serialize(writer)?; + for entry in &self.entries { + writer.write_all(&entry.sample_index.to_le_bytes())?; + writer.write_all(&entry.timestamp_microseconds.to_le_bytes())?; + } + Ok(()) + } +} + +impl BorshDeserialize for TimestampIndex { + fn deserialize_reader<R: Read>(reader: &mut R) -> io::Result<Self> { + let header = TimestampIndexHeader::deserialize_reader(reader)?; + + let num_entries = header.next_entry_index as usize; + let mut entries = Vec::with_capacity(num_entries); + let mut sample_buf = [0u8; 4]; + let mut ts_buf = [0u8; 8]; + + for _ in 0..num_entries { + reader.read_exact(&mut sample_buf)?; + reader.read_exact(&mut ts_buf)?; + entries.push(TimestampIndexEntry { + sample_index: u32::from_le_bytes(sample_buf), + timestamp_microseconds: u64::from_le_bytes(ts_buf), + }); + } + + Ok(TimestampIndex { header, entries }) + } +} + +impl TryFrom<&[u8]> for TimestampIndex { + type Error = borsh::io::Error; + + fn try_from(data: &[u8]) -> Result<Self, Self::Error> { + borsh::from_slice(data) + } +} + +impl AccountTypeInfo for TimestampIndex { + fn seed(&self) -> &[u8] { + SEED_TIMESTAMP_INDEX + } + + fn size(&self) -> usize { + TIMESTAMP_INDEX_HEADER_SIZE + self.entries.len() * TIMESTAMP_INDEX_ENTRY_SIZE + } + + fn owner(&self) -> Pubkey { + // The timestamp index doesn't have a single "owner" agent in its header, + // but we return the samples account pk for reference.
+ self.header.samples_account_pk + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp_index_serialization() { + let entries = vec![ + TimestampIndexEntry { + sample_index: 0, + timestamp_microseconds: 1_700_000_000_000_000, + }, + TimestampIndexEntry { + sample_index: 12, + timestamp_microseconds: 1_700_000_000_060_000, + }, + TimestampIndexEntry { + sample_index: 24, + timestamp_microseconds: 1_700_000_000_120_000, + }, + ]; + let val = TimestampIndex { + header: TimestampIndexHeader { + account_type: AccountType::TimestampIndex, + samples_account_pk: Pubkey::new_unique(), + next_entry_index: entries.len() as u32, + _unused: [0; 64], + }, + entries: entries.clone(), + }; + let header = val.header.clone(); + + let data = borsh::to_vec(&val).unwrap(); + let val2 = TimestampIndex::try_from_slice(&data).unwrap(); + let header2 = val2.header.clone(); + + assert_eq!(header.account_type, header2.account_type); + assert_eq!(header.samples_account_pk, header2.samples_account_pk); + assert_eq!(header.next_entry_index, header2.next_entry_index); + assert_eq!(val.entries, val2.entries); + assert_eq!( + data.len(), + borsh::object_length(&val).unwrap(), + "Invalid size" + ); + } +} diff --git a/smartcontract/programs/doublezero-telemetry/tests/test_helpers.rs b/smartcontract/programs/doublezero-telemetry/tests/test_helpers.rs index 84589469d0..ae5b89f107 100644 --- a/smartcontract/programs/doublezero-telemetry/tests/test_helpers.rs +++ b/smartcontract/programs/doublezero-telemetry/tests/test_helpers.rs @@ -37,7 +37,10 @@ use doublezero_serviceability::{ use doublezero_telemetry::{ error::TelemetryError, instructions::{TelemetryInstruction, INITIALIZE_DEVICE_LATENCY_SAMPLES_INSTRUCTION_INDEX}, - pda::{derive_device_latency_samples_pda, derive_internet_latency_samples_pda}, + pda::{ + derive_device_latency_samples_pda, derive_internet_latency_samples_pda, + derive_timestamp_index_pda, + }, processors::telemetry::{ 
        initialize_device_latency_samples::InitializeDeviceLatencySamplesArgs,
        initialize_internet_latency_samples::InitializeInternetLatencySamplesArgs,
@@ -712,6 +715,76 @@ impl TelemetryProgramHelper {
         banks_client.process_transaction(tx).await
     }
 
+    pub async fn initialize_timestamp_index(
+        &mut self,
+        agent: &Keypair,
+        samples_account_pk: Pubkey,
+    ) -> Result<Pubkey, BanksClientError> {
+        let (pda, _) = derive_timestamp_index_pda(&self.program_id, &samples_account_pk);
+
+        self.execute_transaction(
+            TelemetryInstruction::InitializeTimestampIndex,
+            &[agent],
+            vec![
+                AccountMeta::new(pda, false),
+                AccountMeta::new_readonly(samples_account_pk, false),
+                AccountMeta::new(agent.pubkey(), true),
+                AccountMeta::new_readonly(solana_system_interface::program::ID, false),
+            ],
+        )
+        .await?;
+
+        Ok(pda)
+    }
+
+    pub async fn write_device_latency_samples_with_timestamp_index(
+        &mut self,
+        agent: &Keypair,
+        latency_samples_pda: Pubkey,
+        timestamp_index_pda: Pubkey,
+        samples: Vec<u32>,
+        start_timestamp_microseconds: u64,
+    ) -> Result<(), BanksClientError> {
+        self.execute_transaction(
+            TelemetryInstruction::WriteDeviceLatencySamples(WriteDeviceLatencySamplesArgs {
+                start_timestamp_microseconds,
+                samples,
+            }),
+            &[agent],
+            vec![
+                AccountMeta::new(latency_samples_pda, false),
+                AccountMeta::new(agent.pubkey(), true),
+                AccountMeta::new_readonly(solana_system_interface::program::ID, false),
+                AccountMeta::new(timestamp_index_pda, false),
+            ],
+        )
+        .await
+    }
+
+    pub async fn write_internet_latency_samples_with_timestamp_index(
+        &mut self,
+        agent: &Keypair,
+        latency_samples_pda: Pubkey,
+        timestamp_index_pda: Pubkey,
+        samples: Vec<u32>,
+        start_timestamp_microseconds: u64,
+    ) -> Result<(), BanksClientError> {
+        self.execute_transaction(
+            TelemetryInstruction::WriteInternetLatencySamples(WriteInternetLatencySamplesArgs {
+                start_timestamp_microseconds,
+                samples,
+            }),
+            &[agent],
+            vec![
+                AccountMeta::new(latency_samples_pda, false),
+                AccountMeta::new(agent.pubkey(), true),
AccountMeta::new_readonly(solana_system_interface::program::ID, false), + AccountMeta::new(timestamp_index_pda, false), + ], + ) + .await + } + pub async fn execute_transaction( &mut self, instruction: TelemetryInstruction, diff --git a/smartcontract/programs/doublezero-telemetry/tests/timestamp_index_tests.rs b/smartcontract/programs/doublezero-telemetry/tests/timestamp_index_tests.rs new file mode 100644 index 0000000000..489c9f9977 --- /dev/null +++ b/smartcontract/programs/doublezero-telemetry/tests/timestamp_index_tests.rs @@ -0,0 +1,343 @@ +use doublezero_telemetry::{ + error::TelemetryError, + instructions::TelemetryInstruction, + pda::derive_timestamp_index_pda, + state::{ + accounttype::AccountType, + device_latency_samples::DeviceLatencySamples, + timestamp_index::{TimestampIndex, TIMESTAMP_INDEX_HEADER_SIZE}, + }, +}; +use solana_program::instruction::InstructionError; +use solana_program_test::*; +use solana_sdk::{ + instruction::AccountMeta, + pubkey::Pubkey, + signature::{Keypair, Signer}, + transaction::TransactionError, +}; + +mod test_helpers; + +use test_helpers::*; + +#[tokio::test] +async fn test_initialize_timestamp_index_success() { + let mut ledger = LedgerHelper::new().await.unwrap(); + + let payer_pubkey = ledger + .context + .lock() + .unwrap() + .payer + .insecure_clone() + .pubkey(); + let contributor_pk = ledger + .serviceability + .create_contributor("CONTRIB".to_string(), payer_pubkey) + .await + .unwrap(); + + let (agent, origin_device_pk, target_device_pk, link_pk) = ledger + .seed_with_two_linked_devices(contributor_pk) + .await + .unwrap(); + + ledger.wait_for_new_blockhash().await.unwrap(); + + let latency_samples_pda = ledger + .telemetry + .initialize_device_latency_samples( + &agent, + origin_device_pk, + target_device_pk, + link_pk, + 1u64, + 5_000_000, + ) + .await + .unwrap(); + + let timestamp_index_pda = ledger + .telemetry + .initialize_timestamp_index(&agent, latency_samples_pda) + .await + .unwrap(); + + // Verify the 
timestamp index account was created. + let account = ledger + .get_account(timestamp_index_pda) + .await + .unwrap() + .expect("Timestamp index account does not exist"); + + assert_eq!(account.owner, ledger.telemetry.program_id); + assert_eq!(account.data.len(), TIMESTAMP_INDEX_HEADER_SIZE); + + let ts_index = TimestampIndex::try_from(&account.data[..]).unwrap(); + assert_eq!(ts_index.header.account_type, AccountType::TimestampIndex); + assert_eq!(ts_index.header.samples_account_pk, latency_samples_pda); + assert_eq!(ts_index.header.next_entry_index, 0); + assert!(ts_index.entries.is_empty()); +} + +#[tokio::test] +async fn test_initialize_timestamp_index_fail_already_exists() { + let mut ledger = LedgerHelper::new().await.unwrap(); + + let payer_pubkey = ledger + .context + .lock() + .unwrap() + .payer + .insecure_clone() + .pubkey(); + let contributor_pk = ledger + .serviceability + .create_contributor("CONTRIB".to_string(), payer_pubkey) + .await + .unwrap(); + + let (agent, origin_device_pk, target_device_pk, link_pk) = ledger + .seed_with_two_linked_devices(contributor_pk) + .await + .unwrap(); + + ledger.wait_for_new_blockhash().await.unwrap(); + + let latency_samples_pda = ledger + .telemetry + .initialize_device_latency_samples( + &agent, + origin_device_pk, + target_device_pk, + link_pk, + 1u64, + 5_000_000, + ) + .await + .unwrap(); + + ledger + .telemetry + .initialize_timestamp_index(&agent, latency_samples_pda) + .await + .unwrap(); + + ledger.wait_for_new_blockhash().await.unwrap(); + + // Try to initialize again — should fail. 
+ let result = ledger + .telemetry + .initialize_timestamp_index(&agent, latency_samples_pda) + .await; + + let err = result.unwrap_err(); + match err { + BanksClientError::TransactionError(TransactionError::InstructionError( + _, + InstructionError::Custom(code), + )) => { + assert_eq!(code, TelemetryError::AccountAlreadyExists as u32); + } + other => panic!("Unexpected error: {other:?}"), + } +} + +#[tokio::test] +async fn test_write_device_latency_samples_with_timestamp_index() { + let mut ledger = LedgerHelper::new().await.unwrap(); + + let payer_pubkey = ledger + .context + .lock() + .unwrap() + .payer + .insecure_clone() + .pubkey(); + let contributor_pk = ledger + .serviceability + .create_contributor("CONTRIB".to_string(), payer_pubkey) + .await + .unwrap(); + + let (agent, origin_device_pk, target_device_pk, link_pk) = ledger + .seed_with_two_linked_devices(contributor_pk) + .await + .unwrap(); + + ledger.wait_for_new_blockhash().await.unwrap(); + + let latency_samples_pda = ledger + .telemetry + .initialize_device_latency_samples( + &agent, + origin_device_pk, + target_device_pk, + link_pk, + 1u64, + 5_000_000, + ) + .await + .unwrap(); + + let timestamp_index_pda = ledger + .telemetry + .initialize_timestamp_index(&agent, latency_samples_pda) + .await + .unwrap(); + + // Write first batch. + let t1 = 1_700_000_000_000_000; + ledger + .telemetry + .write_device_latency_samples_with_timestamp_index( + &agent, + latency_samples_pda, + timestamp_index_pda, + vec![1000, 1100, 1200], + t1, + ) + .await + .unwrap(); + + // Write second batch. + let t2 = 1_700_000_000_015_000; + ledger + .telemetry + .write_device_latency_samples_with_timestamp_index( + &agent, + latency_samples_pda, + timestamp_index_pda, + vec![1300, 1400], + t2, + ) + .await + .unwrap(); + + // Verify samples were written correctly. 
+ let samples_account = ledger + .get_account(latency_samples_pda) + .await + .unwrap() + .expect("Samples account does not exist"); + let samples_data = DeviceLatencySamples::try_from(&samples_account.data[..]).unwrap(); + assert_eq!(samples_data.header.next_sample_index, 5); + assert_eq!(samples_data.samples, vec![1000, 1100, 1200, 1300, 1400]); + + // Verify timestamp index entries. + let ts_account = ledger + .get_account(timestamp_index_pda) + .await + .unwrap() + .expect("Timestamp index account does not exist"); + let ts_index = TimestampIndex::try_from(&ts_account.data[..]).unwrap(); + assert_eq!(ts_index.header.next_entry_index, 2); + assert_eq!(ts_index.entries.len(), 2); + + // First entry: sample_index=0, timestamp=t1 + assert_eq!(ts_index.entries[0].sample_index, 0); + assert_eq!(ts_index.entries[0].timestamp_microseconds, t1); + + // Second entry: sample_index=3, timestamp=t2 + assert_eq!(ts_index.entries[1].sample_index, 3); + assert_eq!(ts_index.entries[1].timestamp_microseconds, t2); +} + +#[tokio::test] +async fn test_write_device_latency_samples_without_timestamp_index_still_works() { + let mut ledger = LedgerHelper::new().await.unwrap(); + + let payer_pubkey = ledger + .context + .lock() + .unwrap() + .payer + .insecure_clone() + .pubkey(); + let contributor_pk = ledger + .serviceability + .create_contributor("CONTRIB".to_string(), payer_pubkey) + .await + .unwrap(); + + let (agent, origin_device_pk, target_device_pk, link_pk) = ledger + .seed_with_two_linked_devices(contributor_pk) + .await + .unwrap(); + + ledger.wait_for_new_blockhash().await.unwrap(); + + let latency_samples_pda = ledger + .telemetry + .initialize_device_latency_samples( + &agent, + origin_device_pk, + target_device_pk, + link_pk, + 1u64, + 5_000_000, + ) + .await + .unwrap(); + + // Write without timestamp index — should succeed as before. 
+ ledger + .telemetry + .write_device_latency_samples( + &agent, + latency_samples_pda, + vec![1000, 1100], + 1_700_000_000_000_000, + ) + .await + .unwrap(); + + let account = ledger + .get_account(latency_samples_pda) + .await + .unwrap() + .expect("Samples account does not exist"); + let samples_data = DeviceLatencySamples::try_from(&account.data[..]).unwrap(); + assert_eq!(samples_data.header.next_sample_index, 2); + assert_eq!(samples_data.samples, vec![1000, 1100]); +} + +#[tokio::test] +async fn test_initialize_timestamp_index_fail_samples_account_does_not_exist() { + let mut ledger = LedgerHelper::new().await.unwrap(); + + let agent = Keypair::new(); + ledger + .fund_account(&agent.pubkey(), 10_000_000_000) + .await + .unwrap(); + + let fake_samples_pk = Pubkey::new_unique(); + let (ts_pda, _) = derive_timestamp_index_pda(&ledger.telemetry.program_id, &fake_samples_pk); + + let result = ledger + .telemetry + .execute_transaction( + TelemetryInstruction::InitializeTimestampIndex, + &[&agent], + vec![ + AccountMeta::new(ts_pda, false), + AccountMeta::new_readonly(fake_samples_pk, false), + AccountMeta::new(agent.pubkey(), true), + AccountMeta::new_readonly(solana_system_interface::program::ID, false), + ], + ) + .await; + + let err = result.unwrap_err(); + match err { + BanksClientError::TransactionError(TransactionError::InstructionError( + _, + InstructionError::Custom(code), + )) => { + assert_eq!(code, TelemetryError::AccountDoesNotExist as u32); + } + other => panic!("Unexpected error: {other:?}"), + } +} diff --git a/smartcontract/sdk/go/telemetry/client.go b/smartcontract/sdk/go/telemetry/client.go index c70866c8b2..0a6c23d286 100644 --- a/smartcontract/sdk/go/telemetry/client.go +++ b/smartcontract/sdk/go/telemetry/client.go @@ -16,9 +16,10 @@ import ( ) var ( - ErrAccountNotFound = errors.New("account not found") - ErrSamplesBatchTooLarge = fmt.Errorf("samples batch too large, must not exceed %d samples", MaxSamplesPerBatch) - ErrSamplesAccountFull = 
errors.New("samples account is full") + ErrAccountNotFound = errors.New("account not found") + ErrSamplesBatchTooLarge = fmt.Errorf("samples batch too large, must not exceed %d samples", MaxSamplesPerBatch) + ErrSamplesAccountFull = errors.New("samples account is full") + ErrTimestampIndexNotFound = errors.New("timestamp index account not found") ) type Client struct { @@ -225,6 +226,29 @@ func (c *Client) InitializeDeviceLatencySamples( return sig, res, nil } +func (c *Client) InitializeTimestampIndex( + ctx context.Context, + samplesAccountPK solana.PublicKey, +) (solana.Signature, *solanarpc.GetTransactionResult, error) { + instruction, err := BuildInitializeTimestampIndexInstruction( + c.executor.programID, + c.executor.signer.PublicKey(), + samplesAccountPK, + ) + if err != nil { + return solana.Signature{}, nil, fmt.Errorf("failed to build instruction: %w", err) + } + + sig, res, err := c.executor.ExecuteTransaction(ctx, instruction, &ExecuteTransactionOptions{ + SkipPreflight: true, + }) + if err != nil { + return solana.Signature{}, nil, fmt.Errorf("failed to execute instruction: %w", err) + } + + return sig, res, nil +} + func (c *Client) WriteDeviceLatencySamples( ctx context.Context, config WriteDeviceLatencySamplesInstructionConfig, @@ -258,6 +282,8 @@ func (c *Client) WriteDeviceLatencySamples( return solana.Signature{}, nil, ErrAccountNotFound case strconv.Itoa(InstructionErrorAccountSamplesAccountFull): return solana.Signature{}, nil, ErrSamplesAccountFull + case strconv.Itoa(InstructionErrorTimestampIndexAccountDoesNotExist): + return solana.Signature{}, nil, ErrTimestampIndexNotFound } } } @@ -364,6 +390,8 @@ func (c *Client) WriteInternetLatencySamples( return solana.Signature{}, nil, ErrAccountNotFound case strconv.Itoa(InstructionErrorAccountSamplesAccountFull): return solana.Signature{}, nil, ErrSamplesAccountFull + case strconv.Itoa(InstructionErrorTimestampIndexAccountDoesNotExist): + return solana.Signature{}, nil, ErrTimestampIndexNotFound 
} } } diff --git a/smartcontract/sdk/go/telemetry/client_device_test.go b/smartcontract/sdk/go/telemetry/client_device_test.go index 75616eed9b..4357136cff 100644 --- a/smartcontract/sdk/go/telemetry/client_device_test.go +++ b/smartcontract/sdk/go/telemetry/client_device_test.go @@ -849,6 +849,66 @@ func TestSDK_Telemetry_Client_WriteDeviceLatencySamples_CustomInstructionErrorAc require.Nil(t, tx) } +func TestSDK_Telemetry_Client_WriteDeviceLatencySamples_CustomInstructionErrorTimestampIndexNotFound(t *testing.T) { + t.Parallel() + + signer := solana.NewWallet().PrivateKey + programID := solana.NewWallet().PublicKey() + + customErr := &jsonrpc.RPCError{ + Code: -32000, + Message: "Transaction simulation failed", + Data: map[string]any{ + "err": map[string]any{ + "InstructionError": []any{ + 0, + map[string]any{ + "Custom": json.Number(strconv.Itoa(telemetry.InstructionErrorTimestampIndexAccountDoesNotExist)), + }, + }, + }, + }, + } + + mockRPC := &mockRPCClient{ + GetLatestBlockhashFunc: func(_ context.Context, _ solanarpc.CommitmentType) (*solanarpc.GetLatestBlockhashResult, error) { + return &solanarpc.GetLatestBlockhashResult{ + Value: &solanarpc.LatestBlockhashResult{ + Blockhash: solana.MustHashFromBase58("5NzX7jrPWeTkGsDnVnszdEa7T3Yyr3nSgyc78z3CwjWQ"), + }, + }, nil + }, + SendTransactionWithOptsFunc: func(_ context.Context, _ *solana.Transaction, _ solanarpc.TransactionOpts) (solana.Signature, error) { + return solana.Signature{}, customErr + }, + GetSignatureStatusesFunc: func(_ context.Context, _ bool, _ ...solana.Signature) (*solanarpc.GetSignatureStatusesResult, error) { + return nil, nil + }, + GetTransactionFunc: func(_ context.Context, _ solana.Signature, _ *solanarpc.GetTransactionOpts) (*solanarpc.GetTransactionResult, error) { + return nil, nil + }, + } + + client := telemetry.New(slog.Default(), mockRPC, &signer, programID) + + epoch := uint64(42) + config := telemetry.WriteDeviceLatencySamplesInstructionConfig{ + AgentPK: signer.PublicKey(), + 
OriginDevicePK: solana.NewWallet().PublicKey(), + TargetDevicePK: solana.NewWallet().PublicKey(), + LinkPK: solana.NewWallet().PublicKey(), + Epoch: &epoch, + StartTimestampMicroseconds: 1_600_000_000, + Samples: []uint32{1, 2, 3}, + } + + sig, tx, err := client.WriteDeviceLatencySamples(context.Background(), config) + + require.ErrorIs(t, err, telemetry.ErrTimestampIndexNotFound) + require.Equal(t, solana.Signature{}, sig) + require.Nil(t, tx) +} + func TestSDK_Telemetry_Client_WriteDeviceLatencySamples_BuildFails(t *testing.T) { t.Parallel() diff --git a/smartcontract/sdk/go/telemetry/client_internet_test.go b/smartcontract/sdk/go/telemetry/client_internet_test.go index 140459cea2..3b6de94b61 100644 --- a/smartcontract/sdk/go/telemetry/client_internet_test.go +++ b/smartcontract/sdk/go/telemetry/client_internet_test.go @@ -441,6 +441,64 @@ func TestSDK_Telemetry_Client_WriteInternetLatencySamples_CustomInstructionError require.Nil(t, tx) } +func TestSDK_Telemetry_Client_WriteInternetLatencySamples_CustomInstructionErrorTimestampIndexNotFound(t *testing.T) { + t.Parallel() + + signer := solana.NewWallet().PrivateKey + programID := solana.NewWallet().PublicKey() + + customErr := &jsonrpc.RPCError{ + Code: -32000, + Message: "Transaction simulation failed", + Data: map[string]any{ + "err": map[string]any{ + "InstructionError": []any{ + 0, + map[string]any{ + "Custom": json.Number(strconv.Itoa(telemetry.InstructionErrorTimestampIndexAccountDoesNotExist)), + }, + }, + }, + }, + } + + mockRPC := &mockRPCClient{ + GetLatestBlockhashFunc: func(_ context.Context, _ solanarpc.CommitmentType) (*solanarpc.GetLatestBlockhashResult, error) { + return &solanarpc.GetLatestBlockhashResult{ + Value: &solanarpc.LatestBlockhashResult{ + Blockhash: solana.MustHashFromBase58("5NzX7jrPWeTkGsDnVnszdEa7T3Yyr3nSgyc78z3CwjWQ"), + }, + }, nil + }, + SendTransactionWithOptsFunc: func(_ context.Context, _ *solana.Transaction, _ solanarpc.TransactionOpts) (solana.Signature, error) { + return 
solana.Signature{}, customErr + }, + GetSignatureStatusesFunc: func(_ context.Context, _ bool, _ ...solana.Signature) (*solanarpc.GetSignatureStatusesResult, error) { + return nil, nil + }, + GetTransactionFunc: func(_ context.Context, _ solana.Signature, _ *solanarpc.GetTransactionOpts) (*solanarpc.GetTransactionResult, error) { + return nil, nil + }, + } + + client := telemetry.New(slog.Default(), mockRPC, &signer, programID) + + config := telemetry.WriteInternetLatencySamplesInstructionConfig{ + OriginExchangePK: solana.NewWallet().PublicKey(), + TargetExchangePK: solana.NewWallet().PublicKey(), + DataProviderName: "test-data-provider-1", + Epoch: 42, + StartTimestampMicroseconds: 1_600_000_000, + Samples: []uint32{1, 2, 3}, + } + + sig, tx, err := client.WriteInternetLatencySamples(context.Background(), config) + + require.ErrorIs(t, err, telemetry.ErrTimestampIndexNotFound) + require.Equal(t, solana.Signature{}, sig) + require.Nil(t, tx) +} + func TestSDK_Telemetry_Client_WriteInternetLatencySamples_BuildFails(t *testing.T) { t.Parallel() diff --git a/smartcontract/sdk/go/telemetry/constants.go b/smartcontract/sdk/go/telemetry/constants.go index bda07ed326..88646aeb9e 100644 --- a/smartcontract/sdk/go/telemetry/constants.go +++ b/smartcontract/sdk/go/telemetry/constants.go @@ -12,6 +12,8 @@ const ( InitializeInternetLatencySamplesInstructionIndex TelemetryInstructionType = 2 // Represents the write internet latency samples instruction WriteInternetLatencySamplesInstructionIndex TelemetryInstructionType = 3 + // Represents the initialize timestamp index instruction + InitializeTimestampIndexInstructionIndex TelemetryInstructionType = 4 // InstructionErrorAccountSamplesAccountFull is the error code that the telemetry program returns // when the given PDA has reached maximum capacity for samples. @@ -21,6 +23,10 @@ const ( // when the given PDA does not exist. 
InstructionErrorAccountDoesNotExist = 1011 + // InstructionErrorTimestampIndexAccountDoesNotExist is the error code that the telemetry + // program returns when the timestamp index account does not exist. + InstructionErrorTimestampIndexAccountDoesNotExist = 1018 + // MaxSamplesPerBatch is the maximum number of samples that can be written in a single batch. // // Messages transmitted to Solana validators must not exceed the IPv6 MTU size to ensure fast @@ -58,4 +64,6 @@ const ( DeviceLatencySamplesSeed = "dzlatency" // Seed for internet latency samples PDAs InternetLatencySamplesSeed = "inetlatency" + // Seed for timestamp index PDAs + TimestampIndexSeed = "tsindex" ) diff --git a/smartcontract/sdk/go/telemetry/initialize_timestamp_index.go b/smartcontract/sdk/go/telemetry/initialize_timestamp_index.go new file mode 100644 index 0000000000..fb5dbb9519 --- /dev/null +++ b/smartcontract/sdk/go/telemetry/initialize_timestamp_index.go @@ -0,0 +1,50 @@ +package telemetry + +import ( + "fmt" + + "github.com/gagliardetto/solana-go" + "github.com/near/borsh-go" +) + +// BuildInitializeTimestampIndexInstruction builds the instruction for initializing +// a timestamp index companion account for a given samples account. 
+func BuildInitializeTimestampIndexInstruction( + programID solana.PublicKey, + agentPK solana.PublicKey, + samplesAccountPK solana.PublicKey, +) (solana.Instruction, error) { + if agentPK.IsZero() { + return nil, fmt.Errorf("agent public key is required") + } + if samplesAccountPK.IsZero() { + return nil, fmt.Errorf("samples account public key is required") + } + + data, err := borsh.Serialize(struct { + Discriminator uint8 + }{ + Discriminator: uint8(InitializeTimestampIndexInstructionIndex), + }) + if err != nil { + return nil, fmt.Errorf("failed to serialize args: %w", err) + } + + timestampIndexPDA, _, err := DeriveTimestampIndexPDA(programID, samplesAccountPK) + if err != nil { + return nil, fmt.Errorf("failed to derive timestamp index PDA: %w", err) + } + + accounts := []*solana.AccountMeta{ + {PublicKey: timestampIndexPDA, IsSigner: false, IsWritable: true}, + {PublicKey: samplesAccountPK, IsSigner: false, IsWritable: false}, + {PublicKey: agentPK, IsSigner: true, IsWritable: true}, + {PublicKey: solana.SystemProgramID, IsSigner: false, IsWritable: false}, + } + + return &solana.GenericInstruction{ + ProgID: programID, + AccountValues: accounts, + DataBytes: data, + }, nil +} diff --git a/smartcontract/sdk/go/telemetry/pda.go b/smartcontract/sdk/go/telemetry/pda.go index efd64d49f5..065f0d4f35 100644 --- a/smartcontract/sdk/go/telemetry/pda.go +++ b/smartcontract/sdk/go/telemetry/pda.go @@ -32,6 +32,19 @@ func DeriveDeviceLatencySamplesPDA( return solana.FindProgramAddress(seeds, programID) } +// DeriveTimestampIndexPDA derives the PDA for a timestamp index companion account. 
+func DeriveTimestampIndexPDA( + programID solana.PublicKey, + samplesAccountPK solana.PublicKey, +) (solana.PublicKey, uint8, error) { + seeds := [][]byte{ + []byte(TelemetrySeedPrefix), + []byte(TimestampIndexSeed), + samplesAccountPK[:], + } + return solana.FindProgramAddress(seeds, programID) +} + // Derives the PDA for internet latency samples account func DeriveInternetLatencySamplesPDA( programID solana.PublicKey, diff --git a/smartcontract/sdk/go/telemetry/write_device_latency_samples.go b/smartcontract/sdk/go/telemetry/write_device_latency_samples.go index 4aa4894953..de6355bb1d 100644 --- a/smartcontract/sdk/go/telemetry/write_device_latency_samples.go +++ b/smartcontract/sdk/go/telemetry/write_device_latency_samples.go @@ -15,6 +15,7 @@ type WriteDeviceLatencySamplesInstructionConfig struct { Epoch *uint64 StartTimestampMicroseconds uint64 Samples []uint32 + TimestampIndexPK *solana.PublicKey // optional: if set, appends a timestamp index entry } func (c *WriteDeviceLatencySamplesInstructionConfig) Validate() error { @@ -78,6 +79,11 @@ func BuildWriteDeviceLatencySamplesInstruction( {PublicKey: config.AgentPK, IsSigner: true, IsWritable: false}, {PublicKey: solana.SystemProgramID, IsSigner: false, IsWritable: false}, } + if config.TimestampIndexPK != nil { + accounts = append(accounts, &solana.AccountMeta{ + PublicKey: *config.TimestampIndexPK, IsSigner: false, IsWritable: true, + }) + } return &solana.GenericInstruction{ ProgID: programID, diff --git a/smartcontract/sdk/go/telemetry/write_internet_latency_samples.go b/smartcontract/sdk/go/telemetry/write_internet_latency_samples.go index 9b6a4162de..3d4266407f 100644 --- a/smartcontract/sdk/go/telemetry/write_internet_latency_samples.go +++ b/smartcontract/sdk/go/telemetry/write_internet_latency_samples.go @@ -14,6 +14,7 @@ type WriteInternetLatencySamplesInstructionConfig struct { Epoch uint64 StartTimestampMicroseconds uint64 Samples []uint32 + TimestampIndexPK *solana.PublicKey // optional: if set, 
appends a timestamp index entry } func (c *WriteInternetLatencySamplesInstructionConfig) Validate() error { @@ -75,6 +76,11 @@ func BuildWriteInternetLatencySamplesInstruction( {PublicKey: signerPK, IsSigner: true, IsWritable: false}, {PublicKey: solana.SystemProgramID, IsSigner: false, IsWritable: false}, } + if config.TimestampIndexPK != nil { + accounts = append(accounts, &solana.AccountMeta{ + PublicKey: *config.TimestampIndexPK, IsSigner: false, IsWritable: true, + }) + } return &solana.GenericInstruction{ ProgID: programID,