Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pipekit assert env-exists DEPLOY_TOKEN CLUSTER_NAME IMAGE_TAG

# Wait for a service to be ready
pipekit wait url http://localhost:8080/healthz --timeout 150s
pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s
pipekit wait ws ws://localhost:8080/events --timeout 60s

# Retry a flaky command with exponential backoff
pipekit retry run --attempts 5 --delay 5s --backoff -- helm upgrade --install myapp ./chart
Expand Down
46 changes: 46 additions & 0 deletions actions/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,52 @@ func AssertCommand() cli.Command {
return nil
},
},
{
Name: "grpc",
Usage: "assert a gRPC health endpoint reports SERVING",
ArgsUsage: "HOST:PORT",
Flags: []cli.Flag{
cli.StringFlag{Name: "service", Usage: "gRPC health service name"},
cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"},
cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"},
},
Action: func(c *cli.Context) error {
address := c.Args().First()
if address == "" {
return cli.NewExitError("address (host:port) required", 1)
}
timeout, err := time.ParseDuration(c.String("timeout"))
if err != nil {
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
}
if err := services.AssertGRPCHealth(address, c.String("service"), timeout, c.Bool("tls")); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "ws",
Usage: "assert a WebSocket endpoint accepts an upgrade",
ArgsUsage: "WS_URL",
Flags: []cli.Flag{
cli.StringFlag{Name: "timeout", Value: "10s", Usage: "request timeout"},
},
Action: func(c *cli.Context) error {
urlStr := c.Args().First()
if urlStr == "" {
return cli.NewExitError("WebSocket URL required", 1)
}
timeout, err := time.ParseDuration(c.String("timeout"))
if err != nil {
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
}
if err := services.AssertWebSocket(urlStr, timeout); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "path",
Usage: "assert one or more paths exist (file or directory)",
Expand Down
55 changes: 55 additions & 0 deletions actions/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,61 @@ func WaitCommand() cli.Command {
return nil
},
},
{
Name: "grpc",
Usage: "poll a gRPC health endpoint until it reports SERVING",
ArgsUsage: "HOST:PORT",
Flags: append(commonFlags,
cli.StringFlag{Name: "service", Usage: "gRPC health service name"},
cli.BoolFlag{Name: "tls", Usage: "use TLS for the gRPC connection"},
),
Action: func(c *cli.Context) error {
address := c.Args().First()
if address == "" {
return cli.NewExitError("address (host:port) required", 1)
}
timeout, err := time.ParseDuration(c.String("timeout"))
if err != nil {
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
}
interval, err := time.ParseDuration(c.String("interval"))
if err != nil {
return cli.NewExitError("invalid interval: "+err.Error(), 1)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := services.WaitForGRPCHealth(ctx, address, c.String("service"), interval, c.Bool("backoff"), c.Bool("quiet"), c.Bool("tls")); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "ws",
Usage: "poll a WebSocket endpoint until it accepts an upgrade",
ArgsUsage: "WS_URL",
Flags: commonFlags,
Action: func(c *cli.Context) error {
urlStr := c.Args().First()
if urlStr == "" {
return cli.NewExitError("WebSocket URL required", 1)
}
timeout, err := time.ParseDuration(c.String("timeout"))
if err != nil {
return cli.NewExitError("invalid timeout: "+err.Error(), 1)
}
interval, err := time.ParseDuration(c.String("interval"))
if err != nil {
return cli.NewExitError("invalid interval: "+err.Error(), 1)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := services.WaitForWebSocket(ctx, urlStr, interval, c.Bool("backoff"), c.Bool("quiet")); err != nil {
return cli.NewExitError(err.Error(), 1)
}
return nil
},
},
{
Name: "command",
Usage: "retry a shell command until it exits 0",
Expand Down
14 changes: 14 additions & 0 deletions docs/COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ pipekit assert compare 2.0.0 gt 1.5.0
# URL returns one of the expected statuses
pipekit assert url https://api.example.com/health --expected-status 200,204

# gRPC health endpoint reports SERVING
pipekit assert grpc localhost:50051 --service my.package.Worker

# WebSocket endpoint accepts an upgrade
pipekit assert ws ws://localhost:8080/events

# Path exists (file or directory)
pipekit assert path /etc/myapp/config.yaml /var/lib/myapp

Expand Down Expand Up @@ -404,6 +410,12 @@ pipekit wait url http://localhost:8080/healthz --expected-body "healthy"
# TCP port
pipekit wait tcp localhost:5432 --timeout 60s

# gRPC health endpoint
pipekit wait grpc localhost:50051 --service my.package.Worker --timeout 60s

# WebSocket endpoint
pipekit wait ws ws://localhost:8080/events --timeout 60s

# Arbitrary command (exit 0 = ready)
pipekit wait command "pg_isready -h localhost" --timeout 30s --backoff

Expand All @@ -419,6 +431,8 @@ pipekit wait url http://localhost:8080/healthz --quiet
| `--quiet` | Suppress per-attempt output | `false` |
| `--expected-status` | Acceptable HTTP codes (csv) | `200` |
| `--expected-body` | Substring to look for in response body | — |
| `--service` | gRPC health service name | — |
| `--tls` | Use TLS for gRPC health checks | `false` |

</details>

Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ require (
github.com/Masterminds/semver/v3 v3.4.0
github.com/bmatcuk/doublestar/v4 v4.10.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/itchyny/gojq v0.12.18
github.com/pelletier/go-toml/v2 v2.3.1
github.com/rs/zerolog v1.34.0
github.com/urfave/cli v1.22.17
google.golang.org/grpc v1.72.2
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -21,5 +23,9 @@ require (
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 // indirect
google.golang.org/protobuf v1.36.6 // indirect
)
32 changes: 32 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,19 @@ github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6N
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/itchyny/gojq v0.12.18 h1:gFGHyt/MLbG9n6dqnvlliiya2TaMMh6FFaR2b1H6Drc=
github.com/itchyny/gojq v0.12.18/go.mod h1:4hPoZ/3lN9fDL1D+aK7DY1f39XZpY9+1Xpjz8atrEkg=
github.com/itchyny/timefmt-go v0.1.7 h1:xyftit9Tbw+Dc/huSSPJaEmX1TVL8lw5vxjJLK4GMMA=
Expand Down Expand Up @@ -43,11 +53,33 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/urfave/cli v1.22.17 h1:SYzXoiPfQjHBbkYxbew5prZHS1TOLT3ierW8SYLqtVQ=
github.com/urfave/cli v1.22.17/go.mod h1:b0ht0aqgH/6pBYzzxURyrM4xXNgsoT/n2ZzwQiEhNVo=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34 h1:h6p3mQqrmT1XkHVTfzLdNz1u7IhINeZkz67/xTbOuWs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250428153025-10db94c68c34/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
Expand Down
64 changes: 64 additions & 0 deletions services/probe_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package services

import (
"context"
"fmt"
"net/http"
"time"

"github.com/gorilla/websocket"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// AssertGRPCHealth checks the standard gRPC health service once.
func AssertGRPCHealth(address string, serviceName string, timeout time.Duration, useTLS bool) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

opts := []grpc.DialOption{grpc.WithBlock()}
if useTLS {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.DialContext(ctx, address, opts...)
if err != nil {
return fmt.Errorf("connecting to gRPC %s: %w", address, err)
}
defer conn.Close()

resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{Service: serviceName})
if err != nil {
return fmt.Errorf("checking gRPC health %s: %w", address, err)
}
if resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
return fmt.Errorf("gRPC %s health status %s, expected SERVING", address, resp.GetStatus())
}
return nil
}

// AssertWebSocket checks that a WebSocket endpoint accepts an upgrade once.
func AssertWebSocket(urlStr string, timeout time.Duration) error {
dialer := websocket.Dialer{HandshakeTimeout: timeout}
conn, resp, err := dialer.Dial(urlStr, nil)
if err != nil {
if resp != nil {
return fmt.Errorf("connecting to WebSocket %s: status %d: %w", urlStr, resp.StatusCode, err)
}
return fmt.Errorf("connecting to WebSocket %s: %w", urlStr, err)
}
defer conn.Close()

if resp == nil || resp.StatusCode != http.StatusSwitchingProtocols {
status := 0
if resp != nil {
status = resp.StatusCode
}
return fmt.Errorf("WebSocket %s returned status %d, expected 101", urlStr, status)
}
return nil
}
Loading
Loading