Skip to content
Merged
8 changes: 8 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (

MainnetRevenueDistributionProgramID = "dzrevZC94tBLwuHw1dyynZxaXTWyp7yocsinyEVPtt4"
MainnetGeolocationProgramID = "8H7nS6eZiuf7rGQtz3PPz2q9m4eJRL37PPM678KHnspG"
MainnetShredSubscriptionProgramID = "dzshrr3yL57SB13sJPYHYo3TV8Bo1i1FxkyrZr3bKNE"
MainnetUSDCMint = "" // CLI defaults to real USDC on mainnet

// Testnet constants.
TestnetLedgerPublicRPCURL = "https://doublezerolocalnet.rpcpool.com/8a4fd3f4-0977-449f-88c7-63d4b0f10f16"
Expand All @@ -27,6 +29,8 @@ const (
TestnetTelemetryFlowIngestURL = "http://telemetry-flow-in.testnet.doublezero.xyz"
TestnetTelemetryStateIngestURL = "http://telemetry-state-in.testnet.doublezero.xyz"
TestnetGeolocationProgramID = "3AG2BCA7gAm47Q6xZzPQcUUYvnBjxAvPKnPz919cxHF4"
TestnetShredSubscriptionProgramID = "dzshrr3yL57SB13sJPYHYo3TV8Bo1i1FxkyrZr3bKNE"
TestnetUSDCMint = "uSDZq2RMuxrEf7gqgDjR8wJCtCyaDAQk2e5jLAaoeeM"
TestnetTelemetryGNMITunnelServerAddr = "gnmic-testnet.doublezero.xyz:443"

// Devnet constants.
Expand All @@ -39,6 +43,8 @@ const (
DevnetTelemetryFlowIngestURL = "http://telemetry-flow-in.devnet.doublezero.xyz"
DevnetTelemetryStateIngestURL = "http://telemetry-state-in.devnet.doublezero.xyz"
DevnetGeolocationProgramID = "EXUUFfAjjuXnaBtsAMLsJX18ynnNHPwtkmk33bLVVoCm"
DevnetShredSubscriptionProgramID = ""
DevnetUSDCMint = ""
DevnetTelemetryGNMITunnelServerAddr = "gnmic-devnet.doublezero.xyz:443"

// Localnet constants.
Expand All @@ -52,5 +58,7 @@ const (
LocalnetTelemetryFlowIngestURL = "http://localhost:8911"
LocalnetTelemetryStateIngestURL = "http://localhost:8911"
LocalnetGeolocationProgramID = "36WA9nUCsJaAQL5h44WYoLezDpocy8Q71NZbtrUN8DyC"
LocalnetShredSubscriptionProgramID = ""
LocalnetUSDCMint = ""
LocalnetTelemetryGNMITunnelServerAddr = "localhost:50051"
)
17 changes: 17 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type NetworkConfig struct {
TelemetryStateIngestURL string
TelemetryGNMITunnelServerAddr string
GeolocationProgramID solana.PublicKey
ShredSubscriptionProgramID string
USDCMint string
}

func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
Expand Down Expand Up @@ -63,6 +65,8 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
RevenueDistributionProgramID: revenueDistributionProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ShredSubscriptionProgramID: MainnetShredSubscriptionProgramID,
USDCMint: MainnetUSDCMint,
DeviceLocalASN: MainnetDeviceLocalASN,
TwoZOracleURL: MainnetTwoZOracleURL,
SolanaRPCURL: MainnetSolanaRPC,
Expand Down Expand Up @@ -94,6 +98,8 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ShredSubscriptionProgramID: TestnetShredSubscriptionProgramID,
USDCMint: TestnetUSDCMint,
DeviceLocalASN: TestnetDeviceLocalASN,
TwoZOracleURL: TestnetTwoZOracleURL,
SolanaRPCURL: TestnetSolanaRPC,
Expand Down Expand Up @@ -125,6 +131,8 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ShredSubscriptionProgramID: DevnetShredSubscriptionProgramID,
USDCMint: DevnetUSDCMint,
DeviceLocalASN: DevnetDeviceLocalASN,
TwoZOracleURL: DevnetTwoZOracleURL,
SolanaRPCURL: TestnetSolanaRPC,
Expand Down Expand Up @@ -156,6 +164,8 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
TelemetryProgramID: telemetryProgramID,
InternetLatencyCollectorPK: internetLatencyCollectorPK,
GeolocationProgramID: geolocationProgramID,
ShredSubscriptionProgramID: LocalnetShredSubscriptionProgramID,
USDCMint: LocalnetUSDCMint,
DeviceLocalASN: LocalnetDeviceLocalASN,
TwoZOracleURL: LocalnetTwoZOracleURL,
SolanaRPCURL: LocalnetSolanaRPC,
Expand All @@ -168,6 +178,13 @@ func NetworkConfigForEnv(env string) (*NetworkConfig, error) {
return nil, fmt.Errorf("invalid environment %q, must be one of: %s, %s, %s", env, EnvMainnetBeta, EnvTestnet, EnvDevnet)
}

// Validate shred subscription program ID if set (empty means not yet deployed to this env).
if config.ShredSubscriptionProgramID != "" {
if _, err := solana.PublicKeyFromBase58(config.ShredSubscriptionProgramID); err != nil {
return nil, fmt.Errorf("failed to parse shred subscription program ID: %w", err)
}
}

ledgerRPCURL := os.Getenv("DZ_LEDGER_RPC_URL")
if ledgerRPCURL != "" {
config.LedgerPublicRPCURL = ledgerRPCURL
Expand Down
6 changes: 6 additions & 0 deletions config/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.MainnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.MainnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.MainnetGeolocationProgramID),
ShredSubscriptionProgramID: config.MainnetShredSubscriptionProgramID,
USDCMint: config.MainnetUSDCMint,
},
},
{
Expand All @@ -50,6 +52,8 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.MainnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.MainnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.MainnetGeolocationProgramID),
ShredSubscriptionProgramID: config.MainnetShredSubscriptionProgramID,
USDCMint: config.MainnetUSDCMint,
},
},
{
Expand All @@ -67,6 +71,8 @@ func TestConfig_NetworkConfigForEnv(t *testing.T) {
TelemetryStateIngestURL: config.TestnetTelemetryStateIngestURL,
TelemetryGNMITunnelServerAddr: config.TestnetTelemetryGNMITunnelServerAddr,
GeolocationProgramID: solana.MustPublicKeyFromBase58(config.TestnetGeolocationProgramID),
ShredSubscriptionProgramID: config.TestnetShredSubscriptionProgramID,
USDCMint: config.TestnetUSDCMint,
},
},
{
Expand Down
25 changes: 23 additions & 2 deletions e2e/internal/qa/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ type Client struct {
// Exported as a simple configuration field (unlike publicIP which uses a setter
// because it has a non-nil invariant enforced by SetPublicIP).
ClientIP string

// Settlement config passed to doublezero-solana shreds commands.
// SolanaRPCURL is the Solana RPC endpoint for settlement transactions (--url).
// On testnet this is the DZ ledger URL; on mainnet it's the public Solana RPC.
SolanaRPCURL string
ShredSubscriptionProgramID string
DZLedgerURL string
USDCMint string
Keypair string
}

func NewClient(ctx context.Context, log *slog.Logger, hostname string, port int, networkConfig *config.NetworkConfig, devices map[string]*Device, allocateAddr bool) (*Client, error) {
Expand All @@ -125,6 +134,14 @@ func NewClient(ctx context.Context, log *slog.Logger, hostname string, port int,

serviceabilityClient := serviceability.New(rpc.New(networkConfig.LedgerPublicRPCURL), networkConfig.ServiceabilityProgramID)

// Settlement transactions on testnet/devnet use the DZ ledger RPC endpoint
// (which hosts the settlement programs). Mainnet and localnet use the
// standard Solana RPC.
solanaRPCURL := networkConfig.SolanaRPCURL
if networkConfig.Moniker == config.EnvTestnet || networkConfig.Moniker == config.EnvDevnet {
solanaRPCURL = networkConfig.LedgerPublicRPCURL
}

return &Client{
log: log,
grpcClient: grpcClient,
Expand All @@ -133,8 +150,12 @@ func NewClient(ctx context.Context, log *slog.Logger, hostname string, port int,
serviceability: serviceabilityClient,
devices: devices,

Host: hostname,
AllocateAddr: allocateAddr,
Host: hostname,
AllocateAddr: allocateAddr,
SolanaRPCURL: solanaRPCURL,
ShredSubscriptionProgramID: networkConfig.ShredSubscriptionProgramID,
DZLedgerURL: networkConfig.LedgerPublicRPCURL,
USDCMint: networkConfig.USDCMint,
}, nil
}

Expand Down
122 changes: 122 additions & 0 deletions e2e/internal/qa/client_settlement.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package qa

import (
"context"
"fmt"
"math"

pb "github.com/malbeclabs/doublezero/e2e/proto/qa/gen/pb-go"
"google.golang.org/protobuf/types/known/emptypb"
)

// FeedEnable calls the FeedEnable RPC to start the doublezerod reconciler.
func (c *Client) FeedEnable(ctx context.Context) error {
c.log.Debug("Enabling reconciler", "host", c.Host)
resp, err := c.grpcClient.FeedEnable(ctx, &emptypb.Empty{})
if err != nil {
return fmt.Errorf("failed to enable reconciler on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
return fmt.Errorf("enable failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Reconciler enabled", "host", c.Host)
return nil
}

// ClosestDevice returns the reachable device with the lowest average latency.
// It calls GetLatency and looks up the result in the client's devices map.
func (c *Client) ClosestDevice(ctx context.Context) (*Device, error) {
latencies, err := c.GetLatency(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get latency on host %s: %w", c.Host, err)
}

var bestLatency *pb.Latency
var bestAvg uint64 = math.MaxUint64
for _, l := range latencies {
if !l.Reachable {
continue
}
if l.AvgLatencyNs < bestAvg {
bestAvg = l.AvgLatencyNs
bestLatency = l
}
}
if bestLatency == nil {
return nil, fmt.Errorf("no reachable devices found on host %s", c.Host)
}

// Look up device by code in the devices map.
device, ok := c.devices[bestLatency.DeviceCode]
if !ok {
return nil, fmt.Errorf("closest device %q (pk=%s) not found in devices map on host %s", bestLatency.DeviceCode, bestLatency.DevicePk, c.Host)
}

c.log.Debug("Determined closest device", "host", c.Host, "deviceCode", device.Code, "avgLatencyNs", bestAvg)
return device, nil
}

// FeedSeatPrice calls the FeedSeatPrice RPC to query device seat prices.
func (c *Client) FeedSeatPrice(ctx context.Context) ([]*pb.DevicePrice, error) {
c.log.Debug("Querying seat prices", "host", c.Host)
resp, err := c.grpcClient.FeedSeatPrice(ctx, &pb.FeedSeatPriceRequest{
SolanaRpcUrl: c.SolanaRPCURL,
DzLedgerUrl: c.DZLedgerURL,
UsdcMint: c.USDCMint,
Keypair: c.Keypair,
ShredSubscriptionProgramId: c.ShredSubscriptionProgramID,
})
if err != nil {
return nil, fmt.Errorf("failed to get seat prices on host %s: %w", c.Host, err)
}
c.log.Debug("Seat prices retrieved", "host", c.Host, "count", len(resp.GetPrices()))
return resp.GetPrices(), nil
}

// FeedSeatPay calls the FeedSeatPay RPC to pay for a seat on a device.
// The client's public IP is auto-filled. Instant allocation is the default.
func (c *Client) FeedSeatPay(ctx context.Context, devicePubkey string, amount string) error {
c.log.Debug("Paying for seat", "host", c.Host, "device", devicePubkey, "amount", amount)
resp, err := c.grpcClient.FeedSeatPay(ctx, &pb.FeedSeatPayRequest{
DevicePubkey: devicePubkey,
ClientIp: c.publicIP.To4().String(),
Amount: amount,
SolanaRpcUrl: c.SolanaRPCURL,
ShredSubscriptionProgramId: c.ShredSubscriptionProgramID,
DzLedgerUrl: c.DZLedgerURL,
UsdcMint: c.USDCMint,
Keypair: c.Keypair,
})
if err != nil {
return fmt.Errorf("failed to pay for seat on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
c.log.Error("Seat payment failed", "host", c.Host, "device", devicePubkey, "output", resp.GetOutput())
return fmt.Errorf("seat payment failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Seat payment successful", "host", c.Host, "device", devicePubkey)
return nil
}

// FeedSeatWithdraw calls the FeedSeatWithdraw RPC to withdraw a seat from a device.
// Instant withdrawal is the default.
func (c *Client) FeedSeatWithdraw(ctx context.Context, devicePubkey string) error {
c.log.Debug("Withdrawing seat", "host", c.Host, "device", devicePubkey)
resp, err := c.grpcClient.FeedSeatWithdraw(ctx, &pb.FeedSeatWithdrawRequest{
DevicePubkey: devicePubkey,
ClientIp: c.publicIP.To4().String(),
SolanaRpcUrl: c.SolanaRPCURL,
ShredSubscriptionProgramId: c.ShredSubscriptionProgramID,
DzLedgerUrl: c.DZLedgerURL,
UsdcMint: c.USDCMint,
Keypair: c.Keypair,
})
if err != nil {
return fmt.Errorf("failed to withdraw seat on host %s: %w", c.Host, err)
}
if !resp.GetSuccess() {
return fmt.Errorf("seat withdrawal failed on host %s: %s", c.Host, resp.GetOutput())
}
c.log.Debug("Seat withdrawal successful", "host", c.Host, "device", devicePubkey)
return nil
}
24 changes: 0 additions & 24 deletions e2e/internal/qa/provisioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,27 +601,3 @@ func formatBandwidth(bps uint64) string {
return fmt.Sprintf("%d bps", bps)
}

type CLIDeviceOutput struct {
Account string `json:"account"`
Code string `json:"code"`
ContributorCode string `json:"contributor_code"`
LocationCode string `json:"location_code"`
ExchangeCode string `json:"exchange_code"`
DeviceType string `json:"device_type"`
PublicIP string `json:"public_ip"`
DzPrefixes []string `json:"dz_prefixes"`
Users int `json:"users"`
MaxUsers int `json:"max_users"`
Status string `json:"status"`
Health string `json:"health"`
MgmtVrf string `json:"mgmt_vrf"`
Owner string `json:"owner"`
}

func parseDeviceListJSON(output []byte) ([]CLIDeviceOutput, error) {
var devices []CLIDeviceOutput
if err := json.Unmarshal(output, &devices); err != nil {
return nil, fmt.Errorf("failed to parse device list JSON: %w", err)
}
return devices, nil
}
Loading
Loading