From cf764f55eee40da07f8770baf6dddfb56317b9ee Mon Sep 17 00:00:00 2001 From: Greg Mitchell Date: Wed, 27 May 2026 15:38:00 +0000 Subject: [PATCH 1/2] tools/stress: orchestrator skeleton MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds tools/stress/device-orchestrator/, the device-stress orchestrator binary for the GRE Tunnel Capacity Study. The binary parses every flag from #3746's CLI list, dumps orchestrator-config.json on start, runs a provision-then- reverse-deprovision sweep against a live serviceability program, and emits the runlog row schema {run_id, user_index, user_pubkey, tunnel_id, event, t_ns, n_after_event} for each submit | confirm | activate | deprovision_* event. Packages: - pkg/reconcile — PlanFor() pure function (lifted from the part-1 SDK PR; now lives with the orchestrator as policy, not as an SDK primitive) - pkg/runlog — append-only JSONL writer for orchestrator-runlog.json - pkg/sweep — provision-then-deprovision loop driven by PlanFor; uses a Clock + Executor interface for testability; reverse-creation-order delete - pkg/abort — sentinel-file poller that cancels a derived ctx between user iterations so an in-flight Create/Delete completes before exit - pkg/agent — Runner interface + noop impl; SSH runner lands in part 3 along with pre_commit_log / applied event emission - pkg/exec — Live impl of sweep.Executor over serviceability.{Client, Executor}; picks deterministic per-user IPs from --client-ip-base - cmd/device-orchestrator — flag parsing, config dump, signal + abort handling, sweep wiring The agent runner is stubbed behind an interface so this PR can land end-to-end functionality (provision/deprovision + runlog + abort) without the SSH plumbing. The SSH runner and the corresponding pre_commit_log / applied row generation land in part 3 of #3746. Part 2 of #3746. Closes #3771. 
--- CHANGELOG.md | 2 + tools/stress/device-orchestrator/Makefile | 15 + .../cmd/device-orchestrator/main.go | 277 +++++++++++++++ .../device-orchestrator/pkg/abort/abort.go | 64 ++++ .../pkg/abort/abort_test.go | 80 +++++ .../device-orchestrator/pkg/agent/agent.go | 73 ++++ .../pkg/agent/agent_test.go | 32 ++ .../device-orchestrator/pkg/exec/exec.go | 139 ++++++++ .../device-orchestrator/pkg/exec/exec_test.go | 27 ++ .../pkg/reconcile/reconcile.go | 63 ++++ .../pkg/reconcile/reconcile_test.go | 166 +++++++++ .../device-orchestrator/pkg/runlog/runlog.go | 101 ++++++ .../pkg/runlog/runlog_test.go | 93 +++++ .../device-orchestrator/pkg/sweep/sweep.go | 262 ++++++++++++++ .../pkg/sweep/sweep_test.go | 321 ++++++++++++++++++ 15 files changed, 1715 insertions(+) create mode 100644 tools/stress/device-orchestrator/Makefile create mode 100644 tools/stress/device-orchestrator/cmd/device-orchestrator/main.go create mode 100644 tools/stress/device-orchestrator/pkg/abort/abort.go create mode 100644 tools/stress/device-orchestrator/pkg/abort/abort_test.go create mode 100644 tools/stress/device-orchestrator/pkg/agent/agent.go create mode 100644 tools/stress/device-orchestrator/pkg/agent/agent_test.go create mode 100644 tools/stress/device-orchestrator/pkg/exec/exec.go create mode 100644 tools/stress/device-orchestrator/pkg/exec/exec_test.go create mode 100644 tools/stress/device-orchestrator/pkg/reconcile/reconcile.go create mode 100644 tools/stress/device-orchestrator/pkg/reconcile/reconcile_test.go create mode 100644 tools/stress/device-orchestrator/pkg/runlog/runlog.go create mode 100644 tools/stress/device-orchestrator/pkg/runlog/runlog_test.go create mode 100644 tools/stress/device-orchestrator/pkg/sweep/sweep.go create mode 100644 tools/stress/device-orchestrator/pkg/sweep/sweep_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index b05af54195..30d24ca9f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ All notable changes to this project will be documented 
in this file. - Add CreateUser / DeleteUser to the serviceability executor with cross-language wire-format fixtures and four new PDA helpers (GetUserPDA, GetAccessPassPDA, GetTunnelIdsPDA, GetDzPrefixBlockPDA) - CLI - Honor the build-configured default environment (`Testnet` by default, `MainnetBeta` under the `default-mainnet-beta` feature) when neither `--env` nor a persisted `config.yml` selects one. The RFC-20 context-build previously fell back to `Environment::default()`, which is always `Devnet` regardless of the build, so a testnet build with no config silently targeted Devnet's ledger URLs and program IDs. The binary now resolves the fallback through the new `doublezero_sdk::default_environment()`, matching the legacy `DZClient::new` defaults (`default_program_id`, `ClientConfig::default`) which already key off the compiled-in environment ([#3810](https://github.com/malbeclabs/doublezero/pull/3810)) +- Tools + - Add `tools/stress/device-orchestrator/` — the device-stress orchestrator skeleton for the GRE Tunnel Capacity Study. The binary parses every flag from #3746's CLI list, dumps `orchestrator-config.json` on start, runs a provision-then-reverse-deprovision sweep against a live serviceability program, and emits the runlog row schema `{run_id, user_index, user_pubkey, tunnel_id, event, t_ns, n_after_event}` to `orchestrator-runlog.json` for each `submit | confirm | activate | deprovision_*` event. The agent runner is stubbed behind a `pkg/agent.Runner` interface (no-op impl ships now; the SSH-backed runner that emits `pre_commit_log` / `applied` lands in part 3). The sweep cooperates with an abort sentinel file: when the file appears the in-flight user completes and the orchestrator deprovisions everything it created before exiting non-zero. `PlanFor` / `Plan` (lifted from the part-1 SDK PR) now live at `tools/stress/device-orchestrator/pkg/reconcile/` as orchestrator policy rather than as an SDK primitive. 
Part 2 of #3746 ([#3771](https://github.com/malbeclabs/doublezero/issues/3771)). ## [v0.25.0](https://github.com/malbeclabs/doublezero/compare/client/v0.24.0...client/v0.25.0) - 2026-05-29 diff --git a/tools/stress/device-orchestrator/Makefile b/tools/stress/device-orchestrator/Makefile new file mode 100644 index 0000000000..6ed19c04fe --- /dev/null +++ b/tools/stress/device-orchestrator/Makefile @@ -0,0 +1,15 @@ +PREFIX:=github.com/malbeclabs/doublezero/tools/stress/device-orchestrator +BUILD:=`git rev-parse --short HEAD` +LDFLAGS=-ldflags "-X=$(PREFIX)/build.Build=$(BUILD)" + +.PHONY: test +test: + go test -race -v -coverprofile coverage.out ./... + +.PHONY: lint +lint: + golangci-lint run -c ../../../.golangci.yaml + +.PHONY: build +build: + CGO_ENABLED=0 go build -v $(LDFLAGS) -o bin/device-orchestrator cmd/device-orchestrator/main.go diff --git a/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go b/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go new file mode 100644 index 0000000000..ab01975d30 --- /dev/null +++ b/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go @@ -0,0 +1,277 @@ +// device-orchestrator runs the GRE Tunnel Capacity Study sweep against a +// live serviceability program: provisions N users on a target device in +// batches with a hold between each, then deprovisions in reverse-creation +// order. Per #3771 (part 2 of #3746) the SSH-driven agent runner is stubbed +// behind the agent.Runner interface; the no-op implementation is used here +// and the SSH implementation lands in part 3. 
+package main + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "flag" + "fmt" + "log/slog" + "net" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/gagliardetto/solana-go" + solanarpc "github.com/gagliardetto/solana-go/rpc" + + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/abort" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/agent" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/exec" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/runlog" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/sweep" +) + +// orchestratorConfig captures the resolved CLI inputs in the shape that gets +// dumped to orchestrator-config.json on start. +type orchestratorConfig struct { + RunID string `json:"run_id"` + TargetUserCount int `json:"target_user_count"` + UsersPerBatch int `json:"users_per_batch"` + HoldSeconds int `json:"hold_seconds"` + DUTPubkey string `json:"dut_pubkey"` + DUTSSHHost string `json:"dut_ssh_host"` + DUTSSHKey string `json:"dut_ssh_key"` + RPCURL string `json:"rpc_url"` + ProgramID string `json:"program_id"` + KeypairPath string `json:"keypair"` + ControllerAddr string `json:"controller"` + AbortFile string `json:"abort_file"` + WorkingDir string `json:"working_dir"` + ClientIPBase string `json:"client_ip_base"` + TunnelEndpoint string `json:"tunnel_endpoint"` + TenantPubkey string `json:"tenant_pubkey,omitempty"` +} + +func main() { + if err := run(); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func run() error { + var ( + targetUserCount = flag.Int("target-user-count", 8, "Final user count to sweep up to.") + usersPerBatch = flag.Int("users-per-batch", 2, "Users provisioned per batch before the hold.") + holdSeconds = flag.Int("hold-seconds", 180, "Seconds to hold 
between batches.") + dutPubkey = flag.String("dut-pubkey", "", "Device-under-test pubkey (base58).") + dutSSHHost = flag.String("dut-ssh-host", "", "SSH host:port for the DUT (used by the part-3 agent runner).") + dutSSHKey = flag.String("dut-ssh-key", "", "SSH private-key path for the DUT.") + rpcURL = flag.String("rpc-url", "", "Serviceability RPC URL.") + programID = flag.String("program-id", "", "Serviceability program ID (base58).") + keypairPath = flag.String("keypair", "", "Path to the orchestrator's solana keypair JSON.") + controllerAddr = flag.String("controller", "", "Controller IP:PORT, forwarded to the DUT agent in part 3.") + abortFile = flag.String("abort-file", "", "Path to a sentinel file; when it appears the sweep finishes the current user and exits.") + workingDir = flag.String("working-dir", ".", "Output directory for orchestrator-config.json / orchestrator-runlog.json.") + clientIPBase = flag.String("client-ip-base", "100.64.0.0", "Starting IPv4 address; per-user IP is base + idx.") + tunnelEndpoint = flag.String("tunnel-endpoint", "0.0.0.0", "Tunnel endpoint IP passed to UserCreateArgs; 0.0.0.0 lets the program fall back to the device's public IP.") + tenantPubkey = flag.String("tenant-pubkey", "", "Optional tenant pubkey for UserCreateArgs.") + runID = flag.String("run-id", "", "Run identifier written into every runlog row; auto-generated if empty.") + logLevel = flag.String("log-level", "info", "slog level: debug|info|warn|error.") + dryRun = flag.Bool("dry-run", false, "Validate flags and dump orchestrator-config.json without contacting the RPC.") + ) + flag.Parse() + + logger := newLogger(*logLevel) + slog.SetDefault(logger) + + if *runID == "" { + var buf [8]byte + if _, err := rand.Read(buf[:]); err != nil { + return fmt.Errorf("generate run id: %w", err) + } + *runID = "run-" + hex.EncodeToString(buf[:]) + } + + if err := os.MkdirAll(*workingDir, 0o755); err != nil { + return fmt.Errorf("create working dir: %w", err) + } + + baseIP, err 
:= parseIPv4(*clientIPBase) + if err != nil { + return fmt.Errorf("parse --client-ip-base: %w", err) + } + tunnelIP, err := parseIPv4(*tunnelEndpoint) + if err != nil { + return fmt.Errorf("parse --tunnel-endpoint: %w", err) + } + + resolved := orchestratorConfig{ + RunID: *runID, + TargetUserCount: *targetUserCount, + UsersPerBatch: *usersPerBatch, + HoldSeconds: *holdSeconds, + DUTPubkey: *dutPubkey, + DUTSSHHost: *dutSSHHost, + DUTSSHKey: *dutSSHKey, + RPCURL: *rpcURL, + ProgramID: *programID, + KeypairPath: *keypairPath, + ControllerAddr: *controllerAddr, + AbortFile: *abortFile, + WorkingDir: *workingDir, + ClientIPBase: *clientIPBase, + TunnelEndpoint: *tunnelEndpoint, + TenantPubkey: *tenantPubkey, + } + configPath := filepath.Join(*workingDir, "orchestrator-config.json") + if err := dumpJSON(configPath, resolved); err != nil { + return fmt.Errorf("write orchestrator-config.json: %w", err) + } + logger.Info("orchestrator-config.json written", "path", configPath) + + if *dryRun { + logger.Info("dry-run: skipping sweep") + return nil + } + + if err := requireFlags(map[string]string{ + "--dut-pubkey": *dutPubkey, + "--rpc-url": *rpcURL, + "--program-id": *programID, + "--keypair": *keypairPath, + }); err != nil { + return err + } + + dutPK, err := solana.PublicKeyFromBase58(*dutPubkey) + if err != nil { + return fmt.Errorf("--dut-pubkey: %w", err) + } + programPK, err := solana.PublicKeyFromBase58(*programID) + if err != nil { + return fmt.Errorf("--program-id: %w", err) + } + signer, err := solana.PrivateKeyFromSolanaKeygenFile(*keypairPath) + if err != nil { + return fmt.Errorf("load --keypair: %w", err) + } + + var tenantPK solana.PublicKey + if *tenantPubkey != "" { + tenantPK, err = solana.PublicKeyFromBase58(*tenantPubkey) + if err != nil { + return fmt.Errorf("--tenant-pubkey: %w", err) + } + } + + rpc := solanarpc.New(*rpcURL) + client := serviceability.New(rpc, programPK) + executor := serviceability.NewExecutor(logger, rpc, &signer, programPK) + + 
liveExec, err := exec.New(exec.Config{ + Client: client, + Executor: executor, + DevicePubkey: dutPK, + TenantPubkey: tenantPK, + ClientIPBase: baseIP, + TunnelEndpoint: tunnelIP, + UserType: serviceability.UserTypeIBRL, + CyoaType: serviceability.CyoaTypeGREOverDIA, + DzPrefixCount: 1, + }) + if err != nil { + return err + } + + runlogPath := filepath.Join(*workingDir, "orchestrator-runlog.json") + rlw, err := runlog.Open(runlogPath) + if err != nil { + return err + } + defer rlw.Close() + logger.Info("orchestrator-runlog.json open", "path", runlogPath) + + // Compose ctx: signal cancellation + abort-file cancellation. + rootCtx, rootCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer rootCancel() + ctx, abortCancel := abort.Watch(rootCtx, *abortFile, abort.DefaultPollInterval, logger) + defer abortCancel() + + cfg := sweep.Config{ + RunID: *runID, + Target: *targetUserCount, + UsersPerBatch: *usersPerBatch, + Hold: time.Duration(*holdSeconds) * time.Second, + OwnerFilter: signer.PublicKey(), + Executor: liveExec, + Agent: agent.NewNoop(logger), + Runlog: rlw, + Clock: sweep.RealClock{}, + Logger: logger, + } + + logger.Info("sweep starting", "target", cfg.Target, "batch", cfg.UsersPerBatch, "hold", cfg.Hold) + if err := sweep.Run(ctx, cfg); err != nil { + if errors.Is(err, context.Canceled) { + logger.Warn("sweep cancelled", "err", err) + return err + } + return fmt.Errorf("sweep: %w", err) + } + logger.Info("sweep finished") + return nil +} + +func newLogger(level string) *slog.Logger { + lvl := slog.LevelInfo + switch level { + case "debug": + lvl = slog.LevelDebug + case "warn": + lvl = slog.LevelWarn + case "error": + lvl = slog.LevelError + } + return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl})) +} + +func dumpJSON(path string, v any) error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + return 
enc.Encode(v) +} + +func requireFlags(required map[string]string) error { + var missing []string + for name, val := range required { + if val == "" { + missing = append(missing, name) + } + } + if len(missing) > 0 { + return fmt.Errorf("missing required flag(s): %v", missing) + } + return nil +} + +func parseIPv4(s string) ([4]byte, error) { + ip := net.ParseIP(s) + if ip == nil { + return [4]byte{}, fmt.Errorf("invalid IPv4 %q", s) + } + v4 := ip.To4() + if v4 == nil { + return [4]byte{}, fmt.Errorf("not IPv4: %q", s) + } + var out [4]byte + copy(out[:], v4) + return out, nil +} diff --git a/tools/stress/device-orchestrator/pkg/abort/abort.go b/tools/stress/device-orchestrator/pkg/abort/abort.go new file mode 100644 index 0000000000..8f191f5499 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/abort/abort.go @@ -0,0 +1,64 @@ +// Package abort polls a sentinel file on disk and cancels a context when the +// file appears. The orchestrator uses this for cooperative shutdown: an +// operator drops a file at the path passed via --abort-file and the running +// sweep finishes the current user iteration before exiting. +package abort + +import ( + "context" + "errors" + "log/slog" + "os" + "time" +) + +// DefaultPollInterval is the default polling cadence. The sweep loop only checks +// the cancellation between user iterations, so the abort signal latency is +// bounded by roughly this interval plus one user iteration. +const DefaultPollInterval = 250 * time.Millisecond + +// Watch returns a derived context that is cancelled once a poll (one per +// interval) observes `path` on disk. If path is empty the returned context is the +// parent verbatim and the returned stop is a no-op. The watcher goroutine exits +// when parent or the returned context is cancelled. +// +// Pass log=nil for silent operation. 
+func Watch(parent context.Context, path string, interval time.Duration, log *slog.Logger) (context.Context, context.CancelFunc) { + if path == "" { + return parent, func() {} + } + if interval <= 0 { + interval = DefaultPollInterval + } + ctx, cancel := context.WithCancel(parent) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if exists(path) { + if log != nil { + log.Warn("abort file detected; cancelling sweep", "path", path) + } + cancel() + return + } + } + } + }() + return ctx, cancel +} + +// exists reports whether path refers to an existing filesystem entry. Any +// stat error other than ENOENT is treated as "exists" so a permission error +// doesn't silently leave the orchestrator running past an operator abort. +func exists(path string) bool { + _, err := os.Stat(path) + if err == nil { + return true + } + return !errors.Is(err, os.ErrNotExist) +} diff --git a/tools/stress/device-orchestrator/pkg/abort/abort_test.go b/tools/stress/device-orchestrator/pkg/abort/abort_test.go new file mode 100644 index 0000000000..13fdfba47c --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/abort/abort_test.go @@ -0,0 +1,80 @@ +package abort_test + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + "time" + + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/abort" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWatch_CancelsWhenAbortFileAppears(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "abort") + ctx, cancel := abort.Watch(context.Background(), path, 25*time.Millisecond, nil) + t.Cleanup(cancel) + + // File doesn't exist yet — ctx is alive. + select { + case <-ctx.Done(): + t.Fatal("ctx cancelled before abort file existed") + case <-time.After(50 * time.Millisecond): + } + + // Touch the abort file. 
+ require.NoError(t, os.WriteFile(path, nil, 0o644)) + + select { + case <-ctx.Done(): + assert.True(t, errors.Is(ctx.Err(), context.Canceled)) + case <-time.After(time.Second): + t.Fatal("ctx did not cancel within 1s after abort file touched") + } +} + +func TestWatch_EmptyPathIsNoOp(t *testing.T) { + t.Parallel() + + parent, parentCancel := context.WithCancel(context.Background()) + t.Cleanup(parentCancel) + + ctx, cancel := abort.Watch(parent, "", 0, nil) + t.Cleanup(cancel) + + select { + case <-ctx.Done(): + t.Fatal("empty-path watch should not cancel on its own") + case <-time.After(50 * time.Millisecond): + } + + // Parent cancellation still propagates through (we return parent verbatim). + parentCancel() + select { + case <-ctx.Done(): + case <-time.After(time.Second): + t.Fatal("derived ctx did not pick up parent cancellation") + } +} + +func TestWatch_StopsWhenParentCancelled(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "abort") + parent, parentCancel := context.WithCancel(context.Background()) + + ctx, cancel := abort.Watch(parent, path, 25*time.Millisecond, nil) + t.Cleanup(cancel) + + parentCancel() + select { + case <-ctx.Done(): + case <-time.After(time.Second): + t.Fatal("parent cancel did not propagate") + } +} diff --git a/tools/stress/device-orchestrator/pkg/agent/agent.go b/tools/stress/device-orchestrator/pkg/agent/agent.go new file mode 100644 index 0000000000..24c1b4dbce --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/agent/agent.go @@ -0,0 +1,73 @@ +// Package agent exposes the Runner interface the orchestrator uses to +// drive doublezero-agent on a device under test (DUT). The skeleton ships a +// no-op implementation; the SSH-backed runner lands in part 3 of #3746. +package agent + +import ( + "context" + "log/slog" + "time" +) + +// EventKind tags an Event so runlog row generation can map it onto the +// runlog Event vocabulary (`pre_commit_log`, `applied`). 
+type EventKind int + +const ( + // EventPreCommitLog marks the moment the agent log shows + // `Committing config session due to diffs detected: ` for a new + // tunnel interface; carries the parsed tunnel ID. + EventPreCommitLog EventKind = iota + 1 + // EventApplied marks the moment the agent log shows a commit-success line + // for a previously-pending tunnel interface. + EventApplied +) + +// Event is one observation emitted by the agent runner: a timestamped tunnel +// state transition derived from agent log lines. +type Event struct { + Kind EventKind + TunnelID uint16 + At time.Time +} + +// Runner drives doublezero-agent on the DUT and surfaces tunnel-related events +// extracted from its log stream. +// +// Lifecycle: +// +// - Start(ctx) blocks until the agent stream is healthy enough to emit +// events (or returns an error). It returns immediately for the no-op impl. +// - Events() returns a channel that closes when the runner exits. +// +// The SSH-backed implementation will manage an ssh.Session and parse stdout +// for the two log lines listed under EventKind. +type Runner interface { + Start(ctx context.Context) error + Events() <-chan Event +} + +// NewNoop returns a Runner that never starts a process and never emits events. +// Used by the skeleton sweep loop and by tests where the agent isn't under test. 
+func NewNoop(log *slog.Logger) Runner { + ch := make(chan Event) + return &noop{log: log, events: ch} +} + +type noop struct { + log *slog.Logger + events chan Event +} + +func (n *noop) Start(ctx context.Context) error { + if n.log != nil { + n.log.Debug("agent: noop runner started (no events will be emitted)") + } + go func() { + <-ctx.Done() + close(n.events) + }() + return nil +} + +func (n *noop) Events() <-chan Event { return n.events } diff --git a/tools/stress/device-orchestrator/pkg/agent/agent_test.go b/tools/stress/device-orchestrator/pkg/agent/agent_test.go new file mode 100644 index 0000000000..430dae7988 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/agent/agent_test.go @@ -0,0 +1,32 @@ +package agent_test + +import ( + "context" + "testing" + "time" + + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/agent" + "github.com/stretchr/testify/require" +) + +func TestNoopRunner_ClosesEventsWhenContextCancelled(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + r := agent.NewNoop(nil) + require.NoError(t, r.Start(ctx)) + + select { + case <-r.Events(): + t.Fatal("noop runner emitted an event") + case <-time.After(50 * time.Millisecond): + } + + cancel() + select { + case _, ok := <-r.Events(): + require.False(t, ok, "events channel should close on cancel") + case <-time.After(time.Second): + t.Fatal("events channel did not close after context cancel") + } +} diff --git a/tools/stress/device-orchestrator/pkg/exec/exec.go b/tools/stress/device-orchestrator/pkg/exec/exec.go new file mode 100644 index 0000000000..86badb60f2 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/exec/exec.go @@ -0,0 +1,139 @@ +// Package exec wires the serviceability SDK behind the sweep.Executor +// interface. The orchestrator binary uses it against a real RPC; tests in +// pkg/sweep use a fake to avoid the network. 
+package exec + +import ( + "context" + "encoding/binary" + "fmt" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/sweep" +) + +// Config bundles the inputs the live executor needs. +type Config struct { + Client *serviceability.Client + Executor *serviceability.Executor + + DevicePubkey solana.PublicKey + TenantPubkey solana.PublicKey // zero pubkey = no tenant + + // ClientIPBase is the starting /16 block from which sequential per-user + // IPs are drawn. For idx i, the assigned IP is ClientIPBase + i. + ClientIPBase [4]byte + // TunnelEndpoint is passed through to UserCreateArgs verbatim; pass + // 0.0.0.0 to use the device's public IP. + TunnelEndpoint [4]byte + // UserType / CyoaType pin the user kind for the entire sweep. + UserType serviceability.UserUserType + CyoaType serviceability.CyoaType + // DzPrefixCount must match the device's dz_prefixes length; 1 is the + // stress-test default. + DzPrefixCount uint8 +} + +// Live implements sweep.Executor against a real serviceability program. +type Live struct { + cfg Config +} + +// New returns a Live executor with the given configuration. Callers must +// supply a non-nil Client and Executor. +func New(cfg Config) (*Live, error) { + if cfg.Client == nil { + return nil, fmt.Errorf("exec.New: Client is required") + } + if cfg.Executor == nil { + return nil, fmt.Errorf("exec.New: Executor is required") + } + if cfg.DzPrefixCount == 0 { + cfg.DzPrefixCount = 1 + } + return &Live{cfg: cfg}, nil +} + +// ListUsers returns the current set of User accounts in the program. The +// caller (sweep loop) filters by owner via PlanFor. 
+func (l *Live) ListUsers(ctx context.Context) ([]serviceability.User, error) { + pd, err := l.cfg.Client.GetProgramData(ctx) + if err != nil { + return nil, fmt.Errorf("list users: %w", err) + } + return pd.Users, nil +} + +// CreateUser issues a CreateUser instruction for the idx-th stress user and +// records timestamps the sweep loop turns into runlog rows. +func (l *Live) CreateUser(ctx context.Context, idx int) (sweep.CreateResult, error) { + args := serviceability.UserCreateArgs{ + UserType: l.cfg.UserType, + CyoaType: l.cfg.CyoaType, + ClientIP: ipForIndex(l.cfg.ClientIPBase, idx), + TunnelEndpoint: l.cfg.TunnelEndpoint, + DzPrefixCount: l.cfg.DzPrefixCount, + DevicePubkey: l.cfg.DevicePubkey, + TenantPubkey: l.cfg.TenantPubkey, + } + _, userPDA, err := l.cfg.Executor.CreateUser(ctx, args) + if err != nil { + return sweep.CreateResult{}, err + } + now := time.Now() + + // The SDK's CreateUser blocks on signature finalization and post-confirm + // account visibility; we don't get distinct stage timestamps today, so + // confirm and activate both anchor at the post-call wallclock. A future + // SDK refactor can split these. + tunnelID, err := l.fetchTunnelID(ctx, userPDA) + if err != nil { + // Surface the tunnel ID as 0; the sweep records the create as successful + // because the on-chain User already exists. + tunnelID = 0 + } + return sweep.CreateResult{ + UserPDA: userPDA, + TunnelID: tunnelID, + ConfirmedAt: now, + ActivatedAt: now, + }, nil +} + +// DeleteUser closes a user account by PDA. +func (l *Live) DeleteUser(ctx context.Context, userPDA solana.PublicKey) (sweep.DeleteResult, error) { + if _, err := l.cfg.Executor.DeleteUser(ctx, userPDA); err != nil { + return sweep.DeleteResult{}, err + } + now := time.Now() + return sweep.DeleteResult{ + ConfirmedAt: now, + ActivatedAt: now, + }, nil +} + +// fetchTunnelID reads the user account and returns its assigned TunnelId. 
+// Used so the runlog records the kernel interface identifier the part-3 +// agent runner will key on. +func (l *Live) fetchTunnelID(ctx context.Context, userPDA solana.PublicKey) (uint16, error) { + // We can't read the assigned tunnel_id without the User's on-chain bytes, + // which the SDK doesn't surface from CreateUser. Until a downstream + // helper is added, callers either skip this column (TunnelID = 0) or wire + // a per-account fetch in cmd/. The package signature is kept stable so + // part-3 can drop in the real fetch. + return 0, nil +} + +// ipForIndex returns base shifted by idx, wrapping at the /16 boundary so the +// 0..65535 range is usable without overflow handling on the caller side. +func ipForIndex(base [4]byte, idx int) [4]byte { + host := uint32(base[2])<<8 | uint32(base[3]) + host += uint32(uint16(idx)) + var out [4]byte + out[0] = base[0] + out[1] = base[1] + binary.BigEndian.PutUint16(out[2:], uint16(host)) + return out +} diff --git a/tools/stress/device-orchestrator/pkg/exec/exec_test.go b/tools/stress/device-orchestrator/pkg/exec/exec_test.go new file mode 100644 index 0000000000..c7b13ea30b --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/exec/exec_test.go @@ -0,0 +1,27 @@ +package exec + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIPForIndex(t *testing.T) { + t.Parallel() + + base := [4]byte{100, 64, 0, 0} + tests := []struct { + idx int + want [4]byte + }{ + {0, [4]byte{100, 64, 0, 0}}, + {1, [4]byte{100, 64, 0, 1}}, + {255, [4]byte{100, 64, 0, 255}}, + {256, [4]byte{100, 64, 1, 0}}, + {1000, [4]byte{100, 64, 3, 232}}, + } + for _, tc := range tests { + got := ipForIndex(base, tc.idx) + assert.Equal(t, tc.want, got, "idx=%d", tc.idx) + } +} diff --git a/tools/stress/device-orchestrator/pkg/reconcile/reconcile.go b/tools/stress/device-orchestrator/pkg/reconcile/reconcile.go new file mode 100644 index 0000000000..1396928714 --- /dev/null +++ 
b/tools/stress/device-orchestrator/pkg/reconcile/reconcile.go @@ -0,0 +1,63 @@ +// Package reconcile decides what to create or delete to drive a set of +// serviceability User accounts toward a desired count. It is pure (no I/O) +// so the device-stress orchestrator can call it once per batch iteration +// against live state pulled from the chain. +package reconcile + +import ( + "bytes" + "sort" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" +) + +// Plan describes the delta needed to drive the set of users owned by a given +// key toward a desired count. +type Plan struct { + // ToCreate is the number of users to add. Always >= 0. + ToCreate int + // ToDelete lists user PDAs to remove, in the order they should be deleted. + // Sorted by ClientIp ascending, then by PubKey ascending as a tiebreaker, + // so repeated calls against the same input produce identical plans. + ToDelete []solana.PublicKey +} + +// PlanFor decides what to create or delete so that the number of users owned by +// ownerFilter equals target. Users with a different Owner are ignored (neither +// counted nor deleted), which lets the orchestrator share a program with other +// tenants without disturbing them. +// +// Returns a zero plan when target is negative. 
+func PlanFor(current []serviceability.User, target int, ownerFilter solana.PublicKey) Plan { + if target < 0 { + return Plan{} + } + + var owned []serviceability.User + for _, u := range current { + if bytes.Equal(u.Owner[:], ownerFilter[:]) { + owned = append(owned, u) + } + } + + switch { + case len(owned) < target: + return Plan{ToCreate: target - len(owned)} + case len(owned) > target: + sort.Slice(owned, func(i, j int) bool { + if c := bytes.Compare(owned[i].ClientIp[:], owned[j].ClientIp[:]); c != 0 { + return c < 0 + } + return bytes.Compare(owned[i].PubKey[:], owned[j].PubKey[:]) < 0 + }) + victims := owned[target:] + out := make([]solana.PublicKey, len(victims)) + for i, u := range victims { + out[i] = solana.PublicKeyFromBytes(u.PubKey[:]) + } + return Plan{ToDelete: out} + default: + return Plan{} + } +} diff --git a/tools/stress/device-orchestrator/pkg/reconcile/reconcile_test.go b/tools/stress/device-orchestrator/pkg/reconcile/reconcile_test.go new file mode 100644 index 0000000000..687bf0f464 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/reconcile/reconcile_test.go @@ -0,0 +1,166 @@ +package reconcile_test + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/reconcile" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func makeUser(owner, pubkey solana.PublicKey, clientIP [4]byte) serviceability.User { + return serviceability.User{ + Owner: owner, + ClientIp: clientIP, + PubKey: pubkey, + } +} + +func TestPlanFor(t *testing.T) { + t.Parallel() + + orchestrator := solana.NewWallet().PublicKey() + stranger := solana.NewWallet().PublicKey() + + u1 := solana.NewWallet().PublicKey() + u2 := solana.NewWallet().PublicKey() + u3 := solana.NewWallet().PublicKey() + u4 := solana.NewWallet().PublicKey() + u5 := solana.NewWallet().PublicKey() + + ip := func(a, b, 
c, d byte) [4]byte { return [4]byte{a, b, c, d} } + + tests := []struct { + name string + current []serviceability.User + target int + owner solana.PublicKey + wantCreate int + wantDeleteIPs [][4]byte + }{ + { + name: "zero to N", + current: nil, + target: 4, + owner: orchestrator, + wantCreate: 4, + }, + { + name: "N to zero deletes in ip-ascending order", + current: []serviceability.User{ + makeUser(orchestrator, u1, ip(10, 0, 0, 3)), + makeUser(orchestrator, u2, ip(10, 0, 0, 1)), + makeUser(orchestrator, u3, ip(10, 0, 0, 4)), + makeUser(orchestrator, u4, ip(10, 0, 0, 2)), + }, + target: 0, + owner: orchestrator, + wantCreate: 0, + wantDeleteIPs: [][4]byte{ip(10, 0, 0, 1), ip(10, 0, 0, 2), ip(10, 0, 0, 3), ip(10, 0, 0, 4)}, + }, + { + name: "partial trim deletes only the overflow", + current: []serviceability.User{ + makeUser(orchestrator, u1, ip(10, 0, 0, 5)), + makeUser(orchestrator, u2, ip(10, 0, 0, 4)), + makeUser(orchestrator, u3, ip(10, 0, 0, 3)), + makeUser(orchestrator, u4, ip(10, 0, 0, 2)), + makeUser(orchestrator, u5, ip(10, 0, 0, 1)), + }, + target: 3, + owner: orchestrator, + wantCreate: 0, + wantDeleteIPs: [][4]byte{ip(10, 0, 0, 4), ip(10, 0, 0, 5)}, + }, + { + name: "partial grow asks for the missing count", + current: []serviceability.User{ + makeUser(orchestrator, u1, ip(10, 0, 0, 1)), + makeUser(orchestrator, u2, ip(10, 0, 0, 2)), + }, + target: 5, + owner: orchestrator, + wantCreate: 3, + }, + { + name: "only foreign users present grows by full target", + current: []serviceability.User{ + makeUser(stranger, u1, ip(10, 0, 0, 1)), + makeUser(stranger, u2, ip(10, 0, 0, 2)), + makeUser(stranger, u3, ip(10, 0, 0, 3)), + }, + target: 2, + owner: orchestrator, + wantCreate: 2, + }, + { + name: "mixed ownership only counts and deletes owned", + current: []serviceability.User{ + makeUser(stranger, u1, ip(10, 0, 0, 9)), + makeUser(orchestrator, u2, ip(10, 0, 0, 2)), + makeUser(stranger, u3, ip(10, 0, 0, 8)), + makeUser(orchestrator, u4, ip(10, 0, 0, 1)), 
+ }, + target: 1, + owner: orchestrator, + wantCreate: 0, + wantDeleteIPs: [][4]byte{ip(10, 0, 0, 2)}, + }, + { + name: "already at target produces zero plan", + current: []serviceability.User{ + makeUser(orchestrator, u1, ip(10, 0, 0, 1)), + makeUser(orchestrator, u2, ip(10, 0, 0, 2)), + }, + target: 2, + owner: orchestrator, + wantCreate: 0, + }, + { + name: "negative target produces zero plan", + current: []serviceability.User{ + makeUser(orchestrator, u1, ip(10, 0, 0, 1)), + }, + target: -1, + owner: orchestrator, + wantCreate: 0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + plan := reconcile.PlanFor(tc.current, tc.target, tc.owner) + assert.Equal(t, tc.wantCreate, plan.ToCreate, "ToCreate") + require.Len(t, plan.ToDelete, len(tc.wantDeleteIPs), "ToDelete length") + + ipToPubkey := map[[4]byte]solana.PublicKey{} + for _, u := range tc.current { + ipToPubkey[u.ClientIp] = solana.PublicKeyFromBytes(u.PubKey[:]) + } + for i, ipKey := range tc.wantDeleteIPs { + assert.Equal(t, ipToPubkey[ipKey], plan.ToDelete[i], "ToDelete[%d] (clientIp=%v)", i, ipKey) + } + }) + } +} + +func TestPlanFor_TieBreaksByPubkey(t *testing.T) { + t.Parallel() + + orchestrator := solana.NewWallet().PublicKey() + sharedIP := [4]byte{10, 0, 0, 1} + + pkA := solana.PublicKeyFromBytes([]byte{0xAA, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) + pkB := solana.PublicKeyFromBytes([]byte{0xBB, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) + + plan := reconcile.PlanFor([]serviceability.User{ + makeUser(orchestrator, pkB, sharedIP), + makeUser(orchestrator, pkA, sharedIP), + }, 0, orchestrator) + + require.Len(t, plan.ToDelete, 2) + assert.Equal(t, pkA, plan.ToDelete[0]) + assert.Equal(t, pkB, plan.ToDelete[1]) +} diff --git a/tools/stress/device-orchestrator/pkg/runlog/runlog.go b/tools/stress/device-orchestrator/pkg/runlog/runlog.go new file mode 100644 index 
0000000000..007a79fa43 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/runlog/runlog.go @@ -0,0 +1,101 @@ +// Package runlog appends per-event rows to the orchestrator runlog file +// (`orchestrator-runlog.json`). One row per line; line-delimited JSON so the +// file can be tailed and downstream tooling can parse incrementally. +// +// Row schema (per #3746): +// +// {run_id, user_index, user_pubkey, tunnel_id, event, t_ns, n_after_event} +// +// `t_ns` is the unix epoch in nanoseconds. `n_after_event` is the size of the +// active user set immediately after the event applied — provisioning increments +// it on `activate`, deprovisioning decrements on `deprovision_activate`. Other +// events carry the count as-of-emission. +package runlog + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "os" + "sync" + "time" +) + +// Event enumerates the recognized event names. Stringly-typed in the file so +// the schema can grow without consumers needing to track an enum. +type Event string + +const ( + EventSubmit Event = "submit" + EventConfirm Event = "confirm" + EventActivate Event = "activate" + EventPreCommitLog Event = "pre_commit_log" // emitted by part-3 agent runner + EventApplied Event = "applied" // emitted by part-3 agent runner + EventDeprovisionSubmit Event = "deprovision_submit" + EventDeprovisionConfirm Event = "deprovision_confirm" + EventDeprovisionActivate Event = "deprovision_activate" +) + +// Row is one entry in the runlog file. Field names match #3746's schema. +type Row struct { + RunID string `json:"run_id"` + UserIndex int `json:"user_index"` + UserPubkey string `json:"user_pubkey"` + TunnelID uint16 `json:"tunnel_id"` + Event Event `json:"event"` + TNs int64 `json:"t_ns"` + NAfterEvent int `json:"n_after_event"` +} + +// Writer appends rows to an open file in line-delimited JSON. +type Writer struct { + mu sync.Mutex + w io.WriteCloser + path string +} + +// Open creates or truncates the file at path for append-only writes. 
+func Open(path string) (*Writer, error) { + f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644) + if err != nil { + return nil, fmt.Errorf("open runlog %s: %w", path, err) + } + return &Writer{w: f, path: path}, nil +} + +// Path returns the file path the writer is appending to. +func (w *Writer) Path() string { return w.path } + +// Append serializes row as JSON and writes a single line. +func (w *Writer) Append(row Row) error { + if row.TNs == 0 { + row.TNs = time.Now().UnixNano() + } + w.mu.Lock() + defer w.mu.Unlock() + if w.w == nil { + return errors.New("runlog writer closed") + } + buf, err := json.Marshal(row) + if err != nil { + return fmt.Errorf("marshal runlog row: %w", err) + } + buf = append(buf, '\n') + if _, err := w.w.Write(buf); err != nil { + return fmt.Errorf("write runlog row: %w", err) + } + return nil +} + +// Close flushes and closes the underlying file. +func (w *Writer) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.w == nil { + return nil + } + err := w.w.Close() + w.w = nil + return err +} diff --git a/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go b/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go new file mode 100644 index 0000000000..ca0cb31ddf --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go @@ -0,0 +1,93 @@ +package runlog_test + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/runlog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWriter_RoundTrip(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + + rows := []runlog.Row{ + {RunID: "run-1", UserIndex: 0, UserPubkey: "pk0", TunnelID: 500, Event: runlog.EventSubmit, TNs: 1000, NAfterEvent: 0}, + {RunID: "run-1", UserIndex: 0, UserPubkey: "pk0", TunnelID: 
500, Event: runlog.EventConfirm, TNs: 2000, NAfterEvent: 0}, + {RunID: "run-1", UserIndex: 0, UserPubkey: "pk0", TunnelID: 500, Event: runlog.EventActivate, TNs: 3000, NAfterEvent: 1}, + {RunID: "run-1", UserIndex: 0, UserPubkey: "pk0", TunnelID: 500, Event: runlog.EventDeprovisionActivate, TNs: 4000, NAfterEvent: 0}, + } + for _, r := range rows { + require.NoError(t, w.Append(r)) + } + require.NoError(t, w.Close()) + + // File ends with a newline; one row per line. + f, err := os.Open(path) + require.NoError(t, err) + defer f.Close() + + var read []runlog.Row + scanner := bufio.NewScanner(f) + for scanner.Scan() { + var r runlog.Row + require.NoError(t, json.Unmarshal(scanner.Bytes(), &r)) + read = append(read, r) + } + require.NoError(t, scanner.Err()) + + assert.Equal(t, rows, read) +} + +func TestWriter_FillsMissingTimestamp(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + defer w.Close() + + require.NoError(t, w.Append(runlog.Row{RunID: "r", UserIndex: 0, UserPubkey: "pk", Event: runlog.EventSubmit})) + + data, err := os.ReadFile(path) + require.NoError(t, err) + + var r runlog.Row + require.NoError(t, json.Unmarshal(data[:len(data)-1], &r)) + assert.NotZero(t, r.TNs, "Append should fill t_ns when zero") +} + +func TestWriter_RejectsAfterClose(t *testing.T) { + t.Parallel() + + w, err := runlog.Open(filepath.Join(t.TempDir(), "orchestrator-runlog.json")) + require.NoError(t, err) + require.NoError(t, w.Close()) + + err = w.Append(runlog.Row{RunID: "r", Event: runlog.EventSubmit}) + require.Error(t, err) +} + +func TestWriter_Truncates(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + require.NoError(t, os.WriteFile(path, []byte("stale\n"), 0o644)) + + w, err := runlog.Open(path) + require.NoError(t, err) + require.NoError(t, w.Append(runlog.Row{RunID: "r", Event: runlog.EventSubmit, TNs: 1})) + 
require.NoError(t, w.Close()) + + data, err := os.ReadFile(path) + require.NoError(t, err) + assert.NotContains(t, string(data), "stale", "Open(path) should truncate existing content") +} diff --git a/tools/stress/device-orchestrator/pkg/sweep/sweep.go b/tools/stress/device-orchestrator/pkg/sweep/sweep.go new file mode 100644 index 0000000000..cda03412c1 --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/sweep/sweep.go @@ -0,0 +1,262 @@ +// Package sweep implements the device-orchestrator sweep loop: +// +// - Provision phase: walks 0 → Target users in batches of UsersPerBatch, +// using reconcile.PlanFor to query live state and ask the Executor to +// create the delta, holding for Hold between batches. +// - Deprovision phase: walks Target → 0 in reverse order of creation, +// so the youngest user is removed first. +// +// Per #3746, the sweep cooperates with the abort signal between user +// iterations — it never cancels a mid-flight Create/Delete. +package sweep + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/agent" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/reconcile" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/runlog" +) + +// Clock abstracts the wallclock for testability. Real callers pass RealClock; +// tests inject a fake that fires `After` channels manually. +type Clock interface { + Now() time.Time + After(d time.Duration) <-chan time.Time +} + +// RealClock is the production wallclock implementation. +type RealClock struct{} + +func (RealClock) Now() time.Time { return time.Now() } +func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +// CreateResult captures the per-user details the sweep emits into the runlog +// for a successful provision. 
ConfirmedAt and ActivatedAt are sourced from +// the Executor so a future SDK refactor can give them distinct values; today +// they are typically equal because the SDK's `CreateUser` blocks on both +// finalization and account visibility before returning. +type CreateResult struct { + UserPDA solana.PublicKey + TunnelID uint16 + ConfirmedAt time.Time + ActivatedAt time.Time +} + +// DeleteResult is the deprovision analog of CreateResult. +type DeleteResult struct { + ConfirmedAt time.Time + ActivatedAt time.Time +} + +// Executor is the interface the sweep depends on for chain I/O. Tests inject +// a fake; the real implementation wraps `serviceability.Executor` plus a small +// post-create fetch to discover the assigned TunnelId. +type Executor interface { + ListUsers(ctx context.Context) ([]serviceability.User, error) + CreateUser(ctx context.Context, idx int) (CreateResult, error) + DeleteUser(ctx context.Context, userPDA solana.PublicKey) (DeleteResult, error) +} + +// Config bundles all sweep parameters; pass by value to Run. 
+type Config struct { + RunID string + Target int + UsersPerBatch int + Hold time.Duration + OwnerFilter solana.PublicKey + + Executor Executor + Agent agent.Runner + Runlog *runlog.Writer + Clock Clock + Logger *slog.Logger +} + +func (c *Config) validate() error { + switch { + case c.Target < 0: + return errors.New("sweep: Target must be >= 0") + case c.UsersPerBatch <= 0: + return errors.New("sweep: UsersPerBatch must be > 0") + case c.Hold < 0: + return errors.New("sweep: Hold must be >= 0") + case c.RunID == "": + return errors.New("sweep: RunID is required") + case c.Executor == nil: + return errors.New("sweep: Executor is required") + case c.Runlog == nil: + return errors.New("sweep: Runlog is required") + } + if c.Clock == nil { + c.Clock = RealClock{} + } + if c.Logger == nil { + c.Logger = slog.Default() + } + if c.Agent == nil { + c.Agent = agent.NewNoop(c.Logger) + } + return nil +} + +// createdUser tracks an orchestrator-owned user so the deprovision phase can +// iterate in reverse-creation order, independent of live state. +type createdUser struct { + idx int + pubkey solana.PublicKey + tunnelID uint16 +} + +// Run drives the provision-then-deprovision sweep to completion. Returns the +// number of users actually created/deleted alongside the error (if any), so +// callers can report partial progress on abort. +func Run(ctx context.Context, cfg Config) error { + if err := cfg.validate(); err != nil { + return err + } + if err := cfg.Agent.Start(ctx); err != nil { + return fmt.Errorf("start agent runner: %w", err) + } + + created, err := provision(ctx, &cfg) + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + // Always attempt deprovision so an abort during provision still cleans up + // what the sweep created. Use a fresh context for the deprovision phase if + // the original was cancelled, since the operator wants the tear-down to + // finish before exit. 
We respect the parent context's lifetime via the + // outer Run's error return — callers that want a hard stop pass a deadline. + depErr := deprovision(ctx, &cfg, created) + if err != nil { + return err + } + return depErr +} + +// provision walks 0 → Target in batches, returning the slice of created users +// so deprovision can iterate in reverse. Returns ctx.Err() if cancelled +// between users. +func provision(ctx context.Context, cfg *Config) ([]createdUser, error) { + if cfg.Target == 0 { + return nil, nil + } + var created []createdUser + runningTarget := 0 + activeCount := 0 + + for runningTarget < cfg.Target { + if err := ctx.Err(); err != nil { + return created, err + } + + nextTarget := runningTarget + cfg.UsersPerBatch + if nextTarget > cfg.Target { + nextTarget = cfg.Target + } + + users, err := cfg.Executor.ListUsers(ctx) + if err != nil { + return created, fmt.Errorf("list users for batch starting at %d: %w", activeCount, err) + } + plan := reconcile.PlanFor(users, nextTarget, cfg.OwnerFilter) + if len(plan.ToDelete) > 0 { + cfg.Logger.Warn("sweep: PlanFor wants to delete pre-existing users; skipping (orchestrator only creates this run)", + "count", len(plan.ToDelete)) + } + + for i := 0; i < plan.ToCreate; i++ { + if err := ctx.Err(); err != nil { + return created, err + } + idx := activeCount + submitAt := cfg.Clock.Now() + if err := emit(cfg, idx, "", 0, runlog.EventSubmit, submitAt, activeCount); err != nil { + return created, err + } + + res, err := cfg.Executor.CreateUser(ctx, idx) + if err != nil { + return created, fmt.Errorf("create user idx=%d: %w", idx, err) + } + pkStr := res.UserPDA.String() + if err := emit(cfg, idx, pkStr, res.TunnelID, runlog.EventConfirm, res.ConfirmedAt, activeCount); err != nil { + return created, err + } + created = append(created, createdUser{idx: idx, pubkey: res.UserPDA, tunnelID: res.TunnelID}) + activeCount++ + if err := emit(cfg, idx, pkStr, res.TunnelID, runlog.EventActivate, res.ActivatedAt, activeCount); 
err != nil { + return created, err + } + } + + runningTarget = nextTarget + if runningTarget >= cfg.Target { + break + } + if cfg.Hold > 0 { + select { + case <-cfg.Clock.After(cfg.Hold): + case <-ctx.Done(): + return created, ctx.Err() + } + } + } + return created, nil +} + +// deprovision walks the created slice in reverse, emitting deprovision_* +// events for each. +func deprovision(ctx context.Context, cfg *Config, created []createdUser) error { + activeCount := len(created) + for i := len(created) - 1; i >= 0; i-- { + if err := ctx.Err(); err != nil { + return err + } + u := created[i] + pkStr := u.pubkey.String() + submitAt := cfg.Clock.Now() + if err := emit(cfg, u.idx, pkStr, u.tunnelID, runlog.EventDeprovisionSubmit, submitAt, activeCount); err != nil { + return err + } + + res, err := cfg.Executor.DeleteUser(ctx, u.pubkey) + if err != nil { + return fmt.Errorf("delete user idx=%d pubkey=%s: %w", u.idx, pkStr, err) + } + if err := emit(cfg, u.idx, pkStr, u.tunnelID, runlog.EventDeprovisionConfirm, res.ConfirmedAt, activeCount); err != nil { + return err + } + activeCount-- + if err := emit(cfg, u.idx, pkStr, u.tunnelID, runlog.EventDeprovisionActivate, res.ActivatedAt, activeCount); err != nil { + return err + } + } + return nil +} + +func emit(cfg *Config, idx int, pubkey string, tunnelID uint16, ev runlog.Event, at time.Time, nAfter int) error { + if at.IsZero() { + at = cfg.Clock.Now() + } + row := runlog.Row{ + RunID: cfg.RunID, + UserIndex: idx, + UserPubkey: pubkey, + TunnelID: tunnelID, + Event: ev, + TNs: at.UnixNano(), + NAfterEvent: nAfter, + } + if err := cfg.Runlog.Append(row); err != nil { + return fmt.Errorf("runlog append %s: %w", ev, err) + } + return nil +} diff --git a/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go b/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go new file mode 100644 index 0000000000..3402d5fe8c --- /dev/null +++ b/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go @@ -0,0 +1,321 @@ +package 
sweep_test + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/agent" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/runlog" + "github.com/malbeclabs/doublezero/tools/stress/device-orchestrator/pkg/sweep" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeClock provides deterministic Now() and a manually-fired After channel so +// the sweep's hold call returns instantly under test. +type fakeClock struct { + mu sync.Mutex + now time.Time + holds int +} + +func (f *fakeClock) Now() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + f.now = f.now.Add(time.Microsecond) // advance so successive Now() calls differ + return f.now +} + +func (f *fakeClock) After(d time.Duration) <-chan time.Time { + f.mu.Lock() + f.holds++ + f.mu.Unlock() + ch := make(chan time.Time, 1) + ch <- time.Now() + return ch +} + +func (f *fakeClock) HoldCount() int { + f.mu.Lock() + defer f.mu.Unlock() + return f.holds +} + +// fakeExecutor records create/delete calls. ListUsers always returns the +// orchestrator-owned set so PlanFor produces the right delta. +type fakeExecutor struct { + mu sync.Mutex + owner solana.PublicKey + created []serviceability.User + createN atomic.Int32 + deleteN atomic.Int32 + + // Optional hook to fail on the Nth create (1-based) — used by the abort test. 
+ failCreateOnCall int + failErr error +} + +func newFakeExecutor(owner solana.PublicKey) *fakeExecutor { + return &fakeExecutor{owner: owner} +} + +func (f *fakeExecutor) ListUsers(ctx context.Context) ([]serviceability.User, error) { + f.mu.Lock() + defer f.mu.Unlock() + out := make([]serviceability.User, len(f.created)) + copy(out, f.created) + return out, nil +} + +func (f *fakeExecutor) CreateUser(ctx context.Context, idx int) (sweep.CreateResult, error) { + calls := int(f.createN.Add(1)) + if f.failCreateOnCall == calls && f.failErr != nil { + return sweep.CreateResult{}, f.failErr + } + + // Deterministic pubkey from idx, IP = 100.0.0.idx+1 so PlanFor sorts cleanly. + var pk solana.PublicKey + pk[0] = byte(idx) + pk[31] = 0xAA + + f.mu.Lock() + f.created = append(f.created, serviceability.User{ + Owner: f.owner, + ClientIp: [4]byte{100, 0, 0, byte(idx + 1)}, + PubKey: pk, + }) + f.mu.Unlock() + + now := time.Unix(1_700_000_000, int64(calls)*1_000_000) // micro-spaced timestamps + return sweep.CreateResult{ + UserPDA: pk, + TunnelID: uint16(500 + idx), + ConfirmedAt: now, + ActivatedAt: now.Add(time.Millisecond), + }, nil +} + +func (f *fakeExecutor) DeleteUser(ctx context.Context, userPDA solana.PublicKey) (sweep.DeleteResult, error) { + calls := int(f.deleteN.Add(1)) + f.mu.Lock() + // Remove the matching user from the active set. + for i, u := range f.created { + if solana.PublicKeyFromBytes(u.PubKey[:]).Equals(userPDA) { + f.created = append(f.created[:i], f.created[i+1:]...) 
+ break + } + } + f.mu.Unlock() + + now := time.Unix(1_700_000_000, int64(calls+1000)*1_000_000) + return sweep.DeleteResult{ + ConfirmedAt: now, + ActivatedAt: now.Add(time.Millisecond), + }, nil +} + +func readRows(t *testing.T, path string) []runlog.Row { + t.Helper() + f, err := os.Open(path) + require.NoError(t, err) + defer f.Close() + var rows []runlog.Row + s := bufio.NewScanner(f) + for s.Scan() { + var r runlog.Row + require.NoError(t, json.Unmarshal(s.Bytes(), &r)) + rows = append(rows, r) + } + require.NoError(t, s.Err()) + return rows +} + +func TestRun_ProvisionsThenDeprovisionsInReverseOrder(t *testing.T) { + t.Parallel() + + owner := solana.NewWallet().PublicKey() + exec := newFakeExecutor(owner) + path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + t.Cleanup(func() { _ = w.Close() }) + + clk := &fakeClock{now: time.Unix(1_700_000_000, 0)} + cfg := sweep.Config{ + RunID: "run-test", + Target: 4, + UsersPerBatch: 2, + Hold: 10 * time.Second, + OwnerFilter: owner, + Executor: exec, + Agent: agent.NewNoop(nil), + Runlog: w, + Clock: clk, + } + require.NoError(t, sweep.Run(context.Background(), cfg)) + require.NoError(t, w.Close()) + + rows := readRows(t, path) + // 4 provisions × 3 events + 4 deprovisions × 3 events = 24 rows + require.Len(t, rows, 24) + + // Provision phase: ascending user_index, events submit→confirm→activate. 
+ for i := 0; i < 4; i++ { + base := i * 3 + assert.Equal(t, i, rows[base].UserIndex, "row %d", base) + assert.Equal(t, runlog.EventSubmit, rows[base].Event) + assert.Equal(t, runlog.EventConfirm, rows[base+1].Event) + assert.Equal(t, runlog.EventActivate, rows[base+2].Event) + assert.Equal(t, uint16(500+i), rows[base+1].TunnelID, "tunnel_id propagates after confirm") + assert.Equal(t, i+1, rows[base+2].NAfterEvent, "activate increments active count") + } + + // Deprovision phase: descending user_index (reverse creation order), events deprovision_submit/confirm/activate. + for k := 0; k < 4; k++ { + base := 12 + k*3 + expectedIdx := 3 - k // 3, 2, 1, 0 + assert.Equal(t, expectedIdx, rows[base].UserIndex) + assert.Equal(t, runlog.EventDeprovisionSubmit, rows[base].Event) + assert.Equal(t, runlog.EventDeprovisionConfirm, rows[base+1].Event) + assert.Equal(t, runlog.EventDeprovisionActivate, rows[base+2].Event) + assert.Equal(t, 3-k, rows[base+2].NAfterEvent, "deprovision_activate decrements active count") + } + + // Hold called between batches but not after the final provision batch. + // Target=4, UsersPerBatch=2 → batches at [0..2), [2..4); one hold between them. + assert.Equal(t, 1, clk.HoldCount(), "Hold should fire once (between batches), not after reaching target") + + // Executor calls match the totals. 
+ assert.Equal(t, int32(4), exec.createN.Load()) + assert.Equal(t, int32(4), exec.deleteN.Load()) +} + +func TestRun_HandlesZeroTarget(t *testing.T) { + t.Parallel() + + owner := solana.NewWallet().PublicKey() + exec := newFakeExecutor(owner) + path := filepath.Join(t.TempDir(), "runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + t.Cleanup(func() { _ = w.Close() }) + + cfg := sweep.Config{ + RunID: "run-zero", + Target: 0, + UsersPerBatch: 2, + Hold: time.Second, + OwnerFilter: owner, + Executor: exec, + Runlog: w, + Clock: &fakeClock{now: time.Unix(1, 0)}, + } + require.NoError(t, sweep.Run(context.Background(), cfg)) + require.NoError(t, w.Close()) + + rows := readRows(t, path) + assert.Empty(t, rows) + assert.Zero(t, exec.createN.Load()) + assert.Zero(t, exec.deleteN.Load()) +} + +func TestRun_AbortBetweenUsersStillCleansUp(t *testing.T) { + t.Parallel() + + owner := solana.NewWallet().PublicKey() + exec := newFakeExecutor(owner) + exec.failCreateOnCall = 3 + exec.failErr = context.Canceled + + path := filepath.Join(t.TempDir(), "runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + t.Cleanup(func() { _ = w.Close() }) + + cfg := sweep.Config{ + RunID: "run-abort", + Target: 4, + UsersPerBatch: 2, + Hold: time.Second, + OwnerFilter: owner, + Executor: exec, + Runlog: w, + Clock: &fakeClock{now: time.Unix(1, 0)}, + } + err = sweep.Run(context.Background(), cfg) + require.Error(t, err, "abort during provision should surface error") + + // Even on abort, deprovision should fire for the two users that were created. + require.NoError(t, w.Close()) + rows := readRows(t, path) + + // 2 provisions × 3 events = 6; plus a submit event for the failed third; plus 2 deprovision sets. 
+ deprovisionActivates := 0 + for _, r := range rows { + if r.Event == runlog.EventDeprovisionActivate { + deprovisionActivates++ + } + } + assert.Equal(t, 2, deprovisionActivates, "every created user should be deprovisioned on abort") +} + +func TestRun_RejectsInvalidConfig(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg sweep.Config + }{ + {name: "negative target", cfg: sweep.Config{Target: -1, UsersPerBatch: 1, RunID: "r", Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "zero batch", cfg: sweep.Config{Target: 1, UsersPerBatch: 0, RunID: "r", Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "missing run id", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "missing executor", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", Runlog: &runlog.Writer{}}}, + {name: "missing runlog", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", Executor: &fakeExecutor{}}}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := sweep.Run(context.Background(), tc.cfg) + require.Error(t, err) + }) + } +} + +// Sanity: ctx cancellation between users is observed at the next iteration boundary. 
+func TestRun_CancellationStopsBetweenUsers(t *testing.T) { + t.Parallel() + + owner := solana.NewWallet().PublicKey() + exec := newFakeExecutor(owner) + path := filepath.Join(t.TempDir(), "runlog.json") + w, err := runlog.Open(path) + require.NoError(t, err) + t.Cleanup(func() { _ = w.Close() }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancelled + + cfg := sweep.Config{ + RunID: "run-cancel", + Target: 4, + UsersPerBatch: 2, + Hold: time.Second, + OwnerFilter: owner, + Executor: exec, + Runlog: w, + Clock: &fakeClock{now: time.Unix(1, 0)}, + } + err = sweep.Run(ctx, cfg) + require.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled)) + assert.Zero(t, exec.createN.Load(), "no users should be created when ctx is pre-cancelled") +} From c97a8d40a1070234576605e8b6fa8274a74f868b Mon Sep 17 00:00:00 2001 From: Greg Mitchell Date: Fri, 29 May 2026 21:40:35 +0000 Subject: [PATCH 2/2] tools/stress: address orchestrator skeleton review feedback - sweep: validate OwnerFilter is non-zero; move dependency defaults out of validate() into applyDefaults(); scope all sweep logs with run_id. - sweep: run create calls and the whole deprovision phase under context.WithoutCancel so an abort never interrupts an in-flight chain op (which could orphan a user) and teardown always completes. - sweep: skip the inter-batch hold when a batch created no users. - sweep test: drive the abort case via real ctx cancellation instead of a faked executor error. - exec: drop the dead fetchTunnelID error path (it always returned nil). - agent: guard the no-op runner's Start with sync.Once to avoid a double close panic. - cmd: validate required flags before writing orchestrator-config.json; sort missing-flag names for deterministic output; capture dumpJSON's Close error; rename the runlog to orchestrator-runlog.jsonl. - runlog: trim obvious comments. - CHANGELOG: condense the orchestrator entry. 
--- CHANGELOG.md | 2 +- .../cmd/device-orchestrator/main.go | 42 ++++++++++------ .../device-orchestrator/pkg/agent/agent.go | 20 +++++--- .../device-orchestrator/pkg/exec/exec.go | 24 ++------- .../device-orchestrator/pkg/runlog/runlog.go | 6 +-- .../pkg/runlog/runlog_test.go | 8 +-- .../device-orchestrator/pkg/sweep/sweep.go | 43 +++++++++++----- .../pkg/sweep/sweep_test.go | 49 +++++++++++++------ 8 files changed, 119 insertions(+), 75 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30d24ca9f7..a64e516876 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ All notable changes to this project will be documented in this file. - CLI - Honor the build-configured default environment (`Testnet` by default, `MainnetBeta` under the `default-mainnet-beta` feature) when neither `--env` nor a persisted `config.yml` selects one. The RFC-20 context-build previously fell back to `Environment::default()`, which is always `Devnet` regardless of the build, so a testnet build with no config silently targeted Devnet's ledger URLs and program IDs. The binary now resolves the fallback through the new `doublezero_sdk::default_environment()`, matching the legacy `DZClient::new` defaults (`default_program_id`, `ClientConfig::default`) which already key off the compiled-in environment ([#3810](https://github.com/malbeclabs/doublezero/pull/3810)) - Tools - - Add `tools/stress/device-orchestrator/` — the device-stress orchestrator skeleton for the GRE Tunnel Capacity Study. The binary parses every flag from #3746's CLI list, dumps `orchestrator-config.json` on start, runs a provision-then-reverse-deprovision sweep against a live serviceability program, and emits the runlog row schema `{run_id, user_index, user_pubkey, tunnel_id, event, t_ns, n_after_event}` to `orchestrator-runlog.json` for each `submit | confirm | activate | deprovision_*` event. 
The agent runner is stubbed behind a `pkg/agent.Runner` interface (no-op impl ships now; the SSH-backed runner that emits `pre_commit_log` / `applied` lands in part 3). The sweep cooperates with an abort sentinel file: when the file appears the in-flight user completes and the orchestrator deprovisions everything it created before exiting non-zero. `PlanReconcile` / `Plan` (lifted from the part-1 SDK PR) now lives at `tools/stress/device-orchestrator/pkg/reconcile/` as orchestrator policy rather than SDK primitive. Part 2 of #3746 ([#3771](https://github.com/malbeclabs/doublezero/issues/3771)). + - Add `tools/stress/device-orchestrator/`, the device-stress orchestrator skeleton for the GRE Tunnel Capacity Study (part 2 of #3746). Runs a batched provision-then-reverse-deprovision sweep against a live serviceability program, dumping `orchestrator-config.json` and emitting a JSONL runlog of `submit | confirm | activate | deprovision_*` events. Cooperates with an abort sentinel file: finish the in-flight user, tear down everything created, exit non-zero. The SSH-backed agent runner (`pre_commit_log` / `applied` events) is stubbed behind `pkg/agent.Runner` and lands in part 3 ([#3771](https://github.com/malbeclabs/doublezero/issues/3771)). 
## [v0.25.0](https://github.com/malbeclabs/doublezero/compare/client/v0.24.0...client/v0.25.0) - 2026-05-29 diff --git a/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go b/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go index ab01975d30..dab74c5735 100644 --- a/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go +++ b/tools/stress/device-orchestrator/cmd/device-orchestrator/main.go @@ -19,6 +19,7 @@ import ( "os" "os/signal" "path/filepath" + "sort" "syscall" "time" @@ -74,7 +75,7 @@ func run() error { keypairPath = flag.String("keypair", "", "Path to the orchestrator's solana keypair JSON.") controllerAddr = flag.String("controller", "", "Controller IP:PORT, forwarded to the DUT agent in part 3.") abortFile = flag.String("abort-file", "", "Path to a sentinel file; when it appears the sweep finishes the current user and exits.") - workingDir = flag.String("working-dir", ".", "Output directory for orchestrator-config.json / orchestrator-runlog.json.") + workingDir = flag.String("working-dir", ".", "Output directory for orchestrator-config.json / orchestrator-runlog.jsonl.") clientIPBase = flag.String("client-ip-base", "100.64.0.0", "Starting IPv4 address; per-user IP is base + idx.") tunnelEndpoint = flag.String("tunnel-endpoint", "0.0.0.0", "Tunnel endpoint IP passed to UserCreateArgs; 0.0.0.0 lets the program fall back to the device's public IP.") tenantPubkey = flag.String("tenant-pubkey", "", "Optional tenant pubkey for UserCreateArgs.") @@ -126,6 +127,20 @@ func run() error { TunnelEndpoint: *tunnelEndpoint, TenantPubkey: *tenantPubkey, } + // Validate required flags before writing anything, so a bad invocation + // doesn't leave a config file behind. A dry-run is exempt: its whole job is + // to dump the resolved config without needing the live-RPC flags. 
+ if !*dryRun { + if err := requireFlags(map[string]string{ + "--dut-pubkey": *dutPubkey, + "--rpc-url": *rpcURL, + "--program-id": *programID, + "--keypair": *keypairPath, + }); err != nil { + return err + } + } + configPath := filepath.Join(*workingDir, "orchestrator-config.json") if err := dumpJSON(configPath, resolved); err != nil { return fmt.Errorf("write orchestrator-config.json: %w", err) @@ -137,15 +152,6 @@ func run() error { return nil } - if err := requireFlags(map[string]string{ - "--dut-pubkey": *dutPubkey, - "--rpc-url": *rpcURL, - "--program-id": *programID, - "--keypair": *keypairPath, - }); err != nil { - return err - } - dutPK, err := solana.PublicKeyFromBase58(*dutPubkey) if err != nil { return fmt.Errorf("--dut-pubkey: %w", err) @@ -186,13 +192,13 @@ func run() error { return err } - runlogPath := filepath.Join(*workingDir, "orchestrator-runlog.json") + runlogPath := filepath.Join(*workingDir, "orchestrator-runlog.jsonl") rlw, err := runlog.Open(runlogPath) if err != nil { return err } defer rlw.Close() - logger.Info("orchestrator-runlog.json open", "path", runlogPath) + logger.Info("orchestrator-runlog.jsonl open", "path", runlogPath) // Compose ctx: signal cancellation + abort-file cancellation. rootCtx, rootCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -238,12 +244,18 @@ func newLogger(level string) *slog.Logger { return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: lvl})) } -func dumpJSON(path string, v any) error { +func dumpJSON(path string, v any) (err error) { f, err := os.Create(path) if err != nil { return err } - defer f.Close() + // Capture the Close error so a flush failure (e.g. a full filesystem) on the + // buffered JSON isn't swallowed. 
+ defer func() { + if cerr := f.Close(); cerr != nil && err == nil { + err = cerr + } + }() enc := json.NewEncoder(f) enc.SetIndent("", " ") return enc.Encode(v) @@ -257,6 +269,8 @@ func requireFlags(required map[string]string) error { } } if len(missing) > 0 { + // Sort so the error is deterministic regardless of map iteration order. + sort.Strings(missing) return fmt.Errorf("missing required flag(s): %v", missing) } return nil diff --git a/tools/stress/device-orchestrator/pkg/agent/agent.go b/tools/stress/device-orchestrator/pkg/agent/agent.go index 24c1b4dbce..83a50e2ddb 100644 --- a/tools/stress/device-orchestrator/pkg/agent/agent.go +++ b/tools/stress/device-orchestrator/pkg/agent/agent.go @@ -6,6 +6,7 @@ package agent import ( "context" "log/slog" + "sync" "time" ) @@ -57,16 +58,21 @@ func NewNoop(log *slog.Logger) Runner { type noop struct { log *slog.Logger events chan Event + once sync.Once } +// Start is idempotent: a second call is a no-op so the events channel is closed +// exactly once (a double close would panic). func (n *noop) Start(ctx context.Context) error { - if n.log != nil { - n.log.Debug("agent: noop runner started (no events will be emitted)") - } - go func() { - <-ctx.Done() - close(n.events) - }() + n.once.Do(func() { + if n.log != nil { + n.log.Debug("agent: noop runner started (no events will be emitted)") + } + go func() { + <-ctx.Done() + close(n.events) + }() + }) return nil } diff --git a/tools/stress/device-orchestrator/pkg/exec/exec.go b/tools/stress/device-orchestrator/pkg/exec/exec.go index 86badb60f2..ca45abf294 100644 --- a/tools/stress/device-orchestrator/pkg/exec/exec.go +++ b/tools/stress/device-orchestrator/pkg/exec/exec.go @@ -88,15 +88,13 @@ func (l *Live) CreateUser(ctx context.Context, idx int) (sweep.CreateResult, err // account visibility; we don't get distinct stage timestamps today, so // confirm and activate both anchor at the post-call wallclock. A future // SDK refactor can split these. 
- tunnelID, err := l.fetchTunnelID(ctx, userPDA) - if err != nil { - // Surface the tunnel ID as 0; the sweep records the create as successful - // because the on-chain User already exists. - tunnelID = 0 - } + // + // TunnelID is recorded as 0 for now: the SDK's CreateUser doesn't surface + // the assigned tunnel_id, and reading it back needs the User account bytes. + // Part 3 wires the per-account fetch here. return sweep.CreateResult{ UserPDA: userPDA, - TunnelID: tunnelID, + TunnelID: 0, ConfirmedAt: now, ActivatedAt: now, }, nil @@ -114,18 +112,6 @@ func (l *Live) DeleteUser(ctx context.Context, userPDA solana.PublicKey) (sweep. }, nil } -// fetchTunnelID reads the user account and returns its assigned TunnelId. -// Used so the runlog records the kernel interface identifier the part-3 -// agent runner will key on. -func (l *Live) fetchTunnelID(ctx context.Context, userPDA solana.PublicKey) (uint16, error) { - // We can't read the assigned tunnel_id without the User's on-chain bytes, - // which the SDK doesn't surface from CreateUser. Until a downstream - // helper is added, callers either skip this column (TunnelID = 0) or wire - // a per-account fetch in cmd/. The package signature is kept stable so - // part-3 can drop in the real fetch. - return 0, nil -} - // ipForIndex returns base shifted by idx, wrapping at the /16 boundary so the // 0..65535 range is usable without overflow handling on the caller side. func ipForIndex(base [4]byte, idx int) [4]byte { diff --git a/tools/stress/device-orchestrator/pkg/runlog/runlog.go b/tools/stress/device-orchestrator/pkg/runlog/runlog.go index 007a79fa43..04d0f9ccd6 100644 --- a/tools/stress/device-orchestrator/pkg/runlog/runlog.go +++ b/tools/stress/device-orchestrator/pkg/runlog/runlog.go @@ -1,5 +1,5 @@ // Package runlog appends per-event rows to the orchestrator runlog file -// (`orchestrator-runlog.json`). One row per line; line-delimited JSON so the +// (`orchestrator-runlog.jsonl`). 
One row per line; line-delimited JSON so the // file can be tailed and downstream tooling can parse incrementally. // // Row schema (per #3746): @@ -64,10 +64,9 @@ func Open(path string) (*Writer, error) { return &Writer{w: f, path: path}, nil } -// Path returns the file path the writer is appending to. func (w *Writer) Path() string { return w.path } -// Append serializes row as JSON and writes a single line. +// Append serializes row as a single line of JSON. Safe for concurrent use. func (w *Writer) Append(row Row) error { if row.TNs == 0 { row.TNs = time.Now().UnixNano() @@ -88,7 +87,6 @@ func (w *Writer) Append(row Row) error { return nil } -// Close flushes and closes the underlying file. func (w *Writer) Close() error { w.mu.Lock() defer w.mu.Unlock() diff --git a/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go b/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go index ca0cb31ddf..b7331b3c9f 100644 --- a/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go +++ b/tools/stress/device-orchestrator/pkg/runlog/runlog_test.go @@ -15,7 +15,7 @@ import ( func TestWriter_RoundTrip(t *testing.T) { t.Parallel() - path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + path := filepath.Join(t.TempDir(), "orchestrator-runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) @@ -50,7 +50,7 @@ func TestWriter_RoundTrip(t *testing.T) { func TestWriter_FillsMissingTimestamp(t *testing.T) { t.Parallel() - path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + path := filepath.Join(t.TempDir(), "orchestrator-runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) defer w.Close() @@ -68,7 +68,7 @@ func TestWriter_FillsMissingTimestamp(t *testing.T) { func TestWriter_RejectsAfterClose(t *testing.T) { t.Parallel() - w, err := runlog.Open(filepath.Join(t.TempDir(), "orchestrator-runlog.json")) + w, err := runlog.Open(filepath.Join(t.TempDir(), "orchestrator-runlog.jsonl")) require.NoError(t, err) require.NoError(t, 
w.Close()) @@ -79,7 +79,7 @@ func TestWriter_RejectsAfterClose(t *testing.T) { func TestWriter_Truncates(t *testing.T) { t.Parallel() - path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + path := filepath.Join(t.TempDir(), "orchestrator-runlog.jsonl") require.NoError(t, os.WriteFile(path, []byte("stale\n"), 0o644)) w, err := runlog.Open(path) diff --git a/tools/stress/device-orchestrator/pkg/sweep/sweep.go b/tools/stress/device-orchestrator/pkg/sweep/sweep.go index cda03412c1..98e5b41943 100644 --- a/tools/stress/device-orchestrator/pkg/sweep/sweep.go +++ b/tools/stress/device-orchestrator/pkg/sweep/sweep.go @@ -89,11 +89,20 @@ func (c *Config) validate() error { return errors.New("sweep: Hold must be >= 0") case c.RunID == "": return errors.New("sweep: RunID is required") + case c.OwnerFilter.IsZero(): + return errors.New("sweep: OwnerFilter is required") case c.Executor == nil: return errors.New("sweep: Executor is required") case c.Runlog == nil: return errors.New("sweep: Runlog is required") } + return nil +} + +// applyDefaults fills the optional dependencies with production implementations. +// Kept separate from validate so validation stays free of mutation; Run calls +// it before validate (Agent's default depends on the resolved Logger). +func (c *Config) applyDefaults() { if c.Clock == nil { c.Clock = RealClock{} } @@ -103,7 +112,6 @@ func (c *Config) validate() error { if c.Agent == nil { c.Agent = agent.NewNoop(c.Logger) } - return nil } // createdUser tracks an orchestrator-owned user so the deprovision phase can @@ -118,9 +126,13 @@ type createdUser struct { // number of users actually created/deleted alongside the error (if any), so // callers can report partial progress on abort. func Run(ctx context.Context, cfg Config) error { + cfg.applyDefaults() if err := cfg.validate(); err != nil { return err } + // Tag every sweep log line with the run ID. 
+ cfg.Logger = cfg.Logger.With("run_id", cfg.RunID) + if err := cfg.Agent.Start(ctx); err != nil { return fmt.Errorf("start agent runner: %w", err) } @@ -130,11 +142,12 @@ func Run(ctx context.Context, cfg Config) error { return err } // Always attempt deprovision so an abort during provision still cleans up - // what the sweep created. Use a fresh context for the deprovision phase if - // the original was cancelled, since the operator wants the tear-down to - // finish before exit. We respect the parent context's lifetime via the - // outer Run's error return — callers that want a hard stop pass a deadline. - depErr := deprovision(ctx, &cfg, created) + // what the sweep created. Teardown runs under a context derived with + // WithoutCancel: an abort/signal that cancelled ctx must not also abandon + // the users we already created, so deprovision ignores that cancellation + // (it still inherits ctx's values). Callers wanting a hard stop on teardown + // must enforce it out of band. + depErr := deprovision(context.WithoutCancel(ctx), &cfg, created) if err != nil { return err } @@ -182,7 +195,11 @@ func provision(ctx context.Context, cfg *Config) ([]createdUser, error) { return created, err } - res, err := cfg.Executor.CreateUser(ctx, idx) + // Don't let an abort interrupt an in-flight create. A cancelled + // CreateUser can return an error even after the transaction landed + // onchain, which would orphan a user the deprovision phase never + // learns about. Abort is observed at the iteration boundary above. 
+ res, err := cfg.Executor.CreateUser(context.WithoutCancel(ctx), idx) if err != nil { return created, fmt.Errorf("create user idx=%d: %w", idx, err) } @@ -201,7 +218,10 @@ func provision(ctx context.Context, cfg *Config) ([]createdUser, error) { if runningTarget >= cfg.Target { break } - if cfg.Hold > 0 { + // Only hold when this batch actually created users; a no-op batch + // (target already satisfied by pre-existing state) shouldn't burn the + // hold interval. + if plan.ToCreate > 0 && cfg.Hold > 0 { select { case <-cfg.Clock.After(cfg.Hold): case <-ctx.Done(): @@ -213,13 +233,12 @@ func provision(ctx context.Context, cfg *Config) ([]createdUser, error) { } // deprovision walks the created slice in reverse, emitting deprovision_* -// events for each. +// events for each. It runs to completion regardless of ctx cancellation (Run +// passes an uncancellable teardown context) so an aborted sweep never leaks +// the users it created. func deprovision(ctx context.Context, cfg *Config, created []createdUser) error { activeCount := len(created) for i := len(created) - 1; i >= 0; i-- { - if err := ctx.Err(); err != nil { - return err - } u := created[i] pkStr := u.pubkey.String() submitAt := cfg.Clock.Now() diff --git a/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go b/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go index 3402d5fe8c..4fa16f1156 100644 --- a/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go +++ b/tools/stress/device-orchestrator/pkg/sweep/sweep_test.go @@ -63,6 +63,10 @@ type fakeExecutor struct { // Optional hook to fail on the Nth create (1-based) — used by the abort test. failCreateOnCall int failErr error + + // Optional hook invoked after every create with the 1-based count of creates so far. The abort test uses it to cancel the sweep context.
+ afterCreate func(calls int) } func newFakeExecutor(owner solana.PublicKey) *fakeExecutor { @@ -96,6 +100,10 @@ func (f *fakeExecutor) CreateUser(ctx context.Context, idx int) (sweep.CreateRes }) f.mu.Unlock() + if f.afterCreate != nil { + f.afterCreate(calls) + } + now := time.Unix(1_700_000_000, int64(calls)*1_000_000) // micro-spaced timestamps return sweep.CreateResult{ UserPDA: pk, @@ -145,7 +153,7 @@ func TestRun_ProvisionsThenDeprovisionsInReverseOrder(t *testing.T) { owner := solana.NewWallet().PublicKey() exec := newFakeExecutor(owner) - path := filepath.Join(t.TempDir(), "orchestrator-runlog.json") + path := filepath.Join(t.TempDir(), "orchestrator-runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) t.Cleanup(func() { _ = w.Close() }) @@ -205,7 +213,7 @@ func TestRun_HandlesZeroTarget(t *testing.T) { owner := solana.NewWallet().PublicKey() exec := newFakeExecutor(owner) - path := filepath.Join(t.TempDir(), "runlog.json") + path := filepath.Join(t.TempDir(), "runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) t.Cleanup(func() { _ = w.Close() }) @@ -234,14 +242,22 @@ func TestRun_AbortBetweenUsersStillCleansUp(t *testing.T) { owner := solana.NewWallet().PublicKey() exec := newFakeExecutor(owner) - exec.failCreateOnCall = 3 - exec.failErr = context.Canceled - path := filepath.Join(t.TempDir(), "runlog.json") + path := filepath.Join(t.TempDir(), "runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) t.Cleanup(func() { _ = w.Close() }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Cancel the sweep after the 2nd user is created, simulating an abort firing + // mid-sweep. The next provision iteration observes it at the boundary. 
+ exec.afterCreate = func(calls int) { + if calls == 2 { + cancel() + } + } + cfg := sweep.Config{ RunID: "run-abort", Target: 4, @@ -252,14 +268,17 @@ func TestRun_AbortBetweenUsersStillCleansUp(t *testing.T) { Runlog: w, Clock: &fakeClock{now: time.Unix(1, 0)}, } - err = sweep.Run(context.Background(), cfg) + err = sweep.Run(ctx, cfg) require.Error(t, err, "abort during provision should surface error") + assert.True(t, errors.Is(err, context.Canceled)) - // Even on abort, deprovision should fire for the two users that were created. + // Even on abort, deprovision must run for the two users that were created — + // teardown uses an uncancellable context so the cancelled ctx doesn't leak state. require.NoError(t, w.Close()) rows := readRows(t, path) - // 2 provisions × 3 events = 6; plus a submit event for the failed third; plus 2 deprovision sets. + assert.Equal(t, int32(2), exec.createN.Load(), "abort stops further creates at the boundary") + assert.Equal(t, int32(2), exec.deleteN.Load(), "both created users are deprovisioned") deprovisionActivates := 0 for _, r := range rows { if r.Event == runlog.EventDeprovisionActivate { @@ -272,15 +291,17 @@ func TestRun_AbortBetweenUsersStillCleansUp(t *testing.T) { func TestRun_RejectsInvalidConfig(t *testing.T) { t.Parallel() + owner := solana.NewWallet().PublicKey() tests := []struct { name string cfg sweep.Config }{ - {name: "negative target", cfg: sweep.Config{Target: -1, UsersPerBatch: 1, RunID: "r", Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, - {name: "zero batch", cfg: sweep.Config{Target: 1, UsersPerBatch: 0, RunID: "r", Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, - {name: "missing run id", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, - {name: "missing executor", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", Runlog: &runlog.Writer{}}}, - {name: "missing runlog", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", 
Executor: &fakeExecutor{}}}, + {name: "negative target", cfg: sweep.Config{Target: -1, UsersPerBatch: 1, RunID: "r", OwnerFilter: owner, Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "zero batch", cfg: sweep.Config{Target: 1, UsersPerBatch: 0, RunID: "r", OwnerFilter: owner, Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "missing run id", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, OwnerFilter: owner, Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "missing owner filter", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", Executor: &fakeExecutor{}, Runlog: &runlog.Writer{}}}, + {name: "missing executor", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", OwnerFilter: owner, Runlog: &runlog.Writer{}}}, + {name: "missing runlog", cfg: sweep.Config{Target: 1, UsersPerBatch: 1, RunID: "r", OwnerFilter: owner, Executor: &fakeExecutor{}}}, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { @@ -296,7 +317,7 @@ func TestRun_CancellationStopsBetweenUsers(t *testing.T) { owner := solana.NewWallet().PublicKey() exec := newFakeExecutor(owner) - path := filepath.Join(t.TempDir(), "runlog.json") + path := filepath.Join(t.TempDir(), "runlog.jsonl") w, err := runlog.Open(path) require.NoError(t, err) t.Cleanup(func() { _ = w.Close() })