Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/app/piped/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
input.Logger.Info("successfully cleaned gitClient for plan-preview")
}()

ppOpts := []planpreview.Option{
planpreview.WithLogger(input.Logger),
planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum),
planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize),
planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()),
planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()),
}
h := planpreview.NewHandler(
gc,
apiClient,
Expand All @@ -535,7 +542,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
decrypter,
appManifestsCache,
cfg,
planpreview.WithLogger(input.Logger),
ppOpts...,
)
group.Go(func() error {
return h.Run(ctx)
Expand Down
16 changes: 12 additions & 4 deletions pkg/app/piped/planpreview/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,33 @@ type Option func(*options)

func WithWorkerNum(n int) Option {
return func(opts *options) {
opts.workerNum = n
if n > 0 {
opts.workerNum = n
}
}
}

func WithCommandQueueBufferSize(s int) Option {
return func(opts *options) {
opts.commandQueueBufferSize = s
if s > 0 {
opts.commandQueueBufferSize = s
}
}
}

func WithCommandCheckInterval(i time.Duration) Option {
return func(opts *options) {
opts.commandCheckInterval = i
if i > 0 {
opts.commandCheckInterval = i
}
}
}

func WithCommandHandleTimeout(t time.Duration) Option {
return func(opts *options) {
opts.commandHandleTimeout = t
if t > 0 {
opts.commandHandleTimeout = t
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
input.Logger.Info("successfully cleaned gitClient for plan-preview")
}()

ppOpts := []planpreview.Option{
planpreview.WithLogger(input.Logger),
planpreview.WithWorkerNum(cfg.PlanPreview.WorkerNum),
planpreview.WithCommandQueueBufferSize(cfg.PlanPreview.CommandQueueBufferSize),
planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()),
planpreview.WithCommandHandleTimeout(cfg.PlanPreview.CommandHandleTimeout.Duration()),
}
h := planpreview.NewHandler(
gc,
apiClient,
Expand All @@ -500,7 +507,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
decrypter,
cfg,
pluginRegistry,
planpreview.WithLogger(input.Logger),
ppOpts...,
)
group.Go(func() error {
return h.Run(ctx)
Expand Down
16 changes: 12 additions & 4 deletions pkg/app/pipedv1/planpreview/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,33 @@ type Option func(*options)

func WithWorkerNum(n int) Option {
return func(opts *options) {
opts.workerNum = n
if n > 0 {
opts.workerNum = n
}
}
}

func WithCommandQueueBufferSize(s int) Option {
return func(opts *options) {
opts.commandQueueBufferSize = s
if s > 0 {
opts.commandQueueBufferSize = s
}
}
}

func WithCommandCheckInterval(i time.Duration) Option {
return func(opts *options) {
opts.commandCheckInterval = i
if i > 0 {
opts.commandCheckInterval = i
}
}
}

func WithCommandHandleTimeout(t time.Duration) Option {
return func(opts *options) {
opts.commandHandleTimeout = t
if t > 0 {
opts.commandHandleTimeout = t
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/config/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type PipedSpec struct {
SecretManagement *SecretManagement `json:"secretManagement,omitempty"`
// Optional settings for event watcher.
EventWatcher PipedEventWatcher `json:"eventWatcher"`
// Optional settings for plan-preview command handling.
PlanPreview PipedPlanPreview `json:"planPreview"`
// List of labels to filter all applications this piped will handle.
AppSelector map[string]string `json:"appSelector,omitempty"`
}
Expand Down Expand Up @@ -141,6 +143,9 @@ func (s *PipedSpec) Validate() error {
if err := s.EventWatcher.Validate(); err != nil {
return err
}
if err := s.PlanPreview.Validate(); err != nil {
return err
}
for _, n := range s.Notifications.Receivers {
if n.Slack != nil {
if err := n.Slack.Validate(); err != nil {
Expand Down Expand Up @@ -1211,3 +1216,30 @@ type PipedEventWatcherGitRepo struct {
// This is prioritized if both includes and this one are given.
Excludes []string `json:"excludes,omitempty"`
}

type PipedPlanPreview struct {
// WorkerNum is the number of worker goroutines processing plan-preview commands.
WorkerNum int `json:"workerNum,omitempty"`
// CommandQueueBufferSize is the buffer size of the internal command channel.
CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"`
// CommandCheckInterval is how often to poll for new plan-preview commands.
CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"`
// CommandHandleTimeout is the default timeout for building each plan-preview result when the command does not specify one.
CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"`
}

func (p *PipedPlanPreview) Validate() error {
if p.WorkerNum < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You allow WorkerNum = 0, I scare that commands can be block by this allowance, please verify the behavior to find the right condition for validating

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I checked the handler behavior here in piped/planpreview/handler.go:

for i := 0; i < h.options.workerNum; i++ {
		go startWorker(ctx, h.commandCh)
	}

From the handler logic, workerNum = 0 would result in no workers being started, so commands wouldn’t be processed if it reached there as is.

In the current flow, we guard this at the callsite (> 0 check in piped.go), so a zero value isn’t passed to WithWorkerNum and the handler falls back to its default worker count.

I’m happy to enforce > 0 in Validate() as well if you think that’s preferable, though it may also affect cases where the field is omitted and defaults are expected.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the right thing to do here, but isn't that we are using defensive programming in a lot of place, I don't think this is a good idea

maybe just concentrate on validating config at one place, what do you think ?

Copy link
Copy Markdown
Contributor Author

@rawadhossain rawadhossain Apr 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think that would be cleaner. I’ve moved the > 0 checks into the With* option functions in the handler, so the “0 = keep default” logic is handled in one place now.

Validate() is just handling invalid cases (like negatives). Since with plain int/Duration, omitted and 0 both decode the same after YAML parsing, we can’t really reject 0 there without breaking configs where users just omit the planPreview block.

Let me know your thoughts on this.

return errors.New("planPreview.workerNum must be greater than or equal to 0")
}
if p.CommandQueueBufferSize < 0 {
return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0")
}
if p.CommandCheckInterval < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommandCheckInterval will later be used for creating time.Ticker

commandTicker := time.NewTicker(h.options.commandCheckInterval)

This will cause panic if h.options.commandCheckInterval = 0

Copy link
Copy Markdown
Contributor Author

@rawadhossain rawadhossain Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, time.NewTicker will panic for non positive durations, so this definitely needs to be guarded.

We handle this the same way as above in piped.go by passing the option when the value is > 0, so zero isn’t forwarded and the handler uses its default interval.

if cfg.PlanPreview.CommandCheckInterval > 0 {
    ppOpts = append(ppOpts, planpreview.WithCommandCheckInterval(cfg.PlanPreview.CommandCheckInterval.Duration()))
}

return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0")
}
if p.CommandHandleTimeout < 0 {
return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0")
}
return nil
}
107 changes: 107 additions & 0 deletions pkg/config/piped_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,12 @@ func TestPipedConfig(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedError: nil,
},
Expand All @@ -378,6 +384,95 @@ func TestPipedConfig(t *testing.T) {
}
}

func TestPipedPlanPreviewValidate(t *testing.T) {
testcases := []struct {
name string
planPreview PipedPlanPreview
wantErr bool
wantPipedPlanPreview PipedPlanPreview
}{
{
name: "negative workerNum",
wantErr: true,
planPreview: PipedPlanPreview{
WorkerNum: -1,
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: -1,
},
},
{
name: "negative commandQueueBufferSize",
wantErr: true,
planPreview: PipedPlanPreview{
CommandQueueBufferSize: -1,
},
wantPipedPlanPreview: PipedPlanPreview{
CommandQueueBufferSize: -1,
},
},
{
name: "negative commandCheckInterval",
wantErr: true,
planPreview: PipedPlanPreview{
CommandCheckInterval: Duration(-time.Second),
},
wantPipedPlanPreview: PipedPlanPreview{
CommandCheckInterval: Duration(-time.Second),
},
},
{
name: "negative commandHandleTimeout",
wantErr: true,
planPreview: PipedPlanPreview{
CommandHandleTimeout: Duration(-time.Minute),
},
wantPipedPlanPreview: PipedPlanPreview{
CommandHandleTimeout: Duration(-time.Minute),
},
},
{
name: "all zero",
wantErr: false,
planPreview: PipedPlanPreview{
WorkerNum: 0,
CommandQueueBufferSize: 0,
CommandCheckInterval: Duration(0),
CommandHandleTimeout: Duration(0),
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: 0,
CommandQueueBufferSize: 0,
CommandCheckInterval: Duration(0),
CommandHandleTimeout: Duration(0),
},
},
{
name: "valid values",
wantErr: false,
planPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
wantPipedPlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
err := tc.planPreview.Validate()
assert.Equal(t, tc.wantErr, err != nil)
assert.Equal(t, tc.wantPipedPlanPreview, tc.planPreview)
})
}
}

func TestPipedEventWatcherValidate(t *testing.T) {
testcases := []struct {
name string
Expand Down Expand Up @@ -1137,6 +1232,12 @@ func TestPipedSpecClone(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedSpec: &PipedSpec{
ProjectID: "test-project",
Expand Down Expand Up @@ -1335,6 +1436,12 @@ func TestPipedSpecClone(t *testing.T) {
},
},
},
PlanPreview: PipedPlanPreview{
WorkerNum: 5,
CommandQueueBufferSize: 20,
CommandCheckInterval: Duration(5 * time.Second),
CommandHandleTimeout: Duration(10 * time.Minute),
},
},
expectedError: nil,
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/testdata/piped/piped-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,9 @@ spec:
includes:
- event-watcher-dev.yaml
- event-watcher-stg.yaml

planPreview:
workerNum: 5
commandQueueBufferSize: 20
commandCheckInterval: 5s
commandHandleTimeout: 10m
32 changes: 32 additions & 0 deletions pkg/configv1/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type PipedSpec struct {
SecretManagement *SecretManagement `json:"secretManagement,omitempty"`
// Optional settings for event watcher.
EventWatcher PipedEventWatcher `json:"eventWatcher"`
// Optional settings for plan-preview command handling.
PlanPreview PipedPlanPreview `json:"planPreview"`
// List of labels to filter all applications this piped will handle.
AppSelector map[string]string `json:"appSelector,omitempty"`
}
Expand Down Expand Up @@ -113,6 +115,9 @@ func (s *PipedSpec) Validate() error {
if err := s.EventWatcher.Validate(); err != nil {
return err
}
if err := s.PlanPreview.Validate(); err != nil {
return err
}
for _, n := range s.Notifications.Receivers {
if n.Slack != nil {
if err := n.Slack.Validate(); err != nil {
Expand Down Expand Up @@ -632,6 +637,33 @@ type PipedEventWatcherGitRepo struct {
Excludes []string `json:"excludes,omitempty"`
}

type PipedPlanPreview struct {
// WorkerNum is the number of worker goroutines processing plan-preview commands.
WorkerNum int `json:"workerNum,omitempty"`
// CommandQueueBufferSize is the buffer size of the internal command channel.
CommandQueueBufferSize int `json:"commandQueueBufferSize,omitempty"`
// CommandCheckInterval is how often to poll for new plan-preview commands.
CommandCheckInterval Duration `json:"commandCheckInterval,omitempty"`
// CommandHandleTimeout is the default timeout for building each plan-preview result when the command does not specify one.
CommandHandleTimeout Duration `json:"commandHandleTimeout,omitempty"`
}

func (p *PipedPlanPreview) Validate() error {
if p.WorkerNum < 0 {
return errors.New("planPreview.workerNum must be greater than or equal to 0")
}
if p.CommandQueueBufferSize < 0 {
return errors.New("planPreview.commandQueueBufferSize must be greater than or equal to 0")
}
if p.CommandCheckInterval < 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same reason with v0, commandCheckInterval = 0 would cause panic

return errors.New("planPreview.commandCheckInterval must be greater than or equal to 0")
}
if p.CommandHandleTimeout < 0 {
return errors.New("planPreview.commandHandleTimeout must be greater than or equal to 0")
}
return nil
}

// PipedPlugin defines the plugin configuration for the piped.
type PipedPlugin struct {
// The name of the plugin.
Expand Down
Loading
Loading