Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions e2e/internal/qa/client_settlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,63 @@ func (c *Client) IsSeatProratingEnabled(ctx context.Context) (bool, error) {
return cfg.IsProratedServiceEnabled(), nil
}

// IsProgramPaused returns true if the shred-subscription program config has
// the paused flag set. While paused, the oracle cannot ack instant seat
// allocation requests, which leaves the seat un-withdrawable.
func (c *Client) IsProgramPaused(ctx context.Context) (bool, error) {
programID, err := solana.PublicKeyFromBase58(c.ShredSubscriptionProgramID)
if err != nil {
return false, fmt.Errorf("failed to parse shred subscription program ID %q: %w", c.ShredSubscriptionProgramID, err)
}

cfg, err := shreds.New(shreds.NewRPCClient(c.SolanaRPCURL), programID).FetchProgramConfig(ctx)
if err != nil {
return false, fmt.Errorf("failed to fetch program config on host %s: %w", c.Host, err)
}
return cfg.IsPaused(), nil
}

// WaitForSeatAllocationAcked polls the onchain client seat until the pending
// instant-allocation flag clears (the oracle has acked the request). Withdraw
// is blocked onchain while the flag is set, so callers should wait between
// FeedSeatPay and FeedSeatWithdraw. On timeout the error includes the current
// program-paused state to surface migration windows as the likely cause.
func (c *Client) WaitForSeatAllocationAcked(ctx context.Context, devicePubkey string, timeout time.Duration) error {
deviceKey, err := solana.PublicKeyFromBase58(devicePubkey)
if err != nil {
return fmt.Errorf("failed to parse device pubkey %q: %w", devicePubkey, err)
}
programID, err := solana.PublicKeyFromBase58(c.ShredSubscriptionProgramID)
if err != nil {
return fmt.Errorf("failed to parse shred subscription program ID %q: %w", c.ShredSubscriptionProgramID, err)
}
clientIPBits := binary.BigEndian.Uint32(c.publicIP.To4())
shredsClient := shreds.New(shreds.NewRPCClient(c.SolanaRPCURL), programID)

c.log.Debug("Waiting for seat allocation ack", "host", c.Host, "device", devicePubkey, "timeout", timeout)
deadline := time.Now().Add(timeout)
var lastErr error
for {
seat, err := shredsClient.FetchClientSeat(ctx, deviceKey, clientIPBits)
if err == nil && !seat.HasPendingInstantRequest() {
c.log.Debug("Seat allocation acked", "host", c.Host, "device", devicePubkey)
return nil
}
lastErr = err
if time.Now().After(deadline) {
cfg, cfgErr := shredsClient.FetchProgramConfig(ctx)
paused := cfgErr == nil && cfg.IsPaused()
return fmt.Errorf("seat allocation not acked within %s on host %s (program_paused=%t, last_fetch_err=%v)",
timeout, c.Host, paused, lastErr)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
}
}
}

// GetWalletPubkey calls the GetWalletPubkey RPC to read the keypair file on the
// remote host and return the base58-encoded public key.
func (c *Client) GetWalletPubkey(ctx context.Context) (solana.PublicKey, error) {
Expand Down
25 changes: 25 additions & 0 deletions e2e/qa_multicast_settlement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ func TestQA_MulticastSettlement(t *testing.T) {
}
})

if !t.Run("ensure_program_unpaused", func(t *testing.T) {
// Migrations pause the program; while paused the oracle cannot ack
// instant seat allocation requests, which would leave the seat
// un-withdrawable and fail the rest of the test with a confusing
// "invalid account data for instruction" rejection.
paused, err := client.IsProgramPaused(ctx)
require.NoError(t, err, "failed to read program-paused flag")
if paused {
t.Skip("Skipping: shred-subscription program is paused (migration in progress)")
}
}) {
return
}

if !t.Run("ensure_multicast_disconnected", func(t *testing.T) {
statuses, err := client.GetUserStatuses(ctx)
if err != nil {
Expand Down Expand Up @@ -182,6 +196,17 @@ func TestQA_MulticastSettlement(t *testing.T) {
return
}

if !t.Run("wait_for_seat_allocation_acked", func(t *testing.T) {
// The tunnel can come up before the oracle has acked the instant
// allocation request. Withdraw rejects while the request is still
// pending, so wait here rather than racing the oracle on the
// withdraw_seat step.
err := client.WaitForSeatAllocationAcked(ctx, device.PubKey, 90*time.Second)
require.NoError(t, err, "oracle did not ack instant seat allocation")
}) {
return
}

if !t.Run("validate_tunnel_up", func(t *testing.T) {
err := client.WaitForMulticastStatusUp(ctx)
require.NoError(t, err, "multicast tunnel did not come up after seat payment")
Expand Down
47 changes: 42 additions & 5 deletions sdk/revdist/go/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package revdist
import (
"context"
"encoding/binary"
"errors"
"os"
"testing"

Expand Down Expand Up @@ -311,16 +312,52 @@ func TestCompatRewardShares(t *testing.T) {
client, _ := compatClient(t)
ctx := context.Background()

// Fetch config and use an older epoch that is more likely to have ledger records.
progConfig, err := client.FetchConfig(ctx)
if err != nil {
t.Fatalf("FetchConfig: %v", err)
}
epoch := progConfig.NextCompletedDZEpoch - 5

shares, err := client.FetchRewardShares(ctx, epoch)
if err != nil {
t.Fatalf("FetchRewardShares(%d): %v", epoch, err)
// Not every recent DZ epoch has a Shapley output record on the ledger
// (records can be absent or get pruned), so scan a small window of older
// epochs for the first one that does. Starting at next-5 follows the
// original heuristic of giving the most recent epochs time to settle.
const (
startOffset = 5
scanDepth = 10
)
if progConfig.NextCompletedDZEpoch <= startOffset {
t.Skipf("NextCompletedDZEpoch=%d is too low to scan reward shares", progConfig.NextCompletedDZEpoch)
}
maxOffset := startOffset + scanDepth - 1
if uint64(maxOffset) >= progConfig.NextCompletedDZEpoch {
maxOffset = int(progConfig.NextCompletedDZEpoch - 1)
}

var (
shares *ShapleyOutputStorage
epoch uint64
missing []uint64
)
for offset := startOffset; offset <= maxOffset; offset++ {
candidate := progConfig.NextCompletedDZEpoch - uint64(offset)
s, err := client.FetchRewardShares(ctx, candidate)
if err == nil {
shares = s
epoch = candidate
break
}
if errors.Is(err, ErrAccountNotFound) {
missing = append(missing, candidate)
continue
}
t.Fatalf("FetchRewardShares(%d): %v", candidate, err)
}
if shares == nil {
t.Fatalf("no reward shares record found in epochs %v (next=%d)",
missing, progConfig.NextCompletedDZEpoch)
}
if len(missing) > 0 {
t.Logf("skipped epochs without records: %v", missing)
}

if shares.Epoch != epoch {
Expand Down
16 changes: 15 additions & 1 deletion sdk/shreds/go/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,23 @@ type ClientSeat struct {
Gap [2][32]byte // StorageGap<2>
}

// Flag bits in ClientSeat.Flags. Mirrors the onchain bit indices from the
// shred-subscription program.
const (
clientSeatFlagHasPriceOverrideBit = 0
clientSeatFlagHasPendingInstantRequestBit = 1
)

// HasPriceOverride returns true if a flat price override is active.
func (s *ClientSeat) HasPriceOverride() bool {
return s.Flags&1 != 0
return s.Flags&(1<<clientSeatFlagHasPriceOverrideBit) != 0
}

// HasPendingInstantRequest returns true if the seat has a pending instant
// allocation request that the oracle has not yet acked or rejected. Withdrawal
// and escrow-close are blocked onchain while this is set.
func (s *ClientSeat) HasPendingInstantRequest() bool {
return s.Flags&(1<<clientSeatFlagHasPendingInstantRequestBit) != 0
}

// PaymentEscrow holds USDC balance funding a client seat.
Expand Down
Loading