From ec29a1779c0ecf3e84401c0a8c1cd994e373186c Mon Sep 17 00:00:00 2001 From: Lucas Machado Date: Tue, 16 Jun 2026 21:42:06 +0200 Subject: [PATCH] Add gRPC and WebSocket readiness checks --- README.md | 2 + actions/assert.go | 46 ++++++++++++ actions/wait.go | 55 ++++++++++++++ docs/COMMANDS.md | 14 ++++ go.mod | 6 ++ go.sum | 32 +++++++++ services/probe_service.go | 64 +++++++++++++++++ services/probe_service_test.go | 128 +++++++++++++++++++++++++++++++++ services/wait_service.go | 72 +++++++++++++++++++ 9 files changed, 419 insertions(+) create mode 100644 services/probe_service.go create mode 100644 services/probe_service_test.go diff --git a/README.md b/README.md index 0abfc08..67eb61d 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ pipekit assert env-exists DEPLOY_TOKEN CLUSTER_NAME IMAGE_TAG # Wait for a service to be ready pipekit wait url http://localhost:8080/healthz --timeout 150s +pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s +pipekit wait ws ws://localhost:8080/events --timeout 60s # Retry a flaky command with exponential backoff pipekit retry run --attempts 5 --delay 5s --backoff -- helm upgrade --install myapp ./chart diff --git a/actions/assert.go b/actions/assert.go index 0fd1390..c7ac61e 100644 --- a/actions/assert.go +++ b/actions/assert.go @@ -131,6 +131,52 @@ func AssertCommand() cli.Command { return nil }, }, + { + Name: "grpc", + Usage: "assert a gRPC health endpoint reports SERVING", + ArgsUsage: "HOST:PORT", + Flags: []cli.Flag{ + cli.StringFlag{Name: "service", Usage: "gRPC health service name"}, + cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"}, + cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"}, + }, + Action: func(c *cli.Context) error { + address := c.Args().First() + if address == "" { + return cli.NewExitError("address (host:port) required", 1) + } + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + return cli.NewExitError("invalid timeout: "+err.Error(), 1) + } + if err := services.AssertGRPCHealth(address, c.String("service"), timeout, c.Bool("tls")); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, + { + Name: "ws", + Usage: "assert a WebSocket endpoint accepts an upgrade", + ArgsUsage: "WS_URL", + Flags: []cli.Flag{ + cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"}, + }, + Action: func(c *cli.Context) error { + urlStr := c.Args().First() + if urlStr == "" { + return cli.NewExitError("WebSocket URL required", 1) + } + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + return cli.NewExitError("invalid timeout: "+err.Error(), 1) + } + if err := services.AssertWebSocket(urlStr, timeout); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, { Name: "path", Usage: "assert one or more paths exist (file or directory)", diff --git a/actions/wait.go b/actions/wait.go index be5b1e4..edec29a 100644 --- a/actions/wait.go +++ b/actions/wait.go @@ -87,6 +87,61 @@ func WaitCommand() cli.Command { return nil }, }, + { + Name: "grpc", + Usage: "poll a gRPC health endpoint until it reports SERVING", + ArgsUsage: "HOST:PORT", + Flags: append(commonFlags, + cli.StringFlag{Name: "service", Usage: "gRPC health service name"}, + cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"}, + ), + Action: func(c *cli.Context) error { + address := c.Args().First() + if address == "" { + return cli.NewExitError("address (host:port) required", 1) + } + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + return cli.NewExitError("invalid timeout: "+err.Error(), 1) + } + interval, err := time.ParseDuration(c.String("interval")) + if err != nil { + return cli.NewExitError("invalid interval: "+err.Error(), 1) + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := services.WaitForGRPCHealth(ctx, address, c.String("service"), interval, c.Bool("backoff"), c.Bool("quiet"), c.Bool("tls")); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, + { + Name: "ws", + Usage: "poll a WebSocket endpoint until it accepts an upgrade", + ArgsUsage: "WS_URL", + Flags: commonFlags, + Action: func(c *cli.Context) error { + urlStr := c.Args().First() + if urlStr == "" { + return cli.NewExitError("WebSocket URL required", 1) + } + timeout, err := time.ParseDuration(c.String("timeout")) + if err != nil { + return cli.NewExitError("invalid timeout: "+err.Error(), 1) + } + interval, err := time.ParseDuration(c.String("interval")) + if err != nil { + return cli.NewExitError("invalid interval: "+err.Error(), 1) + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if err := services.WaitForWebSocket(ctx, urlStr, interval, c.Bool("backoff"), c.Bool("quiet")); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, { Name: "command", Usage: "retry a shell command until it exits 0", diff --git a/docs/COMMANDS.md b/docs/COMMANDS.md index f483244..3053f52 100644 --- a/docs/COMMANDS.md +++ b/docs/COMMANDS.md @@ -308,6 +308,12 @@ pipekit assert compare 2.0.0 gt 1.5.0 # URL returns one of the expected statuses pipekit assert url https://api.example.com/health --expected-status 200,204 +# gRPC health endpoint reports SERVING +pipekit assert grpc localhost:50051 --service my.package.Worker + +# WebSocket endpoint accepts an upgrade +pipekit assert ws ws://localhost:8080/events + # Path exists (file or directory) pipekit assert path /etc/myapp/config.yaml /var/lib/myapp @@ -404,6 +410,12 @@ pipekit wait url http://localhost:8080/healthz --expected-body "healthy" # TCP port pipekit wait tcp localhost:5432 --timeout 60s +# gRPC health endpoint +pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s + +# WebSocket endpoint +pipekit wait ws ws://localhost:8080/events --timeout 60s + # Arbitrary command (exit 0 = ready) pipekit wait command "pg_isready -h localhost" --timeout 30s --backoff @@ -419,6 +431,8 @@ pipekit wait url http://localhost:8080/healthz --quiet | `--quiet` | Suppress per-attempt output | `false` | | `--expected-status` | Acceptable HTTP codes (csv) | `200` | | `--expected-body` | Substring to look for in response body | — | +| `--service` | gRPC health service name | — | +| `--tls` | Use TLS for gRPC health checks | `false` | diff --git a/go.mod b/go.mod index ab787e9..022bf2b 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,12 @@ require ( github.com/Masterminds/semver/v3 v3.4.0 github.com/bmatcuk/doublestar/v4 v4.10.0 github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 github.com/itchyny/gojq v0.12.18 github.com/pelletier/go-toml/v2 v2.3.1 github.com/rs/zerolog v1.34.0 github.com/urfave/cli v1.22.17 + google.golang.org/grpc v1.72.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -21,5 +23,9 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.38.0 // indirect + golang.org/x/text v0.24.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/go.sum b/go.sum index 0201dc6..6709e98 100644 --- a/go.sum +++ b/go.sum @@ -9,9 +9,19 @@ github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/itchyny/gojq v0.12.18 h1:gFGHyt/MLbG9n6dqnvlliiya2TaMMh6FFaR2b1H6Drc= github.com/itchyny/gojq v0.12.18/go.mod h1:4hPoZ/3lN9fDL1D+aK7DY1f39XZpY9+1Xpjz8atrEkg= github.com/itchyny/timefmt-go v0.1.7 h1:xyftit9Tbw+Dc/huSSPJaEmX1TVL8lw5vxjJLK4GMMA= @@ -43,11 +53,33 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/urfave/cli v1.22.17 h1:SYzXoiPfQjHBbkYxbew5prZHS1TOLT3ierW8SYLqtVQ= github.com/urfave/cli v1.22.17/go.mod h1:b0ht0aqgH/6pBYzzxURyrM4xXNgsoT/n2ZzwQiEhNVo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8= +google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/services/probe_service.go b/services/probe_service.go new file mode 100644 index 0000000..f9fc38f --- /dev/null +++ b/services/probe_service.go @@ -0,0 +1,64 @@ +package services + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/gorilla/websocket" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// AssertGRPCHealth checks the standard gRPC health service once. +func AssertGRPCHealth(address string, serviceName string, timeout time.Duration, useTLS bool) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + opts := []grpc.DialOption{grpc.WithBlock()} + if useTLS { + opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + conn, err := grpc.DialContext(ctx, address, opts...) + if err != nil { + return fmt.Errorf("connecting to gRPC %s: %w", address, err) + } + defer conn.Close() + + resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: serviceName}) + if err != nil { + return fmt.Errorf("checking gRPC health %s: %w", address, err) + } + if resp.GetStatus() != healthpb.HealthCheckResponse_SERVING { + return fmt.Errorf("gRPC %s health status %s, expected SERVING", address, resp.GetStatus()) + } + return nil +} + +// AssertWebSocket checks that a WebSocket endpoint accepts an upgrade once. +func AssertWebSocket(urlStr string, timeout time.Duration) error { + dialer := websocket.Dialer{HandshakeTimeout: timeout} + conn, resp, err := dialer.Dial(urlStr, nil) + if err != nil { + if resp != nil { + return fmt.Errorf("connecting to WebSocket %s: status %d: %w", urlStr, resp.StatusCode, err) + } + return fmt.Errorf("connecting to WebSocket %s: %w", urlStr, err) + } + defer conn.Close() + + if resp == nil || resp.StatusCode != http.StatusSwitchingProtocols { + status := 0 + if resp != nil { + status = resp.StatusCode + } + return fmt.Errorf("WebSocket %s returned status %d, expected 101", urlStr, status) + } + return nil +} diff --git a/services/probe_service_test.go b/services/probe_service_test.go new file mode 100644 index 0000000..ecd67de --- /dev/null +++ b/services/probe_service_test.go @@ -0,0 +1,128 @@ +package services + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestAssertGRPCHealthServing(t *testing.T) { + address, stop := startHealthServer(t, "", healthpb.HealthCheckResponse_SERVING) + defer stop() + + if err := AssertGRPCHealth(address, "", 2*time.Second, false); err != nil { + t.Fatalf("expected serving health check, got: %v", err) + } +} + +func TestAssertGRPCHealthNotServing(t *testing.T) { + address, stop := startHealthServer(t, "", healthpb.HealthCheckResponse_NOT_SERVING) + defer stop() + + err := AssertGRPCHealth(address, "", 2*time.Second, false) + if err == nil { + t.Fatal("expected not-serving error") + } + if !strings.Contains(err.Error(), "NOT_SERVING") { + t.Fatalf("expected status in error, got: %v", err) + } +} + +func TestWaitForGRPCHealth(t *testing.T) { + address, stop := startHealthServer(t, "worker", healthpb.HealthCheckResponse_SERVING) + defer stop() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if err := WaitForGRPCHealth(ctx, address, "worker", 50*time.Millisecond, false, true, false); err != nil { + t.Fatalf("expected ready, got: %v", err) + } +} + +func TestAssertWebSocket(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Errorf("upgrade: %v", err) + return + } + conn.Close() + })) + defer srv.Close() + + if err := AssertWebSocket(toWebSocketURL(srv.URL), 2*time.Second); err != nil { + t.Fatalf("expected WebSocket upgrade, got: %v", err) + } +} + +func TestAssertWebSocketRejectsHTTP(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + err := AssertWebSocket(toWebSocketURL(srv.URL), 2*time.Second) + if err == nil { + t.Fatal("expected WebSocket error") + } +} + +func TestWaitForWebSocket(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{} + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Errorf("upgrade: %v", err) + return + } + conn.Close() + })) + defer srv.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if err := WaitForWebSocket(ctx, toWebSocketURL(srv.URL), 50*time.Millisecond, false, true); err != nil { + t.Fatalf("expected ready, got: %v", err) + } +} + +func startHealthServer(t *testing.T, serviceName string, status healthpb.HealthCheckResponse_ServingStatus) (string, func()) { + t.Helper() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen: %v", err) + } + + server := grpc.NewServer() + healthServer := health.NewServer() + healthServer.SetServingStatus(serviceName, status) + healthpb.RegisterHealthServer(server, healthServer) + + go func() { + if err := server.Serve(ln); err != nil { + t.Logf("grpc serve: %v", err) + } + }() + + return ln.Addr().String(), func() { + server.Stop() + ln.Close() + } +} + +func toWebSocketURL(urlStr string) string { + return "ws" + strings.TrimPrefix(urlStr, "http") +} diff --git a/services/wait_service.go b/services/wait_service.go index 7300d24..5bc8641 100644 --- a/services/wait_service.go +++ b/services/wait_service.go @@ -118,6 +118,78 @@ func WaitForTCP(ctx context.Context, address string, interval time.Duration, bac } } +// WaitForGRPCHealth polls the standard gRPC health service until it reports SERVING. +func WaitForGRPCHealth(ctx context.Context, address string, serviceName string, interval time.Duration, backoff bool, quiet bool, useTLS bool) error { + attempt := 0 + delay := interval + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for gRPC %s", address) + default: + } + + attempt++ + if err := AssertGRPCHealth(address, serviceName, 5*time.Second, useTLS); err == nil { + if !quiet { + helpers.Log.Info().Msgf("gRPC %s is ready (attempt %d)", address, attempt) + } + return nil + } + + if !quiet { + helpers.Log.Info().Msgf("Waiting for gRPC %s (attempt %d)...", address, attempt) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for gRPC %s after %d attempts", address, attempt) + case <-time.After(delay): + } + + if backoff { + delay = delay * 2 + } + } +} + +// WaitForWebSocket polls a WebSocket endpoint until it accepts an upgrade. +func WaitForWebSocket(ctx context.Context, urlStr string, interval time.Duration, backoff bool, quiet bool) error { + attempt := 0 + delay := interval + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for WebSocket %s", urlStr) + default: + } + + attempt++ + if err := AssertWebSocket(urlStr, 5*time.Second); err == nil { + if !quiet { + helpers.Log.Info().Msgf("WebSocket %s is ready (attempt %d)", urlStr, attempt) + } + return nil + } + + if !quiet { + helpers.Log.Info().Msgf("Waiting for WebSocket %s (attempt %d)...", urlStr, attempt) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for WebSocket %s after %d attempts", urlStr, attempt) + case <-time.After(delay): + } + + if backoff { + delay = delay * 2 + } + } +} + // WaitForCommand retries a shell command until it exits 0. func WaitForCommand(ctx context.Context, command string, interval time.Duration, backoff bool, quiet bool) error { attempt := 0